diff --git a/po/POTFILES.in b/po/POTFILES.in index 002f3205..4f796880 100644 --- a/po/POTFILES.in +++ b/po/POTFILES.in @@ -339,8 +339,8 @@ src/engine/nonblocking/nonblocking-batch.vala src/engine/nonblocking/nonblocking-concurrent.vala src/engine/nonblocking/nonblocking-counting-semaphore.vala src/engine/nonblocking/nonblocking-error.vala -src/engine/nonblocking/nonblocking-mailbox.vala src/engine/nonblocking/nonblocking-mutex.vala +src/engine/nonblocking/nonblocking-queue.vala src/engine/nonblocking/nonblocking-reporting-semaphore.vala src/engine/nonblocking/nonblocking-variants.vala src/engine/rfc822/rfc822-error.vala diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ecfbcb09..5f7fa99d 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -258,8 +258,8 @@ engine/nonblocking/nonblocking-batch.vala engine/nonblocking/nonblocking-concurrent.vala engine/nonblocking/nonblocking-counting-semaphore.vala engine/nonblocking/nonblocking-error.vala -engine/nonblocking/nonblocking-mailbox.vala engine/nonblocking/nonblocking-mutex.vala +engine/nonblocking/nonblocking-queue.vala engine/nonblocking/nonblocking-reporting-semaphore.vala engine/nonblocking/nonblocking-variants.vala diff --git a/src/engine/app/app-draft-manager.vala b/src/engine/app/app-draft-manager.vala index 068ee905..798e7f0f 100644 --- a/src/engine/app/app-draft-manager.vala +++ b/src/engine/app/app-draft-manager.vala @@ -125,7 +125,8 @@ public class Geary.App.DraftManager : BaseObject { private Folder? drafts_folder = null; private FolderSupport.Create? create_support = null; private FolderSupport.Remove? remove_support = null; - private Nonblocking.Mailbox mailbox = new Nonblocking.Mailbox(); + private Nonblocking.Queue mailbox = + new Nonblocking.Queue.fifo(); private bool was_opened = false; private Error? fatal_err = null; @@ -380,16 +381,15 @@ public class Geary.App.DraftManager : BaseObject { // reporting it again if (fatal_err != null) break; - + Operation op; try { - op = yield mailbox.recv_async(null); + op = yield mailbox.receive(null); } catch (Error err) { fatal(err); - break; } - + bool continue_loop = yield operation_loop_iteration_async(op); // fire semaphore, if present diff --git a/src/engine/app/conversation-monitor/app-conversation-operation-queue.vala b/src/engine/app/conversation-monitor/app-conversation-operation-queue.vala index caa98a75..76065cf2 100644 --- a/src/engine/app/conversation-monitor/app-conversation-operation-queue.vala +++ b/src/engine/app/conversation-monitor/app-conversation-operation-queue.vala @@ -9,8 +9,8 @@ private class Geary.App.ConversationOperationQueue : BaseObject { public Geary.SimpleProgressMonitor progress_monitor { get; private set; default = new Geary.SimpleProgressMonitor(Geary.ProgressType.ACTIVITY); } - private Geary.Nonblocking.Mailbox mailbox - = new Geary.Nonblocking.Mailbox(); + private Geary.Nonblocking.Queue mailbox + = new Geary.Nonblocking.Queue.fifo(); private Geary.Nonblocking.Spinlock processing_done_spinlock = new Geary.Nonblocking.Spinlock(); @@ -61,7 +61,7 @@ private class Geary.App.ConversationOperationQueue : BaseObject { for (;;) { ConversationOperation op; try { - op = yield mailbox.recv_async(); + op = yield mailbox.receive(); } catch (Error e) { debug("Error processing in conversation operation mailbox: %s", e.message); break; diff --git a/src/engine/imap-db/outbox/smtp-outbox-folder.vala b/src/engine/imap-db/outbox/smtp-outbox-folder.vala index 4211392c..4e08bb63 100644 --- a/src/engine/imap-db/outbox/smtp-outbox-folder.vala +++ b/src/engine/imap-db/outbox/smtp-outbox-folder.vala @@ -81,7 +81,7 @@ private class Geary.SmtpOutboxFolder : private ImapDB.Database db; private Cancellable? queue_cancellable = null; - private Nonblocking.Mailbox outbox_queue = new Nonblocking.Mailbox(); + private Nonblocking.Queue outbox_queue = new Nonblocking.Queue.fifo(); private Geary.ProgressMonitor sending_monitor; private SmtpOutboxFolderProperties _properties = new SmtpOutboxFolderProperties(0, 0); private int64 next_ordering = 0; @@ -129,7 +129,7 @@ private class Geary.SmtpOutboxFolder : OutboxRow? row = null; bool row_handled = false; try { - row = yield this.outbox_queue.recv_async(cancellable); + row = yield this.outbox_queue.receive(cancellable); row_handled = yield postman_send(row, cancellable); } catch (SmtpError err) { ProblemType problem = ProblemType.GENERIC_ERROR; diff --git a/src/engine/imap-engine/imap-engine-account-synchronizer.vala b/src/engine/imap-engine/imap-engine-account-synchronizer.vala index ca0101ec..b8dd07bf 100644 --- a/src/engine/imap-engine/imap-engine-account-synchronizer.vala +++ b/src/engine/imap-engine/imap-engine-account-synchronizer.vala @@ -12,9 +12,12 @@ private class Geary.ImapEngine.AccountSynchronizer : Geary.BaseObject { private weak GenericAccount account { get; private set; } private weak Imap.Account remote { get; private set; } - private Nonblocking.Mailbox bg_queue = new Nonblocking.Mailbox(bg_queue_comparator); - private Gee.HashSet made_available = new Gee.HashSet(); - private Gee.HashSet unavailable_paths = new Gee.HashSet(); + private Nonblocking.Queue bg_queue = + new Nonblocking.Queue.priority(bg_queue_comparator); + private Gee.HashSet made_available = + new Gee.HashSet(); + private Gee.HashSet unavailable_paths = + new Gee.HashSet(); private MinimalFolder? current_folder = null; private Cancellable? bg_cancellable = null; private DateTime max_epoch = new DateTime(new TimeZone.local(), 2000, 1, 1, 0, 0, 0.0); @@ -219,7 +222,7 @@ private class Geary.ImapEngine.AccountSynchronizer : Geary.BaseObject { while (!cancellable.is_cancelled()) { MinimalFolder folder; try { - folder = yield bg_queue.recv_async(bg_cancellable); + folder = yield bg_queue.receive(bg_cancellable); } catch (Error err) { if (!(err is IOError.CANCELLED)) debug("Failed to receive next folder for background sync: %s", err.message); diff --git a/src/engine/imap-engine/imap-engine-replay-queue.vala b/src/engine/imap-engine/imap-engine-replay-queue.vala index be4056fe..00301df7 100644 --- a/src/engine/imap-engine/imap-engine-replay-queue.vala +++ b/src/engine/imap-engine/imap-engine-replay-queue.vala @@ -4,6 +4,13 @@ * (version 2.1 or later). See the COPYING file in this distribution. */ +/** + * Interleaves IMAP operations to maintain consistent sequence numbering. + * + * The replay queue manages and executes operations originating both + * locally and from the server for a specific IMAP mailbox so as to + * ensure the execution of the operations maintains consistent. + */ private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject { // this value is high because delays between back-to-back unsolicited notifications have been // see as high as 250ms @@ -56,8 +63,10 @@ private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject { } } private weak MinimalFolder owner; - private Nonblocking.Mailbox local_queue = new Nonblocking.Mailbox(); - private Nonblocking.Mailbox remote_queue = new Nonblocking.Mailbox(); + private Nonblocking.Queue local_queue = + new Nonblocking.Queue.fifo(); + private Nonblocking.Queue remote_queue = + new Nonblocking.Queue.fifo(); private ReplayOperation? local_op_active = null; private ReplayOperation? remote_op_active = null; private Gee.ArrayList notification_queue = new Gee.ArrayList(); @@ -359,7 +368,7 @@ private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject { while (queue_running) { ReplayOperation op; try { - op = yield local_queue.recv_async(); + op = yield local_queue.receive(); } catch (Error recv_err) { debug("Unable to receive next replay operation on local queue %s: %s", to_string(), recv_err.message); @@ -459,7 +468,7 @@ private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject { // wait for the next operation ... do this *before* waiting for remote ReplayOperation op; try { - op = yield remote_queue.recv_async(); + op = yield remote_queue.receive(); } catch (Error recv_err) { debug("Unable to receive next replay operation on remote queue %s: %s", to_string(), recv_err.message); diff --git a/src/engine/nonblocking/nonblocking-mailbox.vala b/src/engine/nonblocking/nonblocking-mailbox.vala deleted file mode 100644 index 5cd97a0c..00000000 --- a/src/engine/nonblocking/nonblocking-mailbox.vala +++ /dev/null @@ -1,109 +0,0 @@ -/* Copyright 2016 Software Freedom Conservancy Inc. - * - * This software is licensed under the GNU Lesser General Public License - * (version 2.1 or later). See the COPYING file in this distribution. - */ - -public class Geary.Nonblocking.Mailbox : BaseObject { - public int size { get { return queue.size; } } - - public bool allow_duplicates { get; set; default = true; } - - public bool requeue_duplicate { get; set; default = false; } - - private bool _is_paused = false; - public bool is_paused { - get { return _is_paused; } - - set { - // if no longer paused, wake up any waiting recipients - if (_is_paused && !value) - spinlock.blind_notify(); - - _is_paused = value; - } - } - - private Gee.Queue queue; - private Nonblocking.Spinlock spinlock = new Nonblocking.Spinlock(); - - public Mailbox(owned CompareDataFunc? comparator = null) { - if (comparator == null && !typeof(G).is_a(typeof(Gee.Comparable))) - queue = new Gee.LinkedList(); - else - queue = new Gee.PriorityQueue((owned) comparator); - } - - public bool send(G msg) { - if (!allow_duplicates && queue.contains(msg)) { - if (requeue_duplicate) - queue.remove(msg); - else - return false; - } - - if (!queue.offer(msg)) - return false; - - if (!is_paused) - spinlock.blind_notify(); - - return true; - } - - /** - * Returns true if the message was revoked. - */ - public bool revoke(G msg) { - return queue.remove(msg); - } - - /** - * Returns number of removed items. - */ - public int clear() { - int count = queue.size; - if (count != 0) - queue.clear(); - - return count; - } - - /** - * Remove messages matching the given predicate. Return the removed messages. - */ - public Gee.Collection revoke_matching(owned Gee.Predicate predicate) { - Gee.ArrayList removed = new Gee.ArrayList(); - // Iterate over a copy so we can modify the original. - foreach (G msg in queue.to_array()) { - if (predicate(msg)) { - queue.remove(msg); - removed.add(msg); - } - } - - return removed; - } - - public async G recv_async(Cancellable? cancellable = null) throws Error { - for (;;) { - if (queue.size > 0 && !is_paused) - return queue.poll(); - - yield spinlock.wait_async(cancellable); - } - } - - /** - * Returns a read-only version of the mailbox queue that can be iterated in queue-order. - * - * Since the queue could potentially alter when the main loop runs, it's important to only - * examine the queue when not allowing other operations to process. - * - * Altering will not affect the actual queue. Use {@link revoke} to remove enqueued operations. - */ - public Gee.Collection get_all() { - return queue.read_only_view; - } -} - diff --git a/src/engine/nonblocking/nonblocking-queue.vala b/src/engine/nonblocking/nonblocking-queue.vala new file mode 100644 index 00000000..984a6f25 --- /dev/null +++ b/src/engine/nonblocking/nonblocking-queue.vala @@ -0,0 +1,171 @@ +/* + * Copyright 2016 Software Freedom Conservancy Inc. + * Copyright 2017 Michael Gratton + * + * This software is licensed under the GNU Lesser General Public License + * (version 2.1 or later). See the COPYING file in this distribution. + */ + +/** + * An asynchronous queue, first-in first-out (FIFO) or priority. + * + * This class can be used to asynchronously wait for items to be added + * to the queue, the asynchronous call blocking until an item is + * ready. + */ +public class Geary.Nonblocking.Queue : BaseObject { + + /** Returns the number of items currently in the queue. */ + public int size { get { return queue.size; } } + + /** + * Determines if duplicate items can be added to the queue. + * + * If a priory queue, this applies to items of the same priority, + * otherwise uses the item's natural identity. + */ + public bool allow_duplicates { get; set; default = true; } + + /** + * Determines if duplicate items will be added to the queue. + * + * If {@link allow_duplicates} is `true` and an item is already in + * the queue, this determines if it will be added again. + */ + public bool requeue_duplicate { get; set; default = false; } + + /** + * Determines if the queue is currently running. + */ + public bool is_paused { + get { return _is_paused; } + + set { + // if no longer paused, wake up any waiting recipients + if (_is_paused && !value) + spinlock.blind_notify(); + + _is_paused = value; + } + } + private bool _is_paused = false; + + private Gee.Queue queue; + private Nonblocking.Spinlock spinlock = new Nonblocking.Spinlock(); + + + /** + * Constructs a new first-in first-out (FIFO) queue. + * + * If `equalator` is not null it will be used to determine the + * identity of objects in the queue, else the items' natural + * identity will be used. + */ + public Queue.fifo(owned Gee.EqualDataFunc? equalator = null) { + this(new Gee.LinkedList(equalator)); + } + + /** + * Constructs a new priority queue. + * + * If `comparator` is not null it will be used to determine the + * ordering of objects in the queue, else the items' natural + * ordering will be used. + */ + public Queue.priority(owned CompareDataFunc? comparator = null) { + this(new Gee.PriorityQueue(comparator)); + } + + /** + * Constructs a new queue. + */ + protected Queue(Gee.Queue queue) { + this.queue = queue; + } + + /** + * Adds an item to the queue. + * + * If the queue is a priority queue, it is added according to its + * relative priority, else it is added to the end. + * + * Returns `true` if the item was added to the queue. + */ + public bool send(G msg) { + if (!allow_duplicates && queue.contains(msg)) { + if (requeue_duplicate) + queue.remove(msg); + else + return false; + } + + if (!queue.offer(msg)) + return false; + + if (!is_paused) + spinlock.blind_notify(); + + return true; + } + + /** + * Retrieves the next item from the queue, blocking until available. + * + * If the queue is paused, this will continue to wait until + * unpaused and an item is ready. If `cancellable` is non-null, + * when used will cancel this call. + */ + public async G receive(Cancellable? cancellable = null) throws Error { + for (;;) { + if (queue.size > 0 && !is_paused) + return queue.poll(); + + yield spinlock.wait_async(cancellable); + } + } + + /** + * Removes all items in queue, returning the number of removed items. + */ + public int clear() { + int count = queue.size; + if (count != 0) + queue.clear(); + + return count; + } + + /** + * Removes an item from the queue, returning `true` if removed. + */ + public bool revoke(G msg) { + return queue.remove(msg); + } + + /** + * Remove items matching the given predicate, returning those removed. + */ + public Gee.Collection revoke_matching(owned Gee.Predicate predicate) { + Gee.ArrayList removed = new Gee.ArrayList(); + // Iterate over a copy so we can modify the original. + foreach (G msg in queue.to_array()) { + if (predicate(msg)) { + queue.remove(msg); + removed.add(msg); + } + } + + return removed; + } + + /** + * Returns a read-only version of the queue queue. + * + * Since the queue could potentially alter when the main loop + * runs, it's important to only examine the queue when not + * allowing other operations to process. + */ + public Gee.Collection get_all() { + return queue.read_only_view; + } +}