diff --git a/src/engine/api/geary-engine-folder.vala b/src/engine/api/geary-engine-folder.vala index 9ba50f08..ffbfcab5 100644 --- a/src/engine/api/geary-engine-folder.vala +++ b/src/engine/api/geary-engine-folder.vala @@ -693,7 +693,8 @@ private class Geary.EngineFolder : Geary.AbstractFolder { // or it's simply not present. If it's not present, want to ensure that the Message-ID // is requested, as that's a good way to manage duplicate messages in the system Geary.Email.Field available_fields; - bool is_present = yield local_folder.is_email_present(id, out available_fields, cancellable); + bool is_present = yield local_folder.is_email_present_async(id, out available_fields, + cancellable); if (!is_present) fields = fields.set(Geary.Email.Field.REFERENCES); diff --git a/src/engine/api/geary-generic-imap-folder.vala b/src/engine/api/geary-generic-imap-folder.vala index c92b2992..c9ea5b5f 100644 --- a/src/engine/api/geary-generic-imap-folder.vala +++ b/src/engine/api/geary-generic-imap-folder.vala @@ -72,8 +72,10 @@ private class Geary.GenericImapFolder : Geary.EngineFolder { for (;;) { Geary.EmailIdentifier start_id = new Imap.EmailIdentifier(new Imap.UID(uid_start_value)); Geary.Email.Field available_fields; - if (!yield imap_local_folder.is_email_present(start_id, out available_fields, cancellable)) + if (!yield imap_local_folder.is_email_present_async(start_id, out available_fields, + cancellable)) { break; + } debug("already have UID %lld in %s local store", uid_start_value, to_string()); diff --git a/src/engine/api/geary-local-interfaces.vala b/src/engine/api/geary-local-interfaces.vala index ea02618e..fe81ab81 100644 --- a/src/engine/api/geary-local-interfaces.vala +++ b/src/engine/api/geary-local-interfaces.vala @@ -24,7 +24,7 @@ public interface Geary.LocalAccount : Object, Geary.Account { } public interface Geary.LocalFolder : Object, Geary.Folder { - public async abstract bool is_email_present(Geary.EmailIdentifier id, + public async abstract bool is_email_present_async(Geary.EmailIdentifier id, out Geary.Email.Field available_fields, Cancellable? cancellable = null) throws Error; /** diff --git a/src/engine/imap/transport/imap-client-connection.vala b/src/engine/imap/transport/imap-client-connection.vala index 75ddc97f..ef494a31 100644 --- a/src/engine/imap/transport/imap-client-connection.vala +++ b/src/engine/imap/transport/imap-client-connection.vala @@ -156,7 +156,7 @@ public class Geary.Imap.ClientConnection { yield cmd.serialize(ser); - send_mutex.release(token); + send_mutex.release(ref token); if (flush_timeout_id == 0) flush_timeout_id = Timeout.add(FLUSH_TIMEOUT_MSEC, on_flush_timeout); diff --git a/src/engine/imap/transport/imap-client-session-manager.vala b/src/engine/imap/transport/imap-client-session-manager.vala index bab2f6cb..1b4bd65e 100644 --- a/src/engine/imap/transport/imap-client-session-manager.vala +++ b/src/engine/imap/transport/imap-client-session-manager.vala @@ -220,7 +220,7 @@ public class Geary.Imap.ClientSessionManager { if (found_session == null) found_session = yield create_new_authorized_session(cancellable); - sessions_mutex.release(token); + sessions_mutex.release(ref token); return found_session; } diff --git a/src/engine/nonblocking/nonblocking-abstract-semaphore.vala b/src/engine/nonblocking/nonblocking-abstract-semaphore.vala index d0d33677..96c57264 100644 --- a/src/engine/nonblocking/nonblocking-abstract-semaphore.vala +++ b/src/engine/nonblocking/nonblocking-abstract-semaphore.vala @@ -106,8 +106,11 @@ public abstract class Geary.NonblockingAbstractSemaphore { pending.cancelled.disconnect(on_pending_cancelled); - if (pending.passed) + if (pending.passed) { + check_user_cancelled(cancellable); + return; + } } } diff --git a/src/engine/nonblocking/nonblocking-mutex.vala b/src/engine/nonblocking/nonblocking-mutex.vala index 7286cd8a..f1818341 100644 --- a/src/engine/nonblocking/nonblocking-mutex.vala +++ b/src/engine/nonblocking/nonblocking-mutex.vala @@ -5,10 +5,12 @@ */ public class Geary.NonblockingMutex { + public const int INVALID_TOKEN = -1; + private NonblockingSpinlock spinlock = new NonblockingSpinlock(); private bool locked = false; - private int next_token = 0; - private int locked_token = -1; + private int next_token = INVALID_TOKEN + 1; + private int locked_token = INVALID_TOKEN; public NonblockingMutex() { } @@ -17,7 +19,9 @@ public class Geary.NonblockingMutex { for (;;) { if (!locked) { locked = true; - locked_token = next_token++; + do { + locked_token = next_token++; + } while (locked_token == INVALID_TOKEN); return locked_token; } @@ -26,12 +30,13 @@ public class Geary.NonblockingMutex { } } - public void release(int token) throws Error { - if (token != locked_token) + public void release(ref int token) throws Error { + if (token != locked_token || token == INVALID_TOKEN) throw new IOError.INVALID_ARGUMENT("Token %d is not the lock token", token); locked = false; - locked_token = -1; + token = INVALID_TOKEN; + locked_token = INVALID_TOKEN; spinlock.notify(); } diff --git a/src/engine/sqlite/abstract/sqlite-database.vala b/src/engine/sqlite/abstract/sqlite-database.vala index f05dba28..9f8b7907 100644 --- a/src/engine/sqlite/abstract/sqlite-database.vala +++ b/src/engine/sqlite/abstract/sqlite-database.vala @@ -33,5 +33,12 @@ public abstract class Geary.Sqlite.Database { return table; } + + public async Transaction begin_transaction_async(string name, Cancellable? cancellable) throws Error { + Transaction t = new Transaction(db, name); + yield t.begin_async(cancellable); + + return t; + } } diff --git a/src/engine/sqlite/abstract/sqlite-table.vala b/src/engine/sqlite/abstract/sqlite-table.vala index 1caa7188..24324c29 100644 --- a/src/engine/sqlite/abstract/sqlite-table.vala +++ b/src/engine/sqlite/abstract/sqlite-table.vala @@ -5,12 +5,6 @@ */ public abstract class Geary.Sqlite.Table { - internal SQLHeavy.Database db { - get { - return gdb.db; - } - } - internal weak Geary.Sqlite.Database gdb; internal SQLHeavy.Table table; @@ -31,6 +25,37 @@ public abstract class Geary.Sqlite.Table { return !(i == 0); } + protected async Transaction obtain_lock_async(Transaction? supplied_lock, string single_use_name, + Cancellable? cancellable) throws Error { + // if the user supplied the lock for multiple operations, use that + if (supplied_lock != null) { + if (!supplied_lock.is_locked) + yield supplied_lock.begin_async(cancellable); + + return supplied_lock; + } + + // create a single-use lock for the transaction + return yield begin_transaction_async(single_use_name, cancellable); + } + + // Technically this only needs to be called for locks that have a required commit. + protected async void release_lock_async(Transaction? supplied_lock, Transaction actual_lock, + Cancellable? cancellable) throws Error { + // if user supplied a lock, don't touch it + if (supplied_lock != null) + return; + + // only commit if required (and the lock was single-use) + if (actual_lock.is_commit_required) + yield actual_lock.commit_async(cancellable); + } + + protected async Transaction begin_transaction_async(string name, Cancellable? cancellable) + throws Error { + return yield gdb.begin_transaction_async(name, cancellable); + } + public string to_string() { return table.name; } diff --git a/src/engine/sqlite/abstract/sqlite-transaction.vala b/src/engine/sqlite/abstract/sqlite-transaction.vala new file mode 100644 index 00000000..e9446248 --- /dev/null +++ b/src/engine/sqlite/abstract/sqlite-transaction.vala @@ -0,0 +1,99 @@ +/* Copyright 2011 Yorba Foundation + * + * This software is licensed under the GNU Lesser General Public License + * (version 2.1 or later). See the COPYING file in this distribution. + */ + +public class Geary.Sqlite.Transaction { + private static NonblockingMutex? transaction_lock = null; + private static int next_id = 0; + + public bool is_locked { get { + return claim_stub != NonblockingMutex.INVALID_TOKEN; + } } + + public bool is_commit_required { get; private set; default = false; } + + private SQLHeavy.Database db; + private string name; + private int id; + private int claim_stub = NonblockingMutex.INVALID_TOKEN; + + internal Transaction(SQLHeavy.Database db, string name) throws Error { + if (transaction_lock == null) + transaction_lock = new NonblockingMutex(); + + this.db = db; + this.name = name; + id = next_id++; + } + + ~Transaction() { + if (is_locked) { + if (is_commit_required) + warning("[%s] destroyed without committing or rolling back changes", to_string()); + + resolve(false, null); + } + } + + public async void begin_async(Cancellable? cancellable = null) throws Error { + assert(!is_locked); +#if TRACE_TRANSACTIONS + debug("[%s] claiming lock", to_string()); +#endif + claim_stub = yield transaction_lock.claim_async(cancellable); +#if TRACE_TRANSACTIONS + debug("[%s] lock claimed", to_string()); +#endif + } + + private void resolve(bool commit, Cancellable? cancellable) throws Error { + if (!is_locked) { + warning("[%s] attempting to resolve an unlocked transaction", to_string()); + + return; + } + + if (commit) + is_commit_required = false; + +#if TRACE_TRANSACTIONS + debug("[%s] releasing lock", to_string()); +#endif + transaction_lock.release(ref claim_stub); +#if TRACE_TRANSACTIONS + debug("[%s] released lock", to_string()); +#endif + } + + public SQLHeavy.Query prepare(string sql) throws Error { + return db.prepare(sql); + } + + public async void commit_async(Cancellable? cancellable) throws Error { + resolve(true, cancellable); + } + + public async void commit_if_required_async(Cancellable? cancellable) throws Error { + if (is_commit_required) + resolve(true, cancellable); + } + + public async void rollback_async(Cancellable? cancellable) throws Error { + resolve(false, cancellable); + } + + public void set_commit_required() { +#if TRACE_TRANSACTIONS + debug("[%s] commit required", to_string()); +#endif + is_commit_required = true; + } + + public string to_string() { + return "%d %s (%s%s)".printf(id, name, is_locked ? "locked" : "unlocked", + is_commit_required ? ", commit required" : ""); + } +} + diff --git a/src/engine/sqlite/api/sqlite-account.vala b/src/engine/sqlite/api/sqlite-account.vala index df24460c..7b2033de 100644 --- a/src/engine/sqlite/api/sqlite-account.vala +++ b/src/engine/sqlite/api/sqlite-account.vala @@ -40,18 +40,20 @@ public class Geary.Sqlite.Account : Geary.AbstractAccount, Geary.LocalAccount { return Geary.Email.Field.NONE; } - private async int64 fetch_id_async(Geary.FolderPath path, Cancellable? cancellable = null) - throws Error { - FolderRow? row = yield folder_table.fetch_descend_async(path.as_list(), cancellable); + private async int64 fetch_id_async(Transaction? transaction, Geary.FolderPath path, + Cancellable? cancellable = null) throws Error { + FolderRow? row = yield folder_table.fetch_descend_async(transaction, path.as_list(), + cancellable); if (row == null) throw new EngineError.NOT_FOUND("Cannot find local path to %s", path.to_string()); return row.id; } - private async int64 fetch_parent_id_async(Geary.FolderPath path, Cancellable? cancellable = null) - throws Error { - return path.is_root() ? Row.INVALID_ID : yield fetch_id_async(path.get_parent(), cancellable); + private async int64 fetch_parent_id_async(Transaction? transaction, Geary.FolderPath path, + Cancellable? cancellable = null) throws Error { + return path.is_root() ? Row.INVALID_ID : yield fetch_id_async(transaction, path.get_parent(), + cancellable); } public async void clone_folder_async(Geary.Folder folder, Cancellable? cancellable = null) @@ -63,14 +65,19 @@ public class Geary.Sqlite.Account : Geary.AbstractAccount, Geary.LocalAccount { // properties *must* be available to perform a clone assert(imap_folder_properties != null); - int64 parent_id = yield fetch_parent_id_async(folder.get_path(), cancellable); + Transaction transaction = yield db.begin_transaction_async("Account.clone_folder_async", + cancellable); - int64 folder_id = yield folder_table.create_async(new FolderRow(folder_table, + int64 parent_id = yield fetch_parent_id_async(transaction, folder.get_path(), cancellable); + + int64 folder_id = yield folder_table.create_async(transaction, new FolderRow(folder_table, imap_folder.get_path().basename, parent_id), cancellable); - yield folder_properties_table.create_async( + yield folder_properties_table.create_async(transaction, new ImapFolderPropertiesRow.from_imap_properties(folder_properties_table, folder_id, - imap_folder_properties)); + imap_folder_properties), cancellable); + + yield transaction.commit_async(cancellable); } public async void update_folder_async(Geary.Folder folder, Cancellable? cancellable = null) @@ -82,32 +89,40 @@ public class Geary.Sqlite.Account : Geary.AbstractAccount, Geary.LocalAccount { // properties *must* be available assert(imap_folder_properties != null); - int64 parent_id = yield fetch_parent_id_async(folder.get_path(), cancellable); - - FolderRow? row = yield folder_table.fetch_async(parent_id, folder.get_path().basename, + Transaction transaction = yield db.begin_transaction_async("Account.update_folder_async", cancellable); + + int64 parent_id = yield fetch_parent_id_async(transaction, folder.get_path(), cancellable); + + FolderRow? row = yield folder_table.fetch_async(transaction, parent_id, + folder.get_path().basename, cancellable); if (row == null) throw new EngineError.NOT_FOUND("Can't find in local store %s", folder.get_path().to_string()); - yield folder_properties_table.update_async(row.id, + yield folder_properties_table.update_async(transaction, row.id, new ImapFolderPropertiesRow.from_imap_properties(folder_properties_table, row.id, - imap_folder_properties)); + imap_folder_properties), cancellable); FolderReference? folder_ref = folder_refs.get(folder.get_path()); if (folder_ref != null) ((Geary.Sqlite.Folder) folder_ref.get_reference()).update_properties(imap_folder_properties); + + yield transaction.commit_async(cancellable); } public override async Gee.Collection list_folders_async(Geary.FolderPath? parent, Cancellable? cancellable = null) throws Error { + Transaction transaction = yield db.begin_transaction_async("Account.list_folders_async", + cancellable); + int64 parent_id = (parent != null) - ? yield fetch_id_async(parent, cancellable) + ? yield fetch_id_async(transaction, parent, cancellable) : Row.INVALID_ID; if (parent != null) assert(parent_id != Row.INVALID_ID); - Gee.List rows = yield folder_table.list_async(parent_id, cancellable); + Gee.List rows = yield folder_table.list_async(transaction, parent_id, cancellable); if (rows.size == 0) { throw new EngineError.NOT_FOUND("No local folders in %s", (parent != null) ? parent.get_fullpath() : "root"); @@ -115,8 +130,8 @@ public class Geary.Sqlite.Account : Geary.AbstractAccount, Geary.LocalAccount { Gee.Collection folders = new Gee.ArrayList(); foreach (FolderRow row in rows) { - ImapFolderPropertiesRow? properties = yield folder_properties_table.fetch_async(row.id, - cancellable); + ImapFolderPropertiesRow? properties = yield folder_properties_table.fetch_async( + transaction, row.id, cancellable); Geary.FolderPath path = (parent != null) ? parent.get_child(row.name) @@ -136,7 +151,7 @@ public class Geary.Sqlite.Account : Geary.AbstractAccount, Geary.LocalAccount { public override async bool folder_exists_async(Geary.FolderPath path, Cancellable? cancellable = null) throws Error { try { - int64 id = yield fetch_id_async(path, cancellable); + int64 id = yield fetch_id_async(null, path, cancellable); return (id != Row.INVALID_ID); } catch (EngineError err) { @@ -154,14 +169,18 @@ public class Geary.Sqlite.Account : Geary.AbstractAccount, Geary.LocalAccount { if (folder != null) return folder; + Transaction transaction = yield db.begin_transaction_async("Account.fetch_folder_async", + cancellable); + // locate in database - FolderRow? row = yield folder_table.fetch_descend_async(path.as_list(), cancellable); + FolderRow? row = yield folder_table.fetch_descend_async(transaction, path.as_list(), + cancellable); if (row == null) throw new EngineError.NOT_FOUND("%s not found in local database", path.to_string()); // fetch it's IMAP-specific properties - ImapFolderPropertiesRow? properties = yield folder_properties_table.fetch_async(row.id, - cancellable); + ImapFolderPropertiesRow? properties = yield folder_properties_table.fetch_async( + transaction, row.id, cancellable); return create_sqlite_folder(row, (properties != null) ? properties.get_imap_folder_properties() : null, path); @@ -169,7 +188,7 @@ public class Geary.Sqlite.Account : Geary.AbstractAccount, Geary.LocalAccount { public async bool has_message_id_async(Geary.RFC822.MessageID message_id, out int count, Cancellable? cancellable = null) throws Error { - count = yield message_table.search_message_id_count_async(message_id); + count = yield message_table.search_message_id_count_async(null, message_id, cancellable); return (count > 0); } diff --git a/src/engine/sqlite/api/sqlite-folder.vala b/src/engine/sqlite/api/sqlite-folder.vala index 398d790e..5aaf7dac 100644 --- a/src/engine/sqlite/api/sqlite-folder.vala +++ b/src/engine/sqlite/api/sqlite-folder.vala @@ -76,26 +76,33 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear check_open(); // TODO: This can be cached and updated when changes occur - return yield location_table.fetch_count_for_folder_async(folder_row.id, cancellable); + return yield location_table.fetch_count_for_folder_async(null, folder_row.id, cancellable); } public override async void create_email_async(Geary.Email email, Cancellable? cancellable = null) throws Error { + yield atomic_create_email_async(null, email, cancellable); + } + + private async void atomic_create_email_async(Transaction? supplied_transaction, Geary.Email email, + Cancellable? cancellable) throws Error { check_open(); Geary.Imap.EmailIdentifier id = (Geary.Imap.EmailIdentifier) email.id; + Transaction transaction = supplied_transaction ?? yield db.begin_transaction_async( + "Folder.atomic_create_email_async", cancellable); + // See if it already exists; first by UID (which is only guaranteed to be unique in a folder, // not account-wide) int64 message_id; - if (yield location_table.does_ordering_exist_async(folder_row.id, email.location.ordering, - out message_id, cancellable)) { + if (yield location_table.does_ordering_exist_async(transaction, folder_row.id, + email.location.ordering, out message_id, cancellable)) { throw new EngineError.ALREADY_EXISTS("Email with UID %s already exists in %s", id.uid.to_string(), to_string()); } - // TODO: The following steps should be atomic - message_id = yield message_table.create_async( + message_id = yield message_table.create_async(transaction, new MessageRow.from_email(message_table, email), cancellable); @@ -103,7 +110,7 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear // (which fulfills the requirements for the ordering column) MessageLocationRow location_row = new MessageLocationRow(location_table, Row.INVALID_ID, message_id, folder_row.id, email.location.ordering, email.location.position); - yield location_table.create_async(location_row, cancellable); + yield location_table.create_async(transaction, location_row, cancellable); // only write out the IMAP email properties if they're supplied and there's something to // write out -- no need to create an empty row @@ -111,10 +118,17 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear if (email.fields.fulfills(Geary.Email.Field.PROPERTIES) && properties != null) { ImapMessagePropertiesRow properties_row = new ImapMessagePropertiesRow.from_imap_properties( imap_message_properties_table, message_id, properties); - yield imap_message_properties_table.create_async(properties_row, cancellable); + yield imap_message_properties_table.create_async(transaction, properties_row, cancellable); } - notify_messages_appended(yield get_email_count_async(cancellable)); + int count = yield location_table.fetch_count_for_folder_async(transaction, folder_row.id, + cancellable); + + // only commit if not supplied a transaction + if (supplied_transaction == null) + yield transaction.commit_async(cancellable); + + notify_messages_appended(count); } public override async Gee.List? list_email_async(int low, int count, @@ -127,10 +141,13 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear if (count == 0) return null; - Gee.List? list = yield location_table.list_async(folder_row.id, low, - count, cancellable); + Transaction transaction = yield db.begin_transaction_async("Folder.list_email_async", + cancellable); - return yield list_email(list, required_fields, cancellable); + Gee.List? list = yield location_table.list_async(transaction, + folder_row.id, low, count, cancellable); + + return yield list_email(transaction, list, required_fields, cancellable); } public override async Gee.List? list_email_sparse_async(int[] by_position, @@ -138,24 +155,32 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear throws Error { check_open(); - Gee.List? list = yield location_table.list_sparse_async(folder_row.id, - by_position, cancellable); + Transaction transaction = yield db.begin_transaction_async("Folder.list_email_sparse_async", + cancellable); - return yield list_email(list, required_fields, cancellable); + Gee.List? list = yield location_table.list_sparse_async(transaction, + folder_row.id, by_position, cancellable); + + return yield list_email(transaction, list, required_fields, cancellable); } public async Gee.List? list_email_uid_async(Geary.Imap.UID? low, Geary.Imap.UID? high, Geary.Email.Field required_fields, Cancellable? cancellable = null) throws Error { check_open(); - Gee.List? list = yield location_table.list_ordering_async(folder_row.id, - (low != null) ? low.value : 1, (high != null) ? high.value : -1, cancellable); + Transaction transaction = yield db.begin_transaction_async("Folder.list_email_uid_async", + cancellable); - return yield list_email(list, required_fields, cancellable); + Gee.List? list = yield location_table.list_ordering_async(transaction, + folder_row.id,(low != null) ? low.value : 1, (high != null) ? high.value : -1, + cancellable); + + return yield list_email(transaction, list, required_fields, cancellable); } - private async Gee.List? list_email(Gee.List? list, - Geary.Email.Field required_fields, Cancellable? cancellable) throws Error { + private async Gee.List? list_email(Transaction transaction, + Gee.List? list, Geary.Email.Field required_fields, Cancellable? cancellable) + throws Error { check_open(); if (list == null || list.size == 0) @@ -167,8 +192,8 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear Gee.List emails = new Gee.ArrayList(); foreach (MessageLocationRow location_row in list) { // fetch the message itself - MessageRow? message_row = yield message_table.fetch_async(location_row.message_id, - required_fields, cancellable); + MessageRow? message_row = yield message_table.fetch_async(transaction, + location_row.message_id, required_fields, cancellable); assert(message_row != null); // only add to the list if the email contains all the required fields (because @@ -178,14 +203,14 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear ImapMessagePropertiesRow? properties = null; if (required_fields.require(Geary.Email.Field.PROPERTIES)) { - properties = yield imap_message_properties_table.fetch_async(location_row.message_id, - cancellable); + properties = yield imap_message_properties_table.fetch_async(transaction, + location_row.message_id, cancellable); if (properties == null) continue; } Geary.Imap.UID uid = new Geary.Imap.UID(location_row.ordering); - int position = yield location_row.get_position_async(cancellable); + int position = yield location_row.get_position_async(transaction, cancellable); if (position == -1) { debug("Unable to locate position of email during list of %s, dropping", to_string()); @@ -210,15 +235,18 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear Geary.Imap.UID uid = ((Imap.EmailIdentifier) id).uid; - MessageLocationRow? location_row = yield location_table.fetch_by_ordering_async(folder_row.id, - uid.value, cancellable); + Transaction transaction = yield db.begin_transaction_async("Folder.fetch_email_async", + cancellable); + + MessageLocationRow? location_row = yield location_table.fetch_by_ordering_async(transaction, + folder_row.id, uid.value, cancellable); if (location_row == null) { throw new EngineError.NOT_FOUND("No message with ID %s in folder %s", id.to_string(), to_string()); } - MessageRow? message_row = yield message_table.fetch_async(location_row.message_id, - required_fields, cancellable); + MessageRow? message_row = yield message_table.fetch_async(transaction, + location_row.message_id, required_fields, cancellable); if (message_row == null) { throw new EngineError.NOT_FOUND("No message with ID %s in folder %s", id.to_string(), to_string()); @@ -234,8 +262,8 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear ImapMessagePropertiesRow? properties = null; if (required_fields.require(Geary.Email.Field.PROPERTIES)) { - properties = yield imap_message_properties_table.fetch_async(location_row.message_id, - cancellable); + properties = yield imap_message_properties_table.fetch_async(transaction, + location_row.message_id, cancellable); if (properties == null) { throw new EngineError.INCOMPLETE_MESSAGE( "Message %s in folder %s does not have PROPERTIES field", id.to_string(), @@ -243,7 +271,7 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear } } - int position = yield location_row.get_position_async(cancellable); + int position = yield location_row.get_position_async(transaction, cancellable); if (position == -1) { throw new EngineError.NOT_FOUND("Unable to determine position of email %s in %s", id.to_string(), to_string()); @@ -259,7 +287,8 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear public async Geary.Imap.UID? get_earliest_uid_async(Cancellable? cancellable = null) throws Error { check_open(); - int64 ordering = yield location_table.get_earliest_ordering_async(folder_row.id, cancellable); + int64 ordering = yield location_table.get_earliest_ordering_async(null, folder_row.id, + cancellable); return (ordering >= 1) ? new Geary.Imap.UID(ordering) : null; } @@ -268,19 +297,27 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear throws Error { check_open(); + Transaction transaction = yield db.begin_transaction_async("Folder.remove_email_async", + cancellable); + // TODO: Right now, deleting an email is merely detaching its association with a folder // (since it may be located in multiple folders). This means at some point in the future // a vacuum will be required to remove emails that are completely unassociated with the // account - if (!yield location_table.remove_by_position_async(folder_row.id, position, cancellable)) { + if (!yield location_table.remove_by_position_async(transaction, folder_row.id, position, cancellable)) { throw new EngineError.NOT_FOUND("Message #%d in local store of %s not found", position, to_string()); } - notify_message_removed(position, yield get_email_count_async(cancellable)); + int count = yield location_table.fetch_count_for_folder_async(transaction, folder_row.id, + cancellable); + + yield transaction.commit_async(cancellable); + + notify_message_removed(position, count); } - public async bool is_email_present(Geary.EmailIdentifier id, out Geary.Email.Field available_fields, + public async bool is_email_present_async(Geary.EmailIdentifier id, out Geary.Email.Field available_fields, Cancellable? cancellable = null) throws Error { check_open(); @@ -288,13 +325,16 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear available_fields = Geary.Email.Field.NONE; - MessageLocationRow? location_row = yield location_table.fetch_by_ordering_async(folder_row.id, - uid.value, cancellable); + Transaction transaction = yield db.begin_transaction_async("Folder.is_email_present", + cancellable); + + MessageLocationRow? location_row = yield location_table.fetch_by_ordering_async(transaction, + folder_row.id, uid.value, cancellable); if (location_row == null) return false; - return yield message_table.fetch_fields_async(location_row.message_id, out available_fields, - cancellable); + return yield message_table.fetch_fields_async(transaction, location_row.message_id, + out available_fields, cancellable); } public async bool is_email_associated_async(Geary.Email email, Cancellable? cancellable = null) @@ -302,7 +342,7 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear check_open(); int64 message_id; - return yield location_table.does_ordering_exist_async(folder_row.id, + return yield location_table.does_ordering_exist_async(null, folder_row.id, ((Geary.Imap.EmailIdentifier) email.id).uid.value, out message_id, cancellable); } @@ -313,10 +353,13 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear Geary.Imap.EmailLocation location = (Geary.Imap.EmailLocation) email.location; Geary.Imap.EmailIdentifier id = (Geary.Imap.EmailIdentifier) email.id; + Transaction transaction = yield db.begin_transaction_async("Folder.update_email_async", + cancellable); + // See if the message can be identified in the folder (which both reveals association and // a message_id that can be used for a merge; note that this works without a Message-ID) int64 message_id; - bool associated = yield location_table.does_ordering_exist_async(folder_row.id, + bool associated = yield location_table.does_ordering_exist_async(transaction, folder_row.id, id.uid.value, out message_id, cancellable); // If working around the lack of a Message-ID and not associated with this folder, treat @@ -327,29 +370,35 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear if (!duplicate_okay) throw new EngineError.INCOMPLETE_MESSAGE("No Message-ID"); - yield create_email_async(email, cancellable); + yield atomic_create_email_async(transaction, email, cancellable); } else { - yield merge_email_async(message_id, email, cancellable); + yield merge_email_async(transaction, message_id, email, cancellable); } + yield transaction.commit_if_required_async(cancellable); + return; } // If not associated, find message with matching Message-ID if (!associated) { - Gee.List? list = yield message_table.search_message_id_async(email.message_id, - cancellable); + Gee.List? list = yield message_table.search_message_id_async(transaction, + email.message_id, cancellable); // If none found, this operation is a create if (list == null || list.size == 0) { - yield create_email_async(email, cancellable); + yield atomic_create_email_async(transaction, email, cancellable); + + yield transaction.commit_if_required_async(cancellable); return; } // Too many found turns this operation into a create if (list.size != 1) { - yield create_email_async(email, cancellable); + yield atomic_create_email_async(transaction, email, cancellable); + + yield transaction.commit_if_required_async(cancellable); return; } @@ -361,8 +410,8 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear // TODO: Need to lock the database during this operation, as these steps should be atomic. if (!associated) { // see if an email exists at this position - MessageLocationRow? location_row = yield location_table.fetch_async(folder_row.id, - location.position); + MessageLocationRow? location_row = yield location_table.fetch_async(transaction, + folder_row.id, location.position, cancellable); if (location_row != null) { throw new EngineError.ALREADY_EXISTS("Email already exists at position %d in %s", email.location.position, to_string()); @@ -371,17 +420,18 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear // insert email at supplied position location_row = new MessageLocationRow(location_table, Row.INVALID_ID, message_id, folder_row.id, id.uid.value, location.position); - yield location_table.create_async(location_row, cancellable); + yield location_table.create_async(transaction, location_row, cancellable); } // Merge any new information with the existing message in the local store - yield merge_email_async(message_id, email, cancellable); + yield merge_email_async(transaction, message_id, email, cancellable); + + yield transaction.commit_if_required_async(cancellable); // Done. } - // TODO: The database should be locked around this method, as it should be atomic. - private async void merge_email_async(int64 message_id, Geary.Email email, + private async void merge_email_async(Transaction transaction, int64 message_id, Geary.Email email, Cancellable? cancellable = null) throws Error { assert(message_id != Row.INVALID_ID); @@ -389,7 +439,7 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear if (email.fields == Geary.Email.Field.NONE) return; - MessageRow? message_row = yield message_table.fetch_async(message_id, email.fields, + MessageRow? message_row = yield message_table.fetch_async(transaction, message_id, email.fields, cancellable); assert(message_row != null); @@ -397,7 +447,7 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear // possible nothing has changed or been added if (message_row.fields != Geary.Email.Field.NONE) - yield message_table.merge_async(message_row, cancellable); + yield message_table.merge_async(transaction, message_row, cancellable); // update IMAP properties if (email.fields.fulfills(Geary.Email.Field.PROPERTIES)) { @@ -407,8 +457,8 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear long rfc822_size = (properties.rfc822_size != null) ? properties.rfc822_size.value : -1; - yield imap_message_properties_table.update_async(message_id, properties.flags.serialize(), - internaldate, rfc822_size, cancellable); + yield imap_message_properties_table.update_async(transaction, message_id, + properties.flags.serialize(), internaldate, rfc822_size, cancellable); } } } diff --git a/src/engine/sqlite/email/sqlite-folder-table.vala b/src/engine/sqlite/email/sqlite-folder-table.vala index 09e92012..21166020 100644 --- a/src/engine/sqlite/email/sqlite-folder-table.vala +++ b/src/engine/sqlite/email/sqlite-folder-table.vala @@ -16,38 +16,38 @@ public class Geary.Sqlite.FolderTable : Geary.Sqlite.Table { base (gdb, table); } - private SQLHeavy.Query create_query(SQLHeavy.Queryable? queryable = null) throws SQLHeavy.Error { - SQLHeavy.Queryable q = queryable ?? db; - SQLHeavy.Query query = q.prepare( - "INSERT INTO FolderTable (name, parent_id) VALUES (?, ?)"); + public async int64 create_async(Transaction? transaction, FolderRow row, + Cancellable? cancellable) throws Error { + Transaction locked = yield obtain_lock_async(transaction, "FolderTable.create_async", + cancellable); - return query; - } - - private void create_binding(SQLHeavy.Query query, FolderRow row) throws SQLHeavy.Error { - query.clear(); + SQLHeavy.Query query = locked.prepare( + "INSERT INTO FolderTable (name, parent_id) VALUES (?, ?)"); query.bind_string(0, row.name); if (row.parent_id != Row.INVALID_ID) query.bind_int64(1, row.parent_id); else query.bind_null(1); - } - - public async int64 create_async(FolderRow row, Cancellable? cancellable = null) throws Error { - SQLHeavy.Query query = create_query(); - create_binding(query, row); - return yield query.execute_insert_async(cancellable); + int64 id = yield query.execute_insert_async(cancellable); + locked.set_commit_required(); + + yield release_lock_async(transaction, locked, cancellable); + + return id; } - public async Gee.List list_async(int64 parent_id, Cancellable? cancellable = null) - throws Error { + public async Gee.List list_async(Transaction? transaction, int64 parent_id, + Cancellable? cancellable) throws Error { + Transaction locked = yield obtain_lock_async(transaction, "FolderTable.list_async", + cancellable); + SQLHeavy.Query query; if (parent_id != Row.INVALID_ID) { - query = db.prepare("SELECT * FROM FolderTable WHERE parent_id=?"); + query = locked.prepare("SELECT * FROM FolderTable WHERE parent_id=?"); query.bind_int64(0, parent_id); } else { - query = db.prepare("SELECT * FROM FolderTable WHERE parent_id IS NULL"); + query = locked.prepare("SELECT * FROM FolderTable WHERE parent_id IS NULL"); } SQLHeavy.QueryResult result = yield query.execute_async(cancellable); @@ -62,15 +62,18 @@ public class Geary.Sqlite.FolderTable : Geary.Sqlite.Table { return rows; } - public async FolderRow? fetch_async(int64 parent_id, string name, Cancellable? cancellable = null) - throws Error { + public async FolderRow? fetch_async(Transaction? transaction, int64 parent_id, + string name, Cancellable? cancellable) throws Error { + Transaction locked = yield obtain_lock_async(transaction, "FolderTable.fetch_async", + cancellable); + SQLHeavy.Query query; if (parent_id != Row.INVALID_ID) { - query = db.prepare("SELECT * FROM FolderTable WHERE parent_id=? AND name=?"); + query = locked.prepare("SELECT * FROM FolderTable WHERE parent_id=? AND name=?"); query.bind_int64(0, parent_id); query.bind_string(1, name); } else { - query = db.prepare("SELECT * FROM FolderTable WHERE name=? AND parent_id IS NULL"); + query = locked.prepare("SELECT * FROM FolderTable WHERE name=? AND parent_id IS NULL"); query.bind_string(0, name); } @@ -79,10 +82,13 @@ public class Geary.Sqlite.FolderTable : Geary.Sqlite.Table { return (!result.finished) ? new FolderRow.from_query_result(this, result) : null; } - public async FolderRow? fetch_descend_async(Gee.List path, Cancellable? cancellable = null) - throws Error { + public async FolderRow? fetch_descend_async(Transaction? transaction, + Gee.List path, Cancellable? cancellable) throws Error { assert(path.size > 0); + Transaction locked = yield obtain_lock_async(transaction, "FolderTable.fetch_descend_async", + cancellable); + int64 parent_id = Row.INVALID_ID; // walk the folder tree to the final node (which is at length - 1 - 1) @@ -90,11 +96,13 @@ public class Geary.Sqlite.FolderTable : Geary.Sqlite.Table { for (int ctr = 0; ctr < length - 1; ctr++) { SQLHeavy.Query query; if (parent_id != Row.INVALID_ID) { - query = db.prepare("SELECT id FROM FolderTable WHERE parent_id=? AND name=?"); + query = locked.prepare( + "SELECT id FROM FolderTable WHERE parent_id=? AND name=?"); query.bind_int64(0, parent_id); query.bind_string(1, path[ctr]); } else { - query = db.prepare("SELECT id FROM FolderTable WHERE parent_id IS NULL AND name=?"); + query = locked.prepare( + "SELECT id FROM FolderTable WHERE parent_id IS NULL AND name=?"); query.bind_string(0, path[ctr]); } @@ -117,7 +125,7 @@ public class Geary.Sqlite.FolderTable : Geary.Sqlite.Table { } // do full fetch on this folder - return yield fetch_async(parent_id, path.last(), cancellable); + return yield fetch_async(locked, parent_id, path.last(), cancellable); } } diff --git a/src/engine/sqlite/email/sqlite-message-location-row.vala b/src/engine/sqlite/email/sqlite-message-location-row.vala index 07aca2e8..c6a16c80 100644 --- a/src/engine/sqlite/email/sqlite-message-location-row.vala +++ b/src/engine/sqlite/email/sqlite-message-location-row.vala @@ -44,11 +44,12 @@ public class Geary.Sqlite.MessageLocationRow : Geary.Sqlite.Row { * If the call ever returns a position of -1, that indicates the message does not exist in the * database. */ - public async int get_position_async(Cancellable? cancellable = null) throws Error { + public async int get_position_async(Transaction? transaction, Cancellable? cancellable) + throws Error { if (position >= 1) return position; - position = yield ((MessageLocationTable) table).fetch_position_async(id, folder_id, + position = yield ((MessageLocationTable) table).fetch_position_async(transaction, id, folder_id, cancellable); return (position >= 1) ? position : -1; diff --git a/src/engine/sqlite/email/sqlite-message-location-table.vala b/src/engine/sqlite/email/sqlite-message-location-table.vala index 262f9878..80adb259 100644 --- a/src/engine/sqlite/email/sqlite-message-location-table.vala +++ b/src/engine/sqlite/email/sqlite-message-location-table.vala @@ -17,28 +17,39 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table { base (db, table); } - public async int64 create_async(MessageLocationRow row, Cancellable? cancellable = null) - throws Error { - SQLHeavy.Query query = db.prepare( + public async int64 create_async(Transaction? transaction, MessageLocationRow row, + Cancellable? cancellable) throws Error { + Transaction locked = yield obtain_lock_async(transaction, "MessageLocationTable.create_async", + cancellable); + + SQLHeavy.Query query = locked.prepare( "INSERT INTO MessageLocationTable (message_id, folder_id, ordering) VALUES (?, ?, ?)"); query.bind_int64(0, row.message_id); query.bind_int64(1, row.folder_id); query.bind_int64(2, row.ordering); - return yield query.execute_insert_async(cancellable); + int64 id = yield query.execute_insert_async(cancellable); + locked.set_commit_required(); + + yield release_lock_async(transaction, locked, cancellable); + + return id; } /** * low is one-based. If count is -1, all messages starting at low are returned. */ - public async Gee.List? list_async(int64 folder_id, int low, int count, - Cancellable? cancellable = null) throws Error { + public async Gee.List? list_async(Transaction? transaction, + int64 folder_id, int low, int count, Cancellable? cancellable) throws Error { assert(low >= 1); assert(count >= 0 || count == -1); + Transaction locked = yield obtain_lock_async(transaction, "MessageLocationTable.list_async", + cancellable); + SQLHeavy.Query query; if (count >= 0) { - query = db.prepare( + query = locked.prepare( "SELECT id, message_id, ordering FROM MessageLocationTable WHERE folder_id = ? " + "ORDER BY ordering LIMIT ? OFFSET ?"); query.bind_int64(0, folder_id); @@ -46,7 +57,7 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table { query.bind_int(2, low - 1); } else { // count == -1 - query = db.prepare( + query = locked.prepare( "SELECT id, message_id, ordering FROM MessageLocationTable WHERE folder_id = ? " + "ORDER BY ordering OFFSET ?"); query.bind_int64(0, folder_id); @@ -72,10 +83,13 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table { /** * All positions are one-based. */ - public async Gee.List? list_sparse_async(int64 folder_id, int[] by_position, - Cancellable? cancellable = null) throws Error { + public async Gee.List? list_sparse_async(Transaction? transaction, + int64 folder_id, int[] by_position, Cancellable? cancellable) throws Error { + Transaction locked = yield obtain_lock_async(transaction, "MessageLocationTable.list_sparse_async", + cancellable); + // reuse the query for each iteration - SQLHeavy.Query query = db.prepare( + SQLHeavy.Query query = locked.prepare( "SELECT id, message_id, ordering FROM MessageLocationTable WHERE folder_id = ? " + "ORDER BY ordering LIMIT 1 OFFSET ?"); @@ -99,28 +113,32 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table { return (list.size > 0) ? list : null; } - public async Gee.List? list_ordering_async(int64 folder_id, int64 low_ordering, - int64 high_ordering, Cancellable? cancellable = null) throws Error { + public async Gee.List? list_ordering_async(Transaction? transaction, + int64 folder_id, int64 low_ordering, int64 high_ordering, Cancellable? cancellable) + throws Error { + Transaction locked = yield obtain_lock_async(transaction, "MessageLocationTable.list_ordering_async", + cancellable); + assert(low_ordering >= 0 || low_ordering == -1); assert(high_ordering >= 0 || high_ordering == -1); SQLHeavy.Query query; if (high_ordering != -1 && low_ordering != -1) { - query = db.prepare( + query = locked.prepare( "SELECT id, message_id, ordering FROM MessageLocationTable WHERE folder_id = ? " + "AND ordering >= ? AND ordering <= ? ORDER BY ordering ASC"); query.bind_int64(0, folder_id); query.bind_int64(1, low_ordering); query.bind_int64(2, high_ordering); } else if (high_ordering == -1) { - query = db.prepare( + query = locked.prepare( "SELECT id, message_id, ordering FROM MessageLocationTable WHERE folder_id = ? " + "AND ordering >= ? ORDER BY ordering ASC"); query.bind_int64(0, folder_id); query.bind_int64(1, low_ordering); } else { assert(low_ordering == -1); - query = db.prepare( + query = locked.prepare( "SELECT id, message_id, ordering FROM MessageLocationTable WHERE folder_id = ? " + "AND ordering <= ? ORDER BY ordering ASC"); query.bind_int64(0, folder_id); @@ -145,11 +163,14 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table { /** * position is one-based. */ - public async MessageLocationRow? fetch_async(int64 folder_id, int position, - Cancellable? cancellable = null) throws Error { + public async MessageLocationRow? fetch_async(Transaction transaction, int64 folder_id, + int position, Cancellable? cancellable) throws Error { assert(position >= 1); - SQLHeavy.Query query = db.prepare( + Transaction locked = yield obtain_lock_async(transaction, "MessageLocationTable.fetch_async", + cancellable); + + SQLHeavy.Query query = locked.prepare( "SELECT id, message_id, ordering FROM MessageLocationTable WHERE folder_id = ? " + "ORDER BY ordering LIMIT 1 OFFSET ?"); query.bind_int64(0, folder_id); @@ -163,9 +184,12 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table { results.fetch_int64(2), position); } - public async MessageLocationRow? fetch_by_ordering_async(int64 folder_id, int64 ordering, - Cancellable? cancellable = null) throws Error { - SQLHeavy.Query query = db.prepare( + public async MessageLocationRow? fetch_by_ordering_async(Transaction? transaction, + int64 folder_id, int64 ordering, Cancellable? cancellable) throws Error { + Transaction locked = yield obtain_lock_async(transaction, "MessageLocationTable.fetch_by_ordering_async", + cancellable); + + SQLHeavy.Query query = locked.prepare( "SELECT id, message_id FROM MessageLocationTable WHERE folder_id = ? AND ordering = ? "); query.bind_int64(0, folder_id); query.bind_int64(1, ordering); @@ -178,9 +202,12 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table { folder_id, ordering, -1); } - public async int fetch_position_async(int64 id, int64 folder_id, Cancellable? cancellable = null) - throws Error { - SQLHeavy.Query query = db.prepare( + public async int fetch_position_async(Transaction? transaction, int64 id, + int64 folder_id, Cancellable? cancellable) throws Error { + Transaction locked = yield obtain_lock_async(transaction, "MessageLocationTable.fetch_position_async", + cancellable); + + SQLHeavy.Query query = locked.prepare( "SELECT id FROM MessageLocationTable WHERE folder_id = ? ORDER BY ordering"); query.bind_int64(0, folder_id); @@ -199,9 +226,12 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table { return -1; } - public async int fetch_count_for_folder_async(int64 folder_id, Cancellable? cancellable = null) - throws Error { - SQLHeavy.Query query = db.prepare( + public async int fetch_count_for_folder_async(Transaction? transaction, + int64 folder_id, Cancellable? cancellable) throws Error { + Transaction locked = yield obtain_lock_async(transaction, + "MessageLocationTable.fetch_count_for_folder_async", cancellable); + + SQLHeavy.Query query = locked.prepare( "SELECT COUNT(*) FROM MessageLocationTable WHERE folder_id = ?"); query.bind_int64(0, folder_id); @@ -213,11 +243,14 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table { /** * Find a row based on its ordering value in the folder. */ - public async bool does_ordering_exist_async(int64 folder_id, int64 ordering, - out int64 message_id, Cancellable? cancellable = null) throws Error { + public async bool does_ordering_exist_async(Transaction? transaction, int64 folder_id, + int64 ordering, out int64 message_id, Cancellable? cancellable) throws Error { message_id = Row.INVALID_ID; - SQLHeavy.Query query = db.prepare( + Transaction locked = yield obtain_lock_async(transaction, + "MessageLocationTable.does_ordering_exist_async", cancellable); + + SQLHeavy.Query query = locked.prepare( "SELECT message_id FROM MessageLocationTable WHERE folder_id = ? AND ordering = ?"); query.bind_int64(0, folder_id); query.bind_int64(1, ordering); @@ -231,9 +264,12 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table { return true; } - public async int64 get_earliest_ordering_async(int64 folder_id, Cancellable? cancellable = null) - throws Error { - SQLHeavy.Query query = db.prepare( + public async int64 get_earliest_ordering_async(Transaction? transaction, int64 folder_id, + Cancellable? cancellable) throws Error { + Transaction locked = yield obtain_lock_async(transaction, + "MessageLocationTable.get_earliest_ordering_async", cancellable); + + SQLHeavy.Query query = locked.prepare( "SELECT MIN(ordering) FROM MessageLocationTable WHERE folder_id = ?"); query.bind_int64(0, folder_id); @@ -242,13 +278,14 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table { return (!result.finished) ? result.fetch_int64(0) : -1; } - public async bool remove_by_position_async(int64 folder_id, int position, - Cancellable? cancellable = null) throws Error { + public async bool remove_by_position_async(Transaction? transaction, int64 folder_id, + int position, Cancellable? cancellable) throws Error { assert(position >= 1); - SQLHeavy.Transaction transaction = db.begin_transaction(); + Transaction locked = yield obtain_lock_async(transaction, + "MessageLocationTable.remove_by_position_async", cancellable); - SQLHeavy.Query query = transaction.prepare( + SQLHeavy.Query query = locked.prepare( "SELECT id FROM MessageLocationTable WHERE folder_id = ? ORDER BY ordering LIMIT 1 OFFSET ?"); query.bind_int64(0, folder_id); query.bind_int(1, position - 1); @@ -257,13 +294,18 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table { if (results.finished) return false; - query = transaction.prepare( + query = locked.prepare( "DELETE FROM MessageLocationTable WHERE id = ?"); query.bind_int64(0, results.fetch_int(0)); yield query.execute_async(cancellable); + locked.set_commit_required(); - yield transaction.commit_async(); + // only commit if performing our own transaction + if (transaction == null) + yield locked.commit_async(cancellable); + + yield release_lock_async(transaction, locked, cancellable); return true; } diff --git a/src/engine/sqlite/email/sqlite-message-table.vala b/src/engine/sqlite/email/sqlite-message-table.vala index dfbab4bc..d3b073eb 100644 --- a/src/engine/sqlite/email/sqlite-message-table.vala +++ b/src/engine/sqlite/email/sqlite-message-table.vala @@ -35,8 +35,12 @@ public class Geary.Sqlite.MessageTable : Geary.Sqlite.Table { base (gdb, table); } - public async int64 create_async(MessageRow row, Cancellable? cancellable) throws Error { - SQLHeavy.Query query = db.prepare( + public async int64 create_async(Transaction? transaction, MessageRow row, + Cancellable? cancellable) throws Error { + Transaction locked = yield obtain_lock_async(transaction, "MessageTable.create_async", + cancellable); + + SQLHeavy.Query query = locked.prepare( "INSERT INTO MessageTable " + "(fields, date_field, date_time_t, from_field, sender, reply_to, to_field, cc, bcc, " + "message_id, in_reply_to, subject, header, body) " @@ -56,22 +60,30 @@ public class Geary.Sqlite.MessageTable : Geary.Sqlite.Table { query.bind_string(12, row.header); query.bind_string(13, row.body); - return yield query.execute_insert_async(cancellable); + int64 id = yield query.execute_insert_async(cancellable); + locked.set_commit_required(); + + yield release_lock_async(transaction, locked, cancellable); + + return id; } - public async void merge_async(MessageRow row, Cancellable? cancellable = null) throws Error { - SQLHeavy.Transaction transaction = db.begin_transaction(); + public async void merge_async(Transaction? transaction, MessageRow row, + Cancellable? cancellable) throws Error { + Transaction locked = yield obtain_lock_async(transaction, "MessageTable.merge_async", + cancellable); // merge the valid fields in the row - SQLHeavy.Query query = transaction.prepare( + SQLHeavy.Query query = locked.prepare( "UPDATE MessageTable SET fields = fields | ? WHERE id=?"); query.bind_int(0, row.fields); query.bind_int64(1, row.id); yield query.execute_async(cancellable); + locked.set_commit_required(); if (row.fields.is_any_set(Geary.Email.Field.DATE)) { - query = transaction.prepare( + query = locked.prepare( "UPDATE MessageTable SET date_field=?, date_time_t=? WHERE id=?"); query.bind_string(0, row.date); query.bind_int64(1, row.date_time_t); @@ -81,7 +93,7 @@ public class Geary.Sqlite.MessageTable : Geary.Sqlite.Table { } if (row.fields.is_any_set(Geary.Email.Field.ORIGINATORS)) { - query = transaction.prepare( + query = locked.prepare( "UPDATE MessageTable SET from_field=?, sender=?, reply_to=? WHERE id=?"); query.bind_string(0, row.from); query.bind_string(1, row.sender); @@ -92,7 +104,7 @@ public class Geary.Sqlite.MessageTable : Geary.Sqlite.Table { } if (row.fields.is_any_set(Geary.Email.Field.RECEIVERS)) { - query = transaction.prepare( + query = locked.prepare( "UPDATE MessageTable SET to_field=?, cc=?, bcc=? WHERE id=?"); query.bind_string(0, row.to); query.bind_string(1, row.cc); @@ -103,7 +115,7 @@ public class Geary.Sqlite.MessageTable : Geary.Sqlite.Table { } if (row.fields.is_any_set(Geary.Email.Field.REFERENCES)) { - query = transaction.prepare( + query = locked.prepare( "UPDATE MessageTable SET message_id=?, in_reply_to=? WHERE id=?"); query.bind_string(0, row.message_id); query.bind_string(1, row.in_reply_to); @@ -113,7 +125,7 @@ public class Geary.Sqlite.MessageTable : Geary.Sqlite.Table { } if (row.fields.is_any_set(Geary.Email.Field.SUBJECT)) { - query = transaction.prepare( + query = locked.prepare( "UPDATE MessageTable SET subject=? WHERE id=?"); query.bind_string(0, row.subject); query.bind_int64(1, row.id); @@ -122,7 +134,7 @@ public class Geary.Sqlite.MessageTable : Geary.Sqlite.Table { } if (row.fields.is_any_set(Geary.Email.Field.HEADER)) { - query = transaction.prepare( + query = locked.prepare( "UPDATE MessageTable SET header=? WHERE id=?"); query.bind_string(0, row.header); query.bind_int64(1, row.id); @@ -131,7 +143,7 @@ public class Geary.Sqlite.MessageTable : Geary.Sqlite.Table { } if (row.fields.is_any_set(Geary.Email.Field.BODY)) { - query = transaction.prepare( + query = locked.prepare( "UPDATE MessageTable SET body=? WHERE id=?"); query.bind_string(0, row.body); query.bind_int64(1, row.id); @@ -139,14 +151,22 @@ public class Geary.Sqlite.MessageTable : Geary.Sqlite.Table { yield query.execute_async(cancellable); } - yield transaction.commit_async(); + // only commit if internally atomic + if (transaction == null) + yield locked.commit_async(cancellable); + + yield release_lock_async(transaction, locked, cancellable); } - public async Gee.List? list_by_message_id_async(Geary.RFC822.MessageID message_id, - Geary.Email.Field fields, Cancellable? cancellable) throws Error { + public async Gee.List? list_by_message_id_async(Transaction? transaction, + Geary.RFC822.MessageID message_id, Geary.Email.Field fields, Cancellable? cancellable) + throws Error { assert(fields != Geary.Email.Field.NONE); - SQLHeavy.Query query = db.prepare( + Transaction locked = yield obtain_lock_async(transaction, "MessageTable.list_by_message_id_async", + cancellable); + + SQLHeavy.Query query = locked.prepare( "SELECT %s FROM MessageTable WHERE message_id=?".printf(fields_to_columns(fields))); query.bind_string(0, message_id.value); @@ -163,11 +183,14 @@ public class Geary.Sqlite.MessageTable : Geary.Sqlite.Table { return (list.size > 0) ? list : null; } - public async MessageRow? fetch_async(int64 id, Geary.Email.Field requested_fields, - Cancellable? cancellable = null) throws Error { + public async MessageRow? fetch_async(Transaction? transaction, int64 id, + Geary.Email.Field requested_fields, Cancellable? cancellable = null) throws Error { assert(requested_fields != Geary.Email.Field.NONE); - SQLHeavy.Query query = db.prepare( + Transaction locked = yield obtain_lock_async(transaction, "MessageTable.fetch_async", + cancellable); + + SQLHeavy.Query query = locked.prepare( "SELECT %s FROM MessageTable WHERE id=?".printf(fields_to_columns(requested_fields))); query.bind_int64(0, id); @@ -180,11 +203,14 @@ public class Geary.Sqlite.MessageTable : Geary.Sqlite.Table { return row; } - public async bool fetch_fields_async(int64 id, out Geary.Email.Field available_fields, - Cancellable? cancellable = null) throws Error { + public async bool fetch_fields_async(Transaction? transaction, int64 id, + out Geary.Email.Field available_fields, Cancellable? cancellable) throws Error { available_fields = Geary.Email.Field.NONE; - SQLHeavy.Query query = db.prepare( + Transaction locked = yield obtain_lock_async(transaction, "MessageTable.fetch_fields_async", + cancellable); + + SQLHeavy.Query query = locked.prepare( "SELECT fields FROM MessageTable WHERE id=?"); query.bind_int64(0, id); @@ -244,9 +270,12 @@ public class Geary.Sqlite.MessageTable : Geary.Sqlite.Table { return builder.str; } - public async int search_message_id_count_async(Geary.RFC822.MessageID message_id, - Cancellable? cancellable = null) throws Error { - SQLHeavy.Query query = db.prepare( + public async int search_message_id_count_async(Transaction? transaction, + Geary.RFC822.MessageID message_id, Cancellable? cancellable) throws Error { + Transaction locked = yield obtain_lock_async(transaction, "MessageTable.search_message_id_count", + cancellable); + + SQLHeavy.Query query = locked.prepare( "SELECT COUNT(*) FROM MessageTable WHERE message_id=?"); query.bind_string(0, message_id.value); @@ -255,9 +284,12 @@ public class Geary.Sqlite.MessageTable : Geary.Sqlite.Table { return (result.finished) ? 0 : result.fetch_int(0); } - public async Gee.List? search_message_id_async(Geary.RFC822.MessageID message_id, - Cancellable? cancellable = null) throws Error { - SQLHeavy.Query query = db.prepare( + public async Gee.List? search_message_id_async(Transaction? transaction, + Geary.RFC822.MessageID message_id, Cancellable? cancellable) throws Error { + Transaction locked = yield obtain_lock_async(transaction, "MessageTable.search_message_id_async", + cancellable); + + SQLHeavy.Query query = locked.prepare( "SELECT id FROM MessageTable WHERE message_id=?"); query.bind_string(0, message_id.value); diff --git a/src/engine/sqlite/imap/sqlite-imap-folder-properties-table.vala b/src/engine/sqlite/imap/sqlite-imap-folder-properties-table.vala index 7da3a5dd..67a55b88 100644 --- a/src/engine/sqlite/imap/sqlite-imap-folder-properties-table.vala +++ b/src/engine/sqlite/imap/sqlite-imap-folder-properties-table.vala @@ -19,9 +19,12 @@ public class Geary.Sqlite.ImapFolderPropertiesTable : Geary.Sqlite.Table { base (gdb, table); } - public async int64 create_async(ImapFolderPropertiesRow row, Cancellable? cancellable = null) - throws Error { - SQLHeavy.Query query = db.prepare( + public async int64 create_async(Transaction? transaction, ImapFolderPropertiesRow row, + Cancellable? cancellable) throws Error { + Transaction locked = yield obtain_lock_async(transaction, "ImapFolderPropertiesTable.create_async", + cancellable); + + SQLHeavy.Query query = locked.prepare( "INSERT INTO ImapFolderPropertiesTable (folder_id, last_seen_total, uid_validity, uid_next, attributes) " + "VALUES (?, ?, ?, ?, ?)"); query.bind_int64(0, row.folder_id); @@ -30,12 +33,20 @@ public class Geary.Sqlite.ImapFolderPropertiesTable : Geary.Sqlite.Table { query.bind_int64(3, (row.uid_next != null) ? row.uid_next.value : -1); query.bind_string(4, row.attributes); - return yield query.execute_insert_async(cancellable); + int64 id = yield query.execute_insert_async(cancellable); + locked.set_commit_required(); + + yield release_lock_async(transaction, locked, cancellable); + + return id; } - public async void update_async(int64 folder_id, ImapFolderPropertiesRow row, - Cancellable? cancellable = null) throws Error { - SQLHeavy.Query query = db.prepare( + public async void update_async(Transaction? transaction, int64 folder_id, + ImapFolderPropertiesRow row, Cancellable? cancellable) throws Error { + Transaction locked = yield obtain_lock_async(transaction, "ImapFolderPropertiesTable.update_async", + cancellable); + + SQLHeavy.Query query = locked.prepare( "UPDATE ImapFolderPropertiesTable " + "SET last_seen_total = ?, uid_validity = ?, uid_next = ?, attributes = ? " + "WHERE folder_id = ?"); @@ -46,11 +57,17 @@ public class Geary.Sqlite.ImapFolderPropertiesTable : Geary.Sqlite.Table { query.bind_int64(4, folder_id); yield query.execute_async(cancellable); + locked.set_commit_required(); + + yield release_lock_async(transaction, locked, cancellable); } - public async ImapFolderPropertiesRow? fetch_async(int64 folder_id, Cancellable? cancellable = null) - throws Error { - SQLHeavy.Query query = db.prepare( + public async ImapFolderPropertiesRow? fetch_async(Transaction? transaction, + int64 folder_id, Cancellable? cancellable) throws Error { + Transaction locked = yield obtain_lock_async(transaction, "ImapFolderPropertiesTable.fetch_async", + cancellable); + + SQLHeavy.Query query = locked.prepare( "SELECT id, last_seen_total, uid_validity, uid_next, attributes " + "FROM ImapFolderPropertiesTable WHERE folder_id = ?"); query.bind_int64(0, folder_id); diff --git a/src/engine/sqlite/imap/sqlite-imap-message-properties-table.vala b/src/engine/sqlite/imap/sqlite-imap-message-properties-table.vala index 36b8506c..37489aca 100644 --- a/src/engine/sqlite/imap/sqlite-imap-message-properties-table.vala +++ b/src/engine/sqlite/imap/sqlite-imap-message-properties-table.vala @@ -18,9 +18,12 @@ public class Geary.Sqlite.ImapMessagePropertiesTable : Geary.Sqlite.Table { base (gdb, table); } - public async int64 create_async(ImapMessagePropertiesRow row, Cancellable? cancellable = null) - throws Error { - SQLHeavy.Query query = db.prepare( + public async int64 create_async(Transaction? transaction, ImapMessagePropertiesRow row, + Cancellable? cancellable) throws Error { + Transaction locked = yield obtain_lock_async(transaction, + "ImapMessagePropertiesTable.create_async", cancellable); + + SQLHeavy.Query query = locked.prepare( "INSERT INTO ImapMessagePropertiesTable (message_id, flags, internaldate, rfc822_size) " + "VALUES (?, ?, ?, ?)"); query.bind_int64(0, row.message_id); @@ -28,12 +31,20 @@ public class Geary.Sqlite.ImapMessagePropertiesTable : Geary.Sqlite.Table { query.bind_string(2, row.internaldate); query.bind_int64(3, row.rfc822_size); - return yield query.execute_insert_async(cancellable); + int64 id = yield query.execute_insert_async(cancellable); + locked.set_commit_required(); + + yield release_lock_async(transaction, locked, cancellable); + + return id; } - public async ImapMessagePropertiesRow? fetch_async(int64 message_id, Cancellable? cancellable = null) - throws Error { - SQLHeavy.Query query = db.prepare( + public async ImapMessagePropertiesRow? fetch_async(Transaction? transaction, int64 message_id, + Cancellable? cancellable) throws Error { + Transaction locked = yield obtain_lock_async(transaction, "ImapMessagePropertiesTable.fetch_async", + cancellable); + + SQLHeavy.Query query = locked.prepare( "SELECT id, flags internaldate, rfc822_size FROM ImapMessagePropertiesTable " + "WHERE message_id = ?"); query.bind_int64(0, message_id); @@ -46,10 +57,12 @@ public class Geary.Sqlite.ImapMessagePropertiesTable : Geary.Sqlite.Table { result.fetch_string(1), result.fetch_string(2), (long) result.fetch_int64(3)); } - public async void update_async(int64 message_id, string? flags, string? internaldate, long rfc822_size, - Cancellable? cancellable = null) - throws Error { - SQLHeavy.Query query = db.prepare( + public async void update_async(Transaction? transaction, int64 message_id, string? flags, + string? internaldate, long rfc822_size, Cancellable? cancellable) throws Error { + Transaction locked = yield obtain_lock_async(transaction, "ImapMessagePropertiesTable.update_async", + cancellable); + + SQLHeavy.Query query = locked.prepare( "UPDATE ImapMessagePropertiesTable SET flags = ?, internaldate = ?, rfc822_size = ? " + "WHERE message_id = ?"); query.bind_string(0, flags); @@ -58,6 +71,9 @@ public class Geary.Sqlite.ImapMessagePropertiesTable : Geary.Sqlite.Table { query.bind_int64(3, message_id); yield query.execute_async(cancellable); + locked.set_commit_required(); + + yield release_lock_async(transaction, locked, cancellable); } } diff --git a/src/wscript b/src/wscript index 2d477fbc..f27368ad 100644 --- a/src/wscript +++ b/src/wscript @@ -112,6 +112,7 @@ def build(bld): '../engine/sqlite/abstract/sqlite-database.vala', '../engine/sqlite/abstract/sqlite-row.vala', '../engine/sqlite/abstract/sqlite-table.vala', + '../engine/sqlite/abstract/sqlite-transaction.vala', '../engine/sqlite/api/sqlite-account.vala', '../engine/sqlite/api/sqlite-folder.vala', '../engine/sqlite/email/sqlite-folder-row.vala',