Rename Geary.Nonblocking.Mailbox to Queue, make constructed type explicit.
Mailbox would produce a FIFO or priority queue depending on if you pass in a comparator or not. This adds additional constructors to make that explicit, and also now allows the FIFO to have its own equality function. Renames the class to something less confusing for an email library. Also adds doc comments for all public members. * src/engine/nonblocking/nonblocking-queue.vala: Renamed from nonblocking-mailbox.vala, rename class from Mailbox to Queue, rename recv_async to just 'receive'. Provide full documentation comments. Update call sites.
This commit is contained in:
parent
724202d112
commit
9a83e95b89
9 changed files with 203 additions and 129 deletions
|
|
@ -339,8 +339,8 @@ src/engine/nonblocking/nonblocking-batch.vala
|
|||
src/engine/nonblocking/nonblocking-concurrent.vala
|
||||
src/engine/nonblocking/nonblocking-counting-semaphore.vala
|
||||
src/engine/nonblocking/nonblocking-error.vala
|
||||
src/engine/nonblocking/nonblocking-mailbox.vala
|
||||
src/engine/nonblocking/nonblocking-mutex.vala
|
||||
src/engine/nonblocking/nonblocking-queue.vala
|
||||
src/engine/nonblocking/nonblocking-reporting-semaphore.vala
|
||||
src/engine/nonblocking/nonblocking-variants.vala
|
||||
src/engine/rfc822/rfc822-error.vala
|
||||
|
|
|
|||
|
|
@ -258,8 +258,8 @@ engine/nonblocking/nonblocking-batch.vala
|
|||
engine/nonblocking/nonblocking-concurrent.vala
|
||||
engine/nonblocking/nonblocking-counting-semaphore.vala
|
||||
engine/nonblocking/nonblocking-error.vala
|
||||
engine/nonblocking/nonblocking-mailbox.vala
|
||||
engine/nonblocking/nonblocking-mutex.vala
|
||||
engine/nonblocking/nonblocking-queue.vala
|
||||
engine/nonblocking/nonblocking-reporting-semaphore.vala
|
||||
engine/nonblocking/nonblocking-variants.vala
|
||||
|
||||
|
|
|
|||
|
|
@ -125,7 +125,8 @@ public class Geary.App.DraftManager : BaseObject {
|
|||
private Folder? drafts_folder = null;
|
||||
private FolderSupport.Create? create_support = null;
|
||||
private FolderSupport.Remove? remove_support = null;
|
||||
private Nonblocking.Mailbox<Operation?> mailbox = new Nonblocking.Mailbox<Operation?>();
|
||||
private Nonblocking.Queue<Operation?> mailbox =
|
||||
new Nonblocking.Queue<Operation?>.fifo();
|
||||
private bool was_opened = false;
|
||||
private Error? fatal_err = null;
|
||||
|
||||
|
|
@ -380,16 +381,15 @@ public class Geary.App.DraftManager : BaseObject {
|
|||
// reporting it again
|
||||
if (fatal_err != null)
|
||||
break;
|
||||
|
||||
|
||||
Operation op;
|
||||
try {
|
||||
op = yield mailbox.recv_async(null);
|
||||
op = yield mailbox.receive(null);
|
||||
} catch (Error err) {
|
||||
fatal(err);
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
|
||||
bool continue_loop = yield operation_loop_iteration_async(op);
|
||||
|
||||
// fire semaphore, if present
|
||||
|
|
|
|||
|
|
@ -9,8 +9,8 @@ private class Geary.App.ConversationOperationQueue : BaseObject {
|
|||
public Geary.SimpleProgressMonitor progress_monitor { get; private set; default =
|
||||
new Geary.SimpleProgressMonitor(Geary.ProgressType.ACTIVITY); }
|
||||
|
||||
private Geary.Nonblocking.Mailbox<ConversationOperation> mailbox
|
||||
= new Geary.Nonblocking.Mailbox<ConversationOperation>();
|
||||
private Geary.Nonblocking.Queue<ConversationOperation> mailbox
|
||||
= new Geary.Nonblocking.Queue<ConversationOperation>.fifo();
|
||||
private Geary.Nonblocking.Spinlock processing_done_spinlock
|
||||
= new Geary.Nonblocking.Spinlock();
|
||||
|
||||
|
|
@ -61,7 +61,7 @@ private class Geary.App.ConversationOperationQueue : BaseObject {
|
|||
for (;;) {
|
||||
ConversationOperation op;
|
||||
try {
|
||||
op = yield mailbox.recv_async();
|
||||
op = yield mailbox.receive();
|
||||
} catch (Error e) {
|
||||
debug("Error processing in conversation operation mailbox: %s", e.message);
|
||||
break;
|
||||
|
|
|
|||
|
|
@ -81,7 +81,7 @@ private class Geary.SmtpOutboxFolder :
|
|||
private ImapDB.Database db;
|
||||
|
||||
private Cancellable? queue_cancellable = null;
|
||||
private Nonblocking.Mailbox<OutboxRow> outbox_queue = new Nonblocking.Mailbox<OutboxRow>();
|
||||
private Nonblocking.Queue<OutboxRow> outbox_queue = new Nonblocking.Queue<OutboxRow>.fifo();
|
||||
private Geary.ProgressMonitor sending_monitor;
|
||||
private SmtpOutboxFolderProperties _properties = new SmtpOutboxFolderProperties(0, 0);
|
||||
private int64 next_ordering = 0;
|
||||
|
|
@ -129,7 +129,7 @@ private class Geary.SmtpOutboxFolder :
|
|||
OutboxRow? row = null;
|
||||
bool row_handled = false;
|
||||
try {
|
||||
row = yield this.outbox_queue.recv_async(cancellable);
|
||||
row = yield this.outbox_queue.receive(cancellable);
|
||||
row_handled = yield postman_send(row, cancellable);
|
||||
} catch (SmtpError err) {
|
||||
ProblemType problem = ProblemType.GENERIC_ERROR;
|
||||
|
|
|
|||
|
|
@ -12,9 +12,12 @@ private class Geary.ImapEngine.AccountSynchronizer : Geary.BaseObject {
|
|||
private weak GenericAccount account { get; private set; }
|
||||
private weak Imap.Account remote { get; private set; }
|
||||
|
||||
private Nonblocking.Mailbox<MinimalFolder> bg_queue = new Nonblocking.Mailbox<MinimalFolder>(bg_queue_comparator);
|
||||
private Gee.HashSet<MinimalFolder> made_available = new Gee.HashSet<MinimalFolder>();
|
||||
private Gee.HashSet<FolderPath> unavailable_paths = new Gee.HashSet<FolderPath>();
|
||||
private Nonblocking.Queue<MinimalFolder> bg_queue =
|
||||
new Nonblocking.Queue<MinimalFolder>.priority(bg_queue_comparator);
|
||||
private Gee.HashSet<MinimalFolder> made_available =
|
||||
new Gee.HashSet<MinimalFolder>();
|
||||
private Gee.HashSet<FolderPath> unavailable_paths =
|
||||
new Gee.HashSet<FolderPath>();
|
||||
private MinimalFolder? current_folder = null;
|
||||
private Cancellable? bg_cancellable = null;
|
||||
private DateTime max_epoch = new DateTime(new TimeZone.local(), 2000, 1, 1, 0, 0, 0.0);
|
||||
|
|
@ -219,7 +222,7 @@ private class Geary.ImapEngine.AccountSynchronizer : Geary.BaseObject {
|
|||
while (!cancellable.is_cancelled()) {
|
||||
MinimalFolder folder;
|
||||
try {
|
||||
folder = yield bg_queue.recv_async(bg_cancellable);
|
||||
folder = yield bg_queue.receive(bg_cancellable);
|
||||
} catch (Error err) {
|
||||
if (!(err is IOError.CANCELLED))
|
||||
debug("Failed to receive next folder for background sync: %s", err.message);
|
||||
|
|
|
|||
|
|
@ -4,6 +4,13 @@
|
|||
* (version 2.1 or later). See the COPYING file in this distribution.
|
||||
*/
|
||||
|
||||
/**
|
||||
* Interleaves IMAP operations to maintain consistent sequence numbering.
|
||||
*
|
||||
* The replay queue manages and executes operations originating both
|
||||
* locally and from the server for a specific IMAP mailbox so as to
|
||||
* ensure the execution of the operations maintains consistent.
|
||||
*/
|
||||
private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject {
|
||||
// this value is high because delays between back-to-back unsolicited notifications have been
|
||||
// see as high as 250ms
|
||||
|
|
@ -56,8 +63,10 @@ private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject {
|
|||
} }
|
||||
|
||||
private weak MinimalFolder owner;
|
||||
private Nonblocking.Mailbox<ReplayOperation> local_queue = new Nonblocking.Mailbox<ReplayOperation>();
|
||||
private Nonblocking.Mailbox<ReplayOperation> remote_queue = new Nonblocking.Mailbox<ReplayOperation>();
|
||||
private Nonblocking.Queue<ReplayOperation> local_queue =
|
||||
new Nonblocking.Queue<ReplayOperation>.fifo();
|
||||
private Nonblocking.Queue<ReplayOperation> remote_queue =
|
||||
new Nonblocking.Queue<ReplayOperation>.fifo();
|
||||
private ReplayOperation? local_op_active = null;
|
||||
private ReplayOperation? remote_op_active = null;
|
||||
private Gee.ArrayList<ReplayOperation> notification_queue = new Gee.ArrayList<ReplayOperation>();
|
||||
|
|
@ -359,7 +368,7 @@ private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject {
|
|||
while (queue_running) {
|
||||
ReplayOperation op;
|
||||
try {
|
||||
op = yield local_queue.recv_async();
|
||||
op = yield local_queue.receive();
|
||||
} catch (Error recv_err) {
|
||||
debug("Unable to receive next replay operation on local queue %s: %s", to_string(),
|
||||
recv_err.message);
|
||||
|
|
@ -459,7 +468,7 @@ private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject {
|
|||
// wait for the next operation ... do this *before* waiting for remote
|
||||
ReplayOperation op;
|
||||
try {
|
||||
op = yield remote_queue.recv_async();
|
||||
op = yield remote_queue.receive();
|
||||
} catch (Error recv_err) {
|
||||
debug("Unable to receive next replay operation on remote queue %s: %s", to_string(),
|
||||
recv_err.message);
|
||||
|
|
|
|||
|
|
@ -1,109 +0,0 @@
|
|||
/* Copyright 2016 Software Freedom Conservancy Inc.
|
||||
*
|
||||
* This software is licensed under the GNU Lesser General Public License
|
||||
* (version 2.1 or later). See the COPYING file in this distribution.
|
||||
*/
|
||||
|
||||
public class Geary.Nonblocking.Mailbox<G> : BaseObject {
|
||||
public int size { get { return queue.size; } }
|
||||
|
||||
public bool allow_duplicates { get; set; default = true; }
|
||||
|
||||
public bool requeue_duplicate { get; set; default = false; }
|
||||
|
||||
private bool _is_paused = false;
|
||||
public bool is_paused {
|
||||
get { return _is_paused; }
|
||||
|
||||
set {
|
||||
// if no longer paused, wake up any waiting recipients
|
||||
if (_is_paused && !value)
|
||||
spinlock.blind_notify();
|
||||
|
||||
_is_paused = value;
|
||||
}
|
||||
}
|
||||
|
||||
private Gee.Queue<G> queue;
|
||||
private Nonblocking.Spinlock spinlock = new Nonblocking.Spinlock();
|
||||
|
||||
public Mailbox(owned CompareDataFunc<G>? comparator = null) {
|
||||
if (comparator == null && !typeof(G).is_a(typeof(Gee.Comparable)))
|
||||
queue = new Gee.LinkedList<G>();
|
||||
else
|
||||
queue = new Gee.PriorityQueue<G>((owned) comparator);
|
||||
}
|
||||
|
||||
public bool send(G msg) {
|
||||
if (!allow_duplicates && queue.contains(msg)) {
|
||||
if (requeue_duplicate)
|
||||
queue.remove(msg);
|
||||
else
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!queue.offer(msg))
|
||||
return false;
|
||||
|
||||
if (!is_paused)
|
||||
spinlock.blind_notify();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the message was revoked.
|
||||
*/
|
||||
public bool revoke(G msg) {
|
||||
return queue.remove(msg);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns number of removed items.
|
||||
*/
|
||||
public int clear() {
|
||||
int count = queue.size;
|
||||
if (count != 0)
|
||||
queue.clear();
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove messages matching the given predicate. Return the removed messages.
|
||||
*/
|
||||
public Gee.Collection<G> revoke_matching(owned Gee.Predicate<G> predicate) {
|
||||
Gee.ArrayList<G> removed = new Gee.ArrayList<G>();
|
||||
// Iterate over a copy so we can modify the original.
|
||||
foreach (G msg in queue.to_array()) {
|
||||
if (predicate(msg)) {
|
||||
queue.remove(msg);
|
||||
removed.add(msg);
|
||||
}
|
||||
}
|
||||
|
||||
return removed;
|
||||
}
|
||||
|
||||
public async G recv_async(Cancellable? cancellable = null) throws Error {
|
||||
for (;;) {
|
||||
if (queue.size > 0 && !is_paused)
|
||||
return queue.poll();
|
||||
|
||||
yield spinlock.wait_async(cancellable);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a read-only version of the mailbox queue that can be iterated in queue-order.
|
||||
*
|
||||
* Since the queue could potentially alter when the main loop runs, it's important to only
|
||||
* examine the queue when not allowing other operations to process.
|
||||
*
|
||||
* Altering will not affect the actual queue. Use {@link revoke} to remove enqueued operations.
|
||||
*/
|
||||
public Gee.Collection<G> get_all() {
|
||||
return queue.read_only_view;
|
||||
}
|
||||
}
|
||||
|
||||
171
src/engine/nonblocking/nonblocking-queue.vala
Normal file
171
src/engine/nonblocking/nonblocking-queue.vala
Normal file
|
|
@ -0,0 +1,171 @@
|
|||
/*
|
||||
* Copyright 2016 Software Freedom Conservancy Inc.
|
||||
* Copyright 2017 Michael Gratton <mike@vee.net>
|
||||
*
|
||||
* This software is licensed under the GNU Lesser General Public License
|
||||
* (version 2.1 or later). See the COPYING file in this distribution.
|
||||
*/
|
||||
|
||||
/**
|
||||
* An asynchronous queue, first-in first-out (FIFO) or priority.
|
||||
*
|
||||
* This class can be used to asynchronously wait for items to be added
|
||||
* to the queue, the asynchronous call blocking until an item is
|
||||
* ready.
|
||||
*/
|
||||
public class Geary.Nonblocking.Queue<G> : BaseObject {
|
||||
|
||||
/** Returns the number of items currently in the queue. */
|
||||
public int size { get { return queue.size; } }
|
||||
|
||||
/**
|
||||
* Determines if duplicate items can be added to the queue.
|
||||
*
|
||||
* If a priory queue, this applies to items of the same priority,
|
||||
* otherwise uses the item's natural identity.
|
||||
*/
|
||||
public bool allow_duplicates { get; set; default = true; }
|
||||
|
||||
/**
|
||||
* Determines if duplicate items will be added to the queue.
|
||||
*
|
||||
* If {@link allow_duplicates} is `true` and an item is already in
|
||||
* the queue, this determines if it will be added again.
|
||||
*/
|
||||
public bool requeue_duplicate { get; set; default = false; }
|
||||
|
||||
/**
|
||||
* Determines if the queue is currently running.
|
||||
*/
|
||||
public bool is_paused {
|
||||
get { return _is_paused; }
|
||||
|
||||
set {
|
||||
// if no longer paused, wake up any waiting recipients
|
||||
if (_is_paused && !value)
|
||||
spinlock.blind_notify();
|
||||
|
||||
_is_paused = value;
|
||||
}
|
||||
}
|
||||
private bool _is_paused = false;
|
||||
|
||||
private Gee.Queue<G> queue;
|
||||
private Nonblocking.Spinlock spinlock = new Nonblocking.Spinlock();
|
||||
|
||||
|
||||
/**
|
||||
* Constructs a new first-in first-out (FIFO) queue.
|
||||
*
|
||||
* If `equalator` is not null it will be used to determine the
|
||||
* identity of objects in the queue, else the items' natural
|
||||
* identity will be used.
|
||||
*/
|
||||
public Queue.fifo(owned Gee.EqualDataFunc<G>? equalator = null) {
|
||||
this(new Gee.LinkedList<G>(equalator));
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a new priority queue.
|
||||
*
|
||||
* If `comparator` is not null it will be used to determine the
|
||||
* ordering of objects in the queue, else the items' natural
|
||||
* ordering will be used.
|
||||
*/
|
||||
public Queue.priority(owned CompareDataFunc<G>? comparator = null) {
|
||||
this(new Gee.PriorityQueue<G>(comparator));
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructs a new queue.
|
||||
*/
|
||||
protected Queue(Gee.Queue<G> queue) {
|
||||
this.queue = queue;
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds an item to the queue.
|
||||
*
|
||||
* If the queue is a priority queue, it is added according to its
|
||||
* relative priority, else it is added to the end.
|
||||
*
|
||||
* Returns `true` if the item was added to the queue.
|
||||
*/
|
||||
public bool send(G msg) {
|
||||
if (!allow_duplicates && queue.contains(msg)) {
|
||||
if (requeue_duplicate)
|
||||
queue.remove(msg);
|
||||
else
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!queue.offer(msg))
|
||||
return false;
|
||||
|
||||
if (!is_paused)
|
||||
spinlock.blind_notify();
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Retrieves the next item from the queue, blocking until available.
|
||||
*
|
||||
* If the queue is paused, this will continue to wait until
|
||||
* unpaused and an item is ready. If `cancellable` is non-null,
|
||||
* when used will cancel this call.
|
||||
*/
|
||||
public async G receive(Cancellable? cancellable = null) throws Error {
|
||||
for (;;) {
|
||||
if (queue.size > 0 && !is_paused)
|
||||
return queue.poll();
|
||||
|
||||
yield spinlock.wait_async(cancellable);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes all items in queue, returning the number of removed items.
|
||||
*/
|
||||
public int clear() {
|
||||
int count = queue.size;
|
||||
if (count != 0)
|
||||
queue.clear();
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes an item from the queue, returning `true` if removed.
|
||||
*/
|
||||
public bool revoke(G msg) {
|
||||
return queue.remove(msg);
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove items matching the given predicate, returning those removed.
|
||||
*/
|
||||
public Gee.Collection<G> revoke_matching(owned Gee.Predicate<G> predicate) {
|
||||
Gee.ArrayList<G> removed = new Gee.ArrayList<G>();
|
||||
// Iterate over a copy so we can modify the original.
|
||||
foreach (G msg in queue.to_array()) {
|
||||
if (predicate(msg)) {
|
||||
queue.remove(msg);
|
||||
removed.add(msg);
|
||||
}
|
||||
}
|
||||
|
||||
return removed;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a read-only version of the queue queue.
|
||||
*
|
||||
* Since the queue could potentially alter when the main loop
|
||||
* runs, it's important to only examine the queue when not
|
||||
* allowing other operations to process.
|
||||
*/
|
||||
public Gee.Collection<G> get_all() {
|
||||
return queue.read_only_view;
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue