diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index b5903d52..bd05efc9 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -227,6 +227,7 @@ engine/memory/memory-unowned-string-buffer.vala engine/nonblocking/nonblocking-abstract-semaphore.vala engine/nonblocking/nonblocking-batch.vala +engine/nonblocking/nonblocking-concurrent.vala engine/nonblocking/nonblocking-counting-semaphore.vala engine/nonblocking/nonblocking-error.vala engine/nonblocking/nonblocking-mailbox.vala diff --git a/src/engine/imap-db/imap-db-folder.vala b/src/engine/imap-db/imap-db-folder.vala index 6197b71c..5a1fd96a 100644 --- a/src/engine/imap-db/imap-db-folder.vala +++ b/src/engine/imap-db/imap-db-folder.vala @@ -512,7 +512,8 @@ private class Geary.ImapDB.Folder : BaseObject, Geary.ReferenceSemantics { } // Note that this does INCLUDES messages marked for removal - public async Gee.SortedSet? list_uids_by_range_async(Imap.UID first_uid, Imap.UID last_uid, + // TODO: Let the user request a SortedSet, or have them provide the Set to add to + public async Gee.Set? list_uids_by_range_async(Imap.UID first_uid, Imap.UID last_uid, bool include_marked_for_removal, Cancellable? cancellable) throws Error { // order correctly Imap.UID start, end; @@ -524,7 +525,7 @@ private class Geary.ImapDB.Folder : BaseObject, Geary.ReferenceSemantics { end = first_uid; } - Gee.SortedSet uids = new Gee.TreeSet(); + Gee.Set uids = new Gee.HashSet(); yield db.exec_transaction_async(Db.TransactionType.RO, (cx) => { Db.Statement stmt = cx.prepare(""" SELECT ordering, remove_marker diff --git a/src/engine/imap-engine/imap-engine-generic-account.vala b/src/engine/imap-engine/imap-engine-generic-account.vala index 6c2a5353..90a843a7 100644 --- a/src/engine/imap-engine/imap-engine-generic-account.vala +++ b/src/engine/imap-engine/imap-engine-generic-account.vala @@ -5,7 +5,7 @@ */ private abstract class Geary.ImapEngine.GenericAccount : Geary.AbstractAccount { - private const int REFRESH_FOLDER_LIST_SEC = 2 * 60; + private const int REFRESH_FOLDER_LIST_SEC = 4 * 60; private static Geary.FolderPath? outbox_path = null; private static Geary.FolderPath? search_path = null; @@ -433,28 +433,10 @@ private abstract class Geary.ImapEngine.GenericAccount : Geary.AbstractAccount { } } - // always update, openable or not; update UIDs if remote opened, otherwise will keep - // signalling that it's changed (because the only time UIDNEXT/UIDValidity is updated - // is when the remote folder is first opened) + // always update, openable or not; have the folder update the UID info the next time + // it's opened try { - bool update_uid_info; - switch (generic_folder.get_open_state()) { - case Folder.OpenState.REMOTE: - case Folder.OpenState.BOTH: - update_uid_info = true; - break; - - case Folder.OpenState.LOCAL: - case Folder.OpenState.OPENING: - case Folder.OpenState.CLOSED: - update_uid_info = false; - break; - - default: - assert_not_reached(); - } - - yield local.update_folder_status_async(remote_folder, update_uid_info, cancellable); + yield local.update_folder_status_async(remote_folder, false, cancellable); } catch (Error update_error) { debug("Unable to update local folder %s with remote properties: %s", remote_folder.to_string(), update_error.message); diff --git a/src/engine/imap-engine/imap-engine-generic-folder.vala b/src/engine/imap-engine/imap-engine-generic-folder.vala index 0d7a100a..f52db768 100644 --- a/src/engine/imap-engine/imap-engine-generic-folder.vala +++ b/src/engine/imap-engine/imap-engine-generic-folder.vala @@ -212,40 +212,48 @@ private class Geary.ImapEngine.GenericFolder : Geary.AbstractFolder, Geary.Folde // get all the UIDs in said range from the local store, sorted; convert to non-null // for ease of use later - Gee.SortedSet? local_uids = yield local_folder.list_uids_by_range_async( + Gee.Set? local_uids = yield local_folder.list_uids_by_range_async( first_uid, last_uid, true, cancellable); if (local_uids == null) - local_uids = new Gee.TreeSet(); + local_uids = Gee.Set.empty(); check_open("normalize_folders (list local)"); // Do the same on the remote ... make non-null for ease of use later - Gee.SortedSet? remote_uids = yield remote_folder.list_uids_async( + Gee.Set? remote_uids = yield remote_folder.list_uids_async( new Imap.MessageSet.uid_range(first_uid, last_uid), cancellable); if (remote_uids == null) - remote_uids = new Gee.TreeSet(); + local_uids = Gee.Set.empty(); check_open("normalize_folders (list remote)"); - // walk local UIDs looking for UIDs no longer on remote, removing those that are available - // make the next pass that much shorter - Gee.HashSet removed_uids = new Gee.HashSet(); - foreach (Imap.UID local_uid in local_uids) { - // if in local but not remote, consider removed from remote - if (!remote_uids.remove(local_uid)) - removed_uids.add(local_uid); - } + debug("%s: Loaded local (%d) and remote (%d) UIDs, normalizing...", to_string(), + local_uids.size, remote_uids.size); - // everything remaining in remote has been added since folder last seen ... whether they're - // discovered (inserted) or appended depends on the highest local UID + Gee.HashSet removed_uids = new Gee.HashSet(); Gee.HashSet appended_uids = new Gee.HashSet(); Gee.HashSet discovered_uids = new Gee.HashSet(); - foreach (Imap.UID remote_uid in remote_uids) { - if (remote_uid.compare_to(local_latest_id.uid) > 0) - appended_uids.add(remote_uid); - else - discovered_uids.add(remote_uid); - } + + // Because the number of UIDs being processed can be immense in large folders, process + // in a background thread + yield Nonblocking.Concurrent.global.schedule_async(() => { + // walk local UIDs looking for UIDs no longer on remote, removing those that are available + // make the next pass that much shorter + foreach (Imap.UID local_uid in local_uids) { + // if in local but not remote, consider removed from remote + if (!remote_uids.remove(local_uid)) + removed_uids.add(local_uid); + } + + // everything remaining in remote has been added since folder last seen ... whether they're + // discovered (inserted) or appended depends on the highest local UID + foreach (Imap.UID remote_uid in remote_uids) { + if (remote_uid.compare_to(local_latest_id.uid) > 0) + appended_uids.add(remote_uid); + else + discovered_uids.add(remote_uid); + } + }, cancellable); debug("%s: changes since last seen: removed=%d appended=%d discovered=%d", to_string(), removed_uids.size, appended_uids.size, discovered_uids.size); @@ -272,21 +280,27 @@ private class Geary.ImapEngine.GenericFolder : Geary.AbstractFolder, Geary.Folde to_create, cancellable); assert(created_or_merged != null); - foreach (Email email in created_or_merged.keys) { - ImapDB.EmailIdentifier id = (ImapDB.EmailIdentifier) email.id; - bool created = created_or_merged.get(email); - - // report all appended email, but separate out email never seen before (created) - // as locally-appended - if (appended_uids.contains(id.uid)) { - appended_ids.add(id); + // it's possible a large number of messages have come in, so process them in the + // background + yield Nonblocking.Concurrent.global.schedule_async(() => { + foreach (Email email in created_or_merged.keys) { + ImapDB.EmailIdentifier id = (ImapDB.EmailIdentifier) email.id; + bool created = created_or_merged.get(email); - if (created) - locally_appended_ids.add(id); - } else if (discovered_uids.contains(id.uid) && created) { - discovered_ids.add(id); + // report all appended email, but separate out email never seen before (created) + // as locally-appended + if (appended_uids.contains(id.uid)) { + appended_ids.add(id); + + if (created) + locally_appended_ids.add(id); + } else if (discovered_uids.contains(id.uid) && created) { + discovered_ids.add(id); + } } - } + }, cancellable); + + debug("%s: Finished creating/merging %d emails", to_string(), created_or_merged.size); } check_open("normalize_folders (created/merged appended/discovered emails)"); diff --git a/src/engine/imap/api/imap-folder.vala b/src/engine/imap/api/imap-folder.vala index 64e9fd09..f6d72adb 100644 --- a/src/engine/imap/api/imap-folder.vala +++ b/src/engine/imap/api/imap-folder.vala @@ -323,22 +323,28 @@ private class Geary.Imap.Folder : BaseObject { } // Utility method for listing a UID range - public async Gee.SortedSet? list_uids_async(MessageSet msg_set, Cancellable? cancellable) + // TODO: Offer parameter so a SortedSet could be returned (or, the caller must supply the Set) + public async Gee.Set? list_uids_async(MessageSet msg_set, Cancellable? cancellable) throws Error { - Gee.List? list = yield list_email_async(msg_set, Geary.Email.Field.NONE, + FetchCommand cmd = new FetchCommand.data_type(msg_set, FetchDataType.UID); + + Gee.HashMap? fetched; + yield exec_commands_async(new Collection.SingleItem(cmd), out fetched, null, cancellable); - if (list == null || list.size == 0) + if (fetched == null || fetched.size == 0) return null; - Gee.SortedSet uids = new Gee.TreeSet(); - foreach (Geary.Email email in list) { - Imap.UID? uid = ((ImapDB.EmailIdentifier) email.id).uid; - assert(uid != null); - - uids.add(uid); - } + // Because the number of UIDs can be immense, do hashing in the background + Gee.Set uids = new Gee.HashSet(); + yield Nonblocking.Concurrent.global.schedule_async(() => { + foreach (FetchedData fetched_data in fetched.values) { + Imap.UID? uid = fetched_data.data_map.get(FetchDataType.UID) as Imap.UID; + if (uid != null) + uids.add(uid); + } + }, cancellable); - return uids; + return (uids.size > 0) ? uids : null; } // Returns a no-message-id ImapDB.EmailIdentifier with the UID stored in it. @@ -435,37 +441,40 @@ private class Geary.Imap.Folder : BaseObject { return null; // Convert fetched data into Geary.Email objects + // because this could be for a lot of email, do in a background thread Gee.List email_list = new Gee.ArrayList(); - foreach (SequenceNumber seq_num in fetched.keys) { - FetchedData fetched_data = fetched.get(seq_num); - - // the UID should either have been fetched (if using positional addressing) or should - // have come back with the response (if using UID addressing) - UID? uid = fetched_data.data_map.get(FetchDataType.UID) as UID; - if (uid == null) { - message("Unable to list message #%s on %s: No UID returned from server", - seq_num.to_string(), to_string()); + yield Nonblocking.Concurrent.global.schedule_async(() => { + foreach (SequenceNumber seq_num in fetched.keys) { + FetchedData fetched_data = fetched.get(seq_num); - continue; - } - - try { - Geary.Email email = fetched_data_to_email(uid, fetched_data, fields, - partial_header_identifier, body_identifier, preview_identifier, - preview_charset_identifier); - if (!email.fields.fulfills(fields)) { - message("%s: %s missing=%s fetched=%s", to_string(), email.id.to_string(), - fields.clear(email.fields).to_list_string(), fetched_data.to_string()); + // the UID should either have been fetched (if using positional addressing) or should + // have come back with the response (if using UID addressing) + UID? uid = fetched_data.data_map.get(FetchDataType.UID) as UID; + if (uid == null) { + message("Unable to list message #%s on %s: No UID returned from server", + seq_num.to_string(), to_string()); continue; } - email_list.add(email); - } catch (Error err) { - debug("%s: Unable to convert email for %s %s: %s", to_string(), uid.to_string(), - fetched_data.to_string(), err.message); + try { + Geary.Email email = fetched_data_to_email(to_string(), uid, fetched_data, fields, + partial_header_identifier, body_identifier, preview_identifier, + preview_charset_identifier); + if (!email.fields.fulfills(fields)) { + message("%s: %s missing=%s fetched=%s", to_string(), email.id.to_string(), + fields.clear(email.fields).to_list_string(), fetched_data.to_string()); + + continue; + } + + email_list.add(email); + } catch (Error err) { + debug("%s: Unable to convert email for %s %s: %s", to_string(), uid.to_string(), + fetched_data.to_string(), err.message); + } } - } + }, cancellable); return (email_list.size > 0) ? email_list : null; } @@ -660,10 +669,10 @@ private class Geary.Imap.Folder : BaseObject { } } - private Geary.Email fetched_data_to_email(UID uid, FetchedData fetched_data, Geary.Email.Field required_fields, + private static Geary.Email fetched_data_to_email(string folder_name, UID uid, + FetchedData fetched_data, Geary.Email.Field required_fields, FetchBodyDataIdentifier? partial_header_identifier, FetchBodyDataIdentifier? body_identifier, - FetchBodyDataIdentifier? preview_identifier, FetchBodyDataIdentifier? preview_charset_identifier) - throws Error { + FetchBodyDataIdentifier? preview_identifier, FetchBodyDataIdentifier? preview_charset_identifier) throws Error { // note the use of INVALID_ROWID, as the rowid for this email (if one is present in the // database) is unknown at this time; this means ImapDB *must* create a new EmailIdentifier // for this email after create/merge is completed @@ -731,10 +740,10 @@ private class Geary.Imap.Folder : BaseObject { // if the header was requested, convert its fields now bool has_partial_header = fetched_data.body_data_map.has_key(partial_header_identifier); if (partial_header_identifier != null && !has_partial_header) { - message("[%s] No partial header identifier \"%s\" found:", to_string(), + message("[%s] No partial header identifier \"%s\" found:", folder_name, partial_header_identifier.to_string()); foreach (FetchBodyDataIdentifier id in fetched_data.body_data_map.keys) - message("[%s] has %s", to_string(), id.to_string()); + message("[%s] has %s", folder_name, id.to_string()); } else if (partial_header_identifier != null && has_partial_header) { RFC822.Header headers = new RFC822.Header( fetched_data.body_data_map.get(partial_header_identifier)); @@ -832,10 +841,10 @@ private class Geary.Imap.Folder : BaseObject { email.set_message_body(new Geary.RFC822.Text( fetched_data.body_data_map.get(body_identifier))); } else { - message("[%s] No body identifier \"%s\" found", to_string(), + message("[%s] No body identifier \"%s\" found", folder_name, body_identifier.to_string()); foreach (FetchBodyDataIdentifier id in fetched_data.body_data_map.keys) - message("[%s] has %s", to_string(), id.to_string()); + message("[%s] has %s", folder_name, id.to_string()); } } @@ -849,10 +858,10 @@ private class Geary.Imap.Folder : BaseObject { fetched_data.body_data_map.get(preview_identifier), fetched_data.body_data_map.get(preview_charset_identifier))); } else { - message("[%s] No preview identifiers \"%s\" and \"%s\" found", to_string(), + message("[%s] No preview identifiers \"%s\" and \"%s\" found", folder_name, preview_identifier.to_string(), preview_charset_identifier.to_string()); foreach (FetchBodyDataIdentifier id in fetched_data.body_data_map.keys) - message("[%s] has %s", to_string(), id.to_string()); + message("[%s] has %s", folder_name, id.to_string()); } } @@ -893,7 +902,7 @@ private class Geary.Imap.Folder : BaseObject { return null; } - private bool required_but_not_set(Geary.Email.Field check, Geary.Email.Field users_fields, Geary.Email email) { + private static bool required_but_not_set(Geary.Email.Field check, Geary.Email.Field users_fields, Geary.Email email) { return users_fields.require(check) ? !email.fields.is_all_set(check) : false; } diff --git a/src/engine/imap/transport/imap-deserializer.vala b/src/engine/imap/transport/imap-deserializer.vala index f81466a4..1c69c5a9 100644 --- a/src/engine/imap/transport/imap-deserializer.vala +++ b/src/engine/imap/transport/imap-deserializer.vala @@ -217,7 +217,7 @@ public class Geary.Imap.Deserializer : BaseObject { * * Subscribe to the various signals before starting to ensure that all responses are trapped. */ - public async void start_async(int priority = GLib.Priority.DEFAULT) throws Error { + public async void start_async(int priority = GLib.Priority.DEFAULT_IDLE) throws Error { if (cancellable != null) throw new EngineError.ALREADY_OPEN("Deserializer already open"); diff --git a/src/engine/nonblocking/nonblocking-concurrent.vala b/src/engine/nonblocking/nonblocking-concurrent.vala new file mode 100644 index 00000000..81dfc491 --- /dev/null +++ b/src/engine/nonblocking/nonblocking-concurrent.vala @@ -0,0 +1,143 @@ +/* Copyright 2013 Yorba Foundation + * + * This software is licensed under the GNU Lesser General Public License + * (version 2.1 or later). See the COPYING file in this distribution. + */ + +/** + * Schedule an asynchronous operation (a {@link ConcurrentCallback} to run in a background thread. + * + * Useful to perform blocking I/O or CPU-bound tasks where the result must be ready before + * other work can proceed. + */ + +public class Geary.Nonblocking.Concurrent : BaseObject { + public const int DEFAULT_MAX_THREADS = 4; + + /** + * A callback invoked from a {@link Concurrent} background thread. + * + * The Cancellable passed to the callback is the same Cancellable the caller supplied to + * {@link schedule_async}. + * + * Note that this callback may throw an Error. If it does, Concurrent will throw it to the + * foreground caller on behalf of the callback. + */ + public delegate void ConcurrentCallback(Cancellable? cancellable) throws Error; + + private class ConcurrentOperation : BaseObject { + private unowned ConcurrentCallback cb; + private Cancellable? cancellable; + private Error? caught_err = null; + private Event event = new Event(); + + public ConcurrentOperation(ConcurrentCallback cb, Cancellable? cancellable) { + this.cb = cb; + this.cancellable = cancellable; + } + + // Called from the foreground thread to wait for the background to complete. + // + // Can't cancel here because we *must* wait for the operation to be executed by the + // thread and complete + public async void wait_async() throws Error { + yield event.wait_async(); + + if (caught_err != null) + throw caught_err; + + // now deal with cancellation + if (cancellable != null && cancellable.is_cancelled()) + throw new IOError.CANCELLED("Geary.Nonblocking.Concurrent cancelled"); + } + + // Called from a background thread + public void execute() { + // only execute if not already cancelled + if (cancellable == null || !cancellable.is_cancelled()) { + try { + cb(cancellable); + } catch (Error err) { + caught_err = err; + } + } + + // can't notify event here, Nonblocking.Event is not thread safe + // + // artificially increment the ref count of this object, schedule a completion callback + // on the forground thread, and signal there + ref(); + + Idle.add(on_notify_completed); + } + + // Called in the context of the Event loop in the foreground thread + private bool on_notify_completed() { + // alert waiters + event.blind_notify(); + + // unref; do not touch "self" from here on, it's possibly deallocated + unref(); + + return false; + } + } + + private static Concurrent? _global = null; + /** + * Returns the global instance of a {@link Concurrent} scheduler. + * + * Note that this call is ''not'' thread-safe and should only be called from the foreground + * thread. + */ + public static Concurrent global { + get { + return (_global != null) ? _global : _global = new Concurrent(); + } + } + + private ThreadPool? thread_pool = null; + private ThreadError? init_err = null; + + /** + * Creates a new Concurrent pool for scheduling background work. + * + * A caller may create their own pool, or they may use the default one available with + * {link instance}. + */ + public Concurrent(int max_threads = DEFAULT_MAX_THREADS) { + try { + thread_pool = new ThreadPool.with_owned_data(on_work_ready, + max_threads, false); + } catch (ThreadError err) { + init_err = err; + + warning("Unable to create Geary.Nonblocking.Concurrent: %s", err.message); + } + } + + /** + * Schedule a callback to be invoked in a background thread. + * + * The caller should take care that the callback's state is available until + * {@link schedule_async} completes. + * + * This method is thread-safe. + */ + public async void schedule_async(ConcurrentCallback cb, Cancellable? cancellable = null) + throws Error { + if (init_err != null) + throw init_err; + + // hold ConcurrentOperation ref until thread completes + ConcurrentOperation op = new ConcurrentOperation(cb, cancellable); + thread_pool.add(op); + + yield op.wait_async(); + } + + private void on_work_ready(owned ConcurrentOperation op) { + op.execute(); + } +} +