From 9a83e95b89a0e4bbce453a4e40da6fce920be22d Mon Sep 17 00:00:00 2001 From: Michael James Gratton Date: Wed, 22 Nov 2017 12:00:37 +1100 Subject: [PATCH] Rename Geary.Nonblocking.Mailbox to Queue, make constructed type explicit. Mailbox would produce a FIFO or priority queue depending on if you pass in a comparator or not. This adds additional constructors to make that explicit, and also now allows the FIFO to have its own equality function. Renames the class to something less confusing for an email library. Also adds doc comments for all public members. * src/engine/nonblocking/nonblocking-queue.vala: Renamed from nonblocking-mailbox.vala, rename class from Mailbox to Queue, rename recv_async to just 'receive'. Provide full documentation comments. Update call sites. --- po/POTFILES.in | 2 +- src/CMakeLists.txt | 2 +- src/engine/app/app-draft-manager.vala | 10 +- .../app-conversation-operation-queue.vala | 6 +- .../imap-db/outbox/smtp-outbox-folder.vala | 4 +- .../imap-engine-account-synchronizer.vala | 11 +- .../imap-engine/imap-engine-replay-queue.vala | 17 +- .../nonblocking/nonblocking-mailbox.vala | 109 ----------- src/engine/nonblocking/nonblocking-queue.vala | 171 ++++++++++++++++++ 9 files changed, 203 insertions(+), 129 deletions(-) delete mode 100644 src/engine/nonblocking/nonblocking-mailbox.vala create mode 100644 src/engine/nonblocking/nonblocking-queue.vala diff --git a/po/POTFILES.in b/po/POTFILES.in index 002f3205..4f796880 100644 --- a/po/POTFILES.in +++ b/po/POTFILES.in @@ -339,8 +339,8 @@ src/engine/nonblocking/nonblocking-batch.vala src/engine/nonblocking/nonblocking-concurrent.vala src/engine/nonblocking/nonblocking-counting-semaphore.vala src/engine/nonblocking/nonblocking-error.vala -src/engine/nonblocking/nonblocking-mailbox.vala src/engine/nonblocking/nonblocking-mutex.vala +src/engine/nonblocking/nonblocking-queue.vala src/engine/nonblocking/nonblocking-reporting-semaphore.vala src/engine/nonblocking/nonblocking-variants.vala src/engine/rfc822/rfc822-error.vala diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index ecfbcb09..5f7fa99d 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -258,8 +258,8 @@ engine/nonblocking/nonblocking-batch.vala engine/nonblocking/nonblocking-concurrent.vala engine/nonblocking/nonblocking-counting-semaphore.vala engine/nonblocking/nonblocking-error.vala -engine/nonblocking/nonblocking-mailbox.vala engine/nonblocking/nonblocking-mutex.vala +engine/nonblocking/nonblocking-queue.vala engine/nonblocking/nonblocking-reporting-semaphore.vala engine/nonblocking/nonblocking-variants.vala diff --git a/src/engine/app/app-draft-manager.vala b/src/engine/app/app-draft-manager.vala index 068ee905..798e7f0f 100644 --- a/src/engine/app/app-draft-manager.vala +++ b/src/engine/app/app-draft-manager.vala @@ -125,7 +125,8 @@ public class Geary.App.DraftManager : BaseObject { private Folder? drafts_folder = null; private FolderSupport.Create? create_support = null; private FolderSupport.Remove? remove_support = null; - private Nonblocking.Mailbox mailbox = new Nonblocking.Mailbox(); + private Nonblocking.Queue mailbox = + new Nonblocking.Queue.fifo(); private bool was_opened = false; private Error? fatal_err = null; @@ -380,16 +381,15 @@ public class Geary.App.DraftManager : BaseObject { // reporting it again if (fatal_err != null) break; - + Operation op; try { - op = yield mailbox.recv_async(null); + op = yield mailbox.receive(null); } catch (Error err) { fatal(err); - break; } - + bool continue_loop = yield operation_loop_iteration_async(op); // fire semaphore, if present diff --git a/src/engine/app/conversation-monitor/app-conversation-operation-queue.vala b/src/engine/app/conversation-monitor/app-conversation-operation-queue.vala index caa98a75..76065cf2 100644 --- a/src/engine/app/conversation-monitor/app-conversation-operation-queue.vala +++ b/src/engine/app/conversation-monitor/app-conversation-operation-queue.vala @@ -9,8 +9,8 @@ private class Geary.App.ConversationOperationQueue : BaseObject { public Geary.SimpleProgressMonitor progress_monitor { get; private set; default = new Geary.SimpleProgressMonitor(Geary.ProgressType.ACTIVITY); } - private Geary.Nonblocking.Mailbox mailbox - = new Geary.Nonblocking.Mailbox(); + private Geary.Nonblocking.Queue mailbox + = new Geary.Nonblocking.Queue.fifo(); private Geary.Nonblocking.Spinlock processing_done_spinlock = new Geary.Nonblocking.Spinlock(); @@ -61,7 +61,7 @@ private class Geary.App.ConversationOperationQueue : BaseObject { for (;;) { ConversationOperation op; try { - op = yield mailbox.recv_async(); + op = yield mailbox.receive(); } catch (Error e) { debug("Error processing in conversation operation mailbox: %s", e.message); break; diff --git a/src/engine/imap-db/outbox/smtp-outbox-folder.vala b/src/engine/imap-db/outbox/smtp-outbox-folder.vala index 4211392c..4e08bb63 100644 --- a/src/engine/imap-db/outbox/smtp-outbox-folder.vala +++ b/src/engine/imap-db/outbox/smtp-outbox-folder.vala @@ -81,7 +81,7 @@ private class Geary.SmtpOutboxFolder : private ImapDB.Database db; private Cancellable? queue_cancellable = null; - private Nonblocking.Mailbox outbox_queue = new Nonblocking.Mailbox(); + private Nonblocking.Queue outbox_queue = new Nonblocking.Queue.fifo(); private Geary.ProgressMonitor sending_monitor; private SmtpOutboxFolderProperties _properties = new SmtpOutboxFolderProperties(0, 0); private int64 next_ordering = 0; @@ -129,7 +129,7 @@ private class Geary.SmtpOutboxFolder : OutboxRow? row = null; bool row_handled = false; try { - row = yield this.outbox_queue.recv_async(cancellable); + row = yield this.outbox_queue.receive(cancellable); row_handled = yield postman_send(row, cancellable); } catch (SmtpError err) { ProblemType problem = ProblemType.GENERIC_ERROR; diff --git a/src/engine/imap-engine/imap-engine-account-synchronizer.vala b/src/engine/imap-engine/imap-engine-account-synchronizer.vala index ca0101ec..b8dd07bf 100644 --- a/src/engine/imap-engine/imap-engine-account-synchronizer.vala +++ b/src/engine/imap-engine/imap-engine-account-synchronizer.vala @@ -12,9 +12,12 @@ private class Geary.ImapEngine.AccountSynchronizer : Geary.BaseObject { private weak GenericAccount account { get; private set; } private weak Imap.Account remote { get; private set; } - private Nonblocking.Mailbox bg_queue = new Nonblocking.Mailbox(bg_queue_comparator); - private Gee.HashSet made_available = new Gee.HashSet(); - private Gee.HashSet unavailable_paths = new Gee.HashSet(); + private Nonblocking.Queue bg_queue = + new Nonblocking.Queue.priority(bg_queue_comparator); + private Gee.HashSet made_available = + new Gee.HashSet(); + private Gee.HashSet unavailable_paths = + new Gee.HashSet(); private MinimalFolder? current_folder = null; private Cancellable? bg_cancellable = null; private DateTime max_epoch = new DateTime(new TimeZone.local(), 2000, 1, 1, 0, 0, 0.0); @@ -219,7 +222,7 @@ private class Geary.ImapEngine.AccountSynchronizer : Geary.BaseObject { while (!cancellable.is_cancelled()) { MinimalFolder folder; try { - folder = yield bg_queue.recv_async(bg_cancellable); + folder = yield bg_queue.receive(bg_cancellable); } catch (Error err) { if (!(err is IOError.CANCELLED)) debug("Failed to receive next folder for background sync: %s", err.message); diff --git a/src/engine/imap-engine/imap-engine-replay-queue.vala b/src/engine/imap-engine/imap-engine-replay-queue.vala index be4056fe..00301df7 100644 --- a/src/engine/imap-engine/imap-engine-replay-queue.vala +++ b/src/engine/imap-engine/imap-engine-replay-queue.vala @@ -4,6 +4,13 @@ * (version 2.1 or later). See the COPYING file in this distribution. */ +/** + * Interleaves IMAP operations to maintain consistent sequence numbering. + * + * The replay queue manages and executes operations originating both + * locally and from the server for a specific IMAP mailbox so as to + * ensure the execution of the operations maintains consistent. + */ private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject { // this value is high because delays between back-to-back unsolicited notifications have been // see as high as 250ms @@ -56,8 +63,10 @@ private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject { } } private weak MinimalFolder owner; - private Nonblocking.Mailbox local_queue = new Nonblocking.Mailbox(); - private Nonblocking.Mailbox remote_queue = new Nonblocking.Mailbox(); + private Nonblocking.Queue local_queue = + new Nonblocking.Queue.fifo(); + private Nonblocking.Queue remote_queue = + new Nonblocking.Queue.fifo(); private ReplayOperation? local_op_active = null; private ReplayOperation? remote_op_active = null; private Gee.ArrayList notification_queue = new Gee.ArrayList(); @@ -359,7 +368,7 @@ private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject { while (queue_running) { ReplayOperation op; try { - op = yield local_queue.recv_async(); + op = yield local_queue.receive(); } catch (Error recv_err) { debug("Unable to receive next replay operation on local queue %s: %s", to_string(), recv_err.message); @@ -459,7 +468,7 @@ private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject { // wait for the next operation ... do this *before* waiting for remote ReplayOperation op; try { - op = yield remote_queue.recv_async(); + op = yield remote_queue.receive(); } catch (Error recv_err) { debug("Unable to receive next replay operation on remote queue %s: %s", to_string(), recv_err.message); diff --git a/src/engine/nonblocking/nonblocking-mailbox.vala b/src/engine/nonblocking/nonblocking-mailbox.vala deleted file mode 100644 index 5cd97a0c..00000000 --- a/src/engine/nonblocking/nonblocking-mailbox.vala +++ /dev/null @@ -1,109 +0,0 @@ -/* Copyright 2016 Software Freedom Conservancy Inc. - * - * This software is licensed under the GNU Lesser General Public License - * (version 2.1 or later). See the COPYING file in this distribution. - */ - -public class Geary.Nonblocking.Mailbox : BaseObject { - public int size { get { return queue.size; } } - - public bool allow_duplicates { get; set; default = true; } - - public bool requeue_duplicate { get; set; default = false; } - - private bool _is_paused = false; - public bool is_paused { - get { return _is_paused; } - - set { - // if no longer paused, wake up any waiting recipients - if (_is_paused && !value) - spinlock.blind_notify(); - - _is_paused = value; - } - } - - private Gee.Queue queue; - private Nonblocking.Spinlock spinlock = new Nonblocking.Spinlock(); - - public Mailbox(owned CompareDataFunc? comparator = null) { - if (comparator == null && !typeof(G).is_a(typeof(Gee.Comparable))) - queue = new Gee.LinkedList(); - else - queue = new Gee.PriorityQueue((owned) comparator); - } - - public bool send(G msg) { - if (!allow_duplicates && queue.contains(msg)) { - if (requeue_duplicate) - queue.remove(msg); - else - return false; - } - - if (!queue.offer(msg)) - return false; - - if (!is_paused) - spinlock.blind_notify(); - - return true; - } - - /** - * Returns true if the message was revoked. - */ - public bool revoke(G msg) { - return queue.remove(msg); - } - - /** - * Returns number of removed items. - */ - public int clear() { - int count = queue.size; - if (count != 0) - queue.clear(); - - return count; - } - - /** - * Remove messages matching the given predicate. Return the removed messages. - */ - public Gee.Collection revoke_matching(owned Gee.Predicate predicate) { - Gee.ArrayList removed = new Gee.ArrayList(); - // Iterate over a copy so we can modify the original. - foreach (G msg in queue.to_array()) { - if (predicate(msg)) { - queue.remove(msg); - removed.add(msg); - } - } - - return removed; - } - - public async G recv_async(Cancellable? cancellable = null) throws Error { - for (;;) { - if (queue.size > 0 && !is_paused) - return queue.poll(); - - yield spinlock.wait_async(cancellable); - } - } - - /** - * Returns a read-only version of the mailbox queue that can be iterated in queue-order. - * - * Since the queue could potentially alter when the main loop runs, it's important to only - * examine the queue when not allowing other operations to process. - * - * Altering will not affect the actual queue. Use {@link revoke} to remove enqueued operations. - */ - public Gee.Collection get_all() { - return queue.read_only_view; - } -} - diff --git a/src/engine/nonblocking/nonblocking-queue.vala b/src/engine/nonblocking/nonblocking-queue.vala new file mode 100644 index 00000000..984a6f25 --- /dev/null +++ b/src/engine/nonblocking/nonblocking-queue.vala @@ -0,0 +1,171 @@ +/* + * Copyright 2016 Software Freedom Conservancy Inc. + * Copyright 2017 Michael Gratton + * + * This software is licensed under the GNU Lesser General Public License + * (version 2.1 or later). See the COPYING file in this distribution. + */ + +/** + * An asynchronous queue, first-in first-out (FIFO) or priority. + * + * This class can be used to asynchronously wait for items to be added + * to the queue, the asynchronous call blocking until an item is + * ready. + */ +public class Geary.Nonblocking.Queue : BaseObject { + + /** Returns the number of items currently in the queue. */ + public int size { get { return queue.size; } } + + /** + * Determines if duplicate items can be added to the queue. + * + * If a priory queue, this applies to items of the same priority, + * otherwise uses the item's natural identity. + */ + public bool allow_duplicates { get; set; default = true; } + + /** + * Determines if duplicate items will be added to the queue. + * + * If {@link allow_duplicates} is `true` and an item is already in + * the queue, this determines if it will be added again. + */ + public bool requeue_duplicate { get; set; default = false; } + + /** + * Determines if the queue is currently running. + */ + public bool is_paused { + get { return _is_paused; } + + set { + // if no longer paused, wake up any waiting recipients + if (_is_paused && !value) + spinlock.blind_notify(); + + _is_paused = value; + } + } + private bool _is_paused = false; + + private Gee.Queue queue; + private Nonblocking.Spinlock spinlock = new Nonblocking.Spinlock(); + + + /** + * Constructs a new first-in first-out (FIFO) queue. + * + * If `equalator` is not null it will be used to determine the + * identity of objects in the queue, else the items' natural + * identity will be used. + */ + public Queue.fifo(owned Gee.EqualDataFunc? equalator = null) { + this(new Gee.LinkedList(equalator)); + } + + /** + * Constructs a new priority queue. + * + * If `comparator` is not null it will be used to determine the + * ordering of objects in the queue, else the items' natural + * ordering will be used. + */ + public Queue.priority(owned CompareDataFunc? comparator = null) { + this(new Gee.PriorityQueue(comparator)); + } + + /** + * Constructs a new queue. + */ + protected Queue(Gee.Queue queue) { + this.queue = queue; + } + + /** + * Adds an item to the queue. + * + * If the queue is a priority queue, it is added according to its + * relative priority, else it is added to the end. + * + * Returns `true` if the item was added to the queue. + */ + public bool send(G msg) { + if (!allow_duplicates && queue.contains(msg)) { + if (requeue_duplicate) + queue.remove(msg); + else + return false; + } + + if (!queue.offer(msg)) + return false; + + if (!is_paused) + spinlock.blind_notify(); + + return true; + } + + /** + * Retrieves the next item from the queue, blocking until available. + * + * If the queue is paused, this will continue to wait until + * unpaused and an item is ready. If `cancellable` is non-null, + * when used will cancel this call. + */ + public async G receive(Cancellable? cancellable = null) throws Error { + for (;;) { + if (queue.size > 0 && !is_paused) + return queue.poll(); + + yield spinlock.wait_async(cancellable); + } + } + + /** + * Removes all items in queue, returning the number of removed items. + */ + public int clear() { + int count = queue.size; + if (count != 0) + queue.clear(); + + return count; + } + + /** + * Removes an item from the queue, returning `true` if removed. + */ + public bool revoke(G msg) { + return queue.remove(msg); + } + + /** + * Remove items matching the given predicate, returning those removed. + */ + public Gee.Collection revoke_matching(owned Gee.Predicate predicate) { + Gee.ArrayList removed = new Gee.ArrayList(); + // Iterate over a copy so we can modify the original. + foreach (G msg in queue.to_array()) { + if (predicate(msg)) { + queue.remove(msg); + removed.add(msg); + } + } + + return removed; + } + + /** + * Returns a read-only version of the queue queue. + * + * Since the queue could potentially alter when the main loop + * runs, it's important to only examine the queue when not + * allowing other operations to process. + */ + public Gee.Collection get_all() { + return queue.read_only_view; + } +}