Message marked read while connecting go unread later: Closes #5571

Bug was due to current flag on server being updated in local database
after the local operation had made a different change and was waiting
for normalization to complete to make the change on the server.
ReplayQueue now has a notion of a writebehind operation, which means
an immediate local change that a ReplayOperation waiting to make on
the remote must now account for.
This commit is contained in:
Jim Nelson 2012-09-13 16:17:58 -07:00
parent dceefa3003
commit 327d183b8f
17 changed files with 514 additions and 114 deletions

View file

@ -148,6 +148,7 @@ engine/nonblocking/nonblocking-abstract-semaphore.vala
engine/nonblocking/nonblocking-batch.vala
engine/nonblocking/nonblocking-mailbox.vala
engine/nonblocking/nonblocking-mutex.vala
engine/nonblocking/nonblocking-reporting-semaphore.vala
engine/nonblocking/nonblocking-variants.vala
engine/rfc822/rfc822-error.vala

View file

@ -23,9 +23,23 @@ public class Geary.EmailFlags : Geary.Equalable {
private Gee.Set<EmailFlag> list = new Gee.HashSet<EmailFlag>(Hashable.hash_func, Equalable.equal_func);
public virtual signal void added(Gee.Collection<EmailFlag> flags) {
}
public virtual signal void removed(Gee.Collection<EmailFlag> flags) {
}
public EmailFlags() {
}
protected virtual void notify_added(Gee.Collection<EmailFlag> flags) {
added(flags);
}
protected virtual void notify_removed(Gee.Collection<EmailFlag> flags) {
removed(flags);
}
public bool contains(EmailFlag flag) {
return list.contains(flag);
}
@ -35,11 +49,42 @@ public class Geary.EmailFlags : Geary.Equalable {
}
public virtual void add(EmailFlag flag) {
list.add(flag);
if (!list.contains(flag)) {
list.add(flag);
notify_added(new Singleton<EmailFlag>(flag));
}
}
public virtual void add_all(EmailFlags flags) {
Gee.ArrayList<EmailFlag> added = new Gee.ArrayList<EmailFlag>();
foreach (EmailFlag flag in flags.get_all()) {
if (!list.contains(flag))
added.add(flag);
}
list.add_all(added);
notify_added(added);
}
public virtual bool remove(EmailFlag flag) {
return list.remove(flag);
bool removed = list.remove(flag);
if (removed)
notify_removed(new Singleton<EmailFlag>(flag));
return removed;
}
public virtual bool remove_all(EmailFlags flags) {
Gee.ArrayList<EmailFlag> removed = new Gee.ArrayList<EmailFlag>();
foreach (EmailFlag flag in flags.get_all()) {
if (list.contains(flag))
removed.add(flag);
}
list.remove_all(removed);
notify_removed(removed);
return removed.size > 0;
}
// Convenience method to check if the unread flag is set.

View file

@ -21,7 +21,7 @@ private class Geary.ImapEngine.GenericFolder : Geary.AbstractFolder, Geary.Folde
private EmailPrefetcher email_prefetcher;
private SpecialFolderType special_folder_type;
private bool opened = false;
private NonblockingSemaphore remote_semaphore;
private NonblockingReportingSemaphore<bool> remote_semaphore;
private ReplayQueue? replay_queue = null;
private NonblockingMutex normalize_email_positions_mutex = new NonblockingMutex();
private int remote_count = -1;
@ -230,33 +230,54 @@ private class Geary.ImapEngine.GenericFolder : Geary.AbstractFolder, Geary.Folde
Geary.Imap.EmailFlags remote_email_flags = (Geary.Imap.EmailFlags) remote_email.email_flags;
if ((local_email_flags == null) || !local_email_flags.equals(remote_email_flags)) {
batch.add(new CreateLocalEmailOperation(local_folder, remote_email, NORMALIZATION_FIELDS));
flags_changed.set(remote_email.id, remote_email.email_flags);
Logging.debug(Logging.Flag.FOLDER_NORMALIZATION, "%s: merging remote ID %s",
to_string(), remote_email.id.to_string());
// check before writebehind
if (replay_queue.query_local_writebehind_operation(ReplayOperation.WritebehindOperation.UPDATE_FLAGS,
remote_email.id, (Imap.EmailFlags) remote_email.email_flags)) {
batch.add(new CreateLocalEmailOperation(local_folder, remote_email, NORMALIZATION_FIELDS));
flags_changed.set(remote_email.id, remote_email.email_flags);
Logging.debug(Logging.Flag.FOLDER_NORMALIZATION, "%s: merging remote ID %s",
to_string(), remote_email.id.to_string());
} else {
Logging.debug(Logging.Flag.FOLDER_NORMALIZATION, "%s: writebehind cancelled for merge of %s",
to_string(), remote_email.id.to_string());
}
}
remote_ctr++;
local_ctr++;
} else if (remote_uid.value < local_uid.value) {
// one we'd not seen before is present, add and move to next remote
batch.add(new CreateLocalEmailOperation(local_folder, remote_email, NORMALIZATION_FIELDS));
appended_ids.add(remote_email.id);
Logging.debug(Logging.Flag.FOLDER_NORMALIZATION, "%s: appending inside remote ID %s",
to_string(), remote_email.id.to_string());
// check for writebehind before doing
if (replay_queue.query_local_writebehind_operation(ReplayOperation.WritebehindOperation.CREATE,
remote_email.id, null)) {
batch.add(new CreateLocalEmailOperation(local_folder, remote_email, NORMALIZATION_FIELDS));
appended_ids.add(remote_email.id);
Logging.debug(Logging.Flag.FOLDER_NORMALIZATION, "%s: appending inside remote ID %s",
to_string(), remote_email.id.to_string());
} else {
Logging.debug(Logging.Flag.FOLDER_NORMALIZATION, "%s: writebehind cancelled for inside append of %s",
to_string(), remote_email.id.to_string());
}
remote_ctr++;
} else {
assert(remote_uid.value > local_uid.value);
// local's email on the server has been removed, remove locally
batch.add(new RemoveLocalEmailOperation(local_folder, local_email.id));
removed_ids.add(local_email.id);
Logging.debug(Logging.Flag.FOLDER_NORMALIZATION, "%s: removing inside local ID %s",
to_string(), local_email.id.to_string());
// check writebehind first
if (replay_queue.query_local_writebehind_operation(ReplayOperation.WritebehindOperation.REMOVE,
local_email.id, null)) {
batch.add(new RemoveLocalEmailOperation(local_folder, local_email.id));
removed_ids.add(local_email.id);
Logging.debug(Logging.Flag.FOLDER_NORMALIZATION, "%s: removing inside local ID %s",
to_string(), local_email.id.to_string());
} else {
Logging.debug(Logging.Flag.FOLDER_NORMALIZATION, "%s: writebehind cancelled for remove of %s",
to_string(), local_email.id.to_string());
}
local_ctr++;
}
@ -267,22 +288,39 @@ private class Geary.ImapEngine.GenericFolder : Geary.AbstractFolder, Geary.Folde
// that were on the server earlier but not stored locally (i.e. this value represents emails
// added to the top of the stack)
for (; remote_ctr < remote_length; remote_ctr++) {
batch.add(new CreateLocalEmailOperation(local_folder, old_remote[remote_ctr],
NORMALIZATION_FIELDS));
appended_ids.add(old_remote[remote_ctr].id);
Geary.Email remote_email = old_remote[remote_ctr];
Logging.debug(Logging.Flag.FOLDER_NORMALIZATION, "%s: appending outside remote %s",
to_string(), old_remote[remote_ctr].id.to_string());
// again, have to check for writebehind
if (replay_queue.query_local_writebehind_operation(ReplayOperation.WritebehindOperation.CREATE,
remote_email.id, null)) {
batch.add(new CreateLocalEmailOperation(local_folder, remote_email, NORMALIZATION_FIELDS));
appended_ids.add(remote_email.id);
Logging.debug(Logging.Flag.FOLDER_NORMALIZATION, "%s: appending outside remote %s",
to_string(), remote_email.id.to_string());
} else {
Logging.debug(Logging.Flag.FOLDER_NORMALIZATION, "%s: writebehind cancelled for outside append of %s",
to_string(), remote_email.id.to_string());
}
}
// remove anything left over ... use local count rather than remote as we're still in a stage
// where only the local messages are available
for (; local_ctr < local_length; local_ctr++) {
batch.add(new RemoveLocalEmailOperation(local_folder, old_local[local_ctr].id));
removed_ids.add(old_local[local_ctr].id);
Geary.Email local_email = old_local[local_ctr];
Logging.debug(Logging.Flag.FOLDER_NORMALIZATION, "%s: removing outside remote %s",
to_string(), old_local[local_ctr].id.to_string());
// again, check for writebehind
if (replay_queue.query_local_writebehind_operation(ReplayOperation.WritebehindOperation.REMOVE,
local_email.id, null)) {
batch.add(new RemoveLocalEmailOperation(local_folder, local_email.id));
removed_ids.add(local_email.id);
Logging.debug(Logging.Flag.FOLDER_NORMALIZATION, "%s: removing outside remote %s",
to_string(), local_email.id.to_string());
} else {
Logging.debug(Logging.Flag.FOLDER_NORMALIZATION, "%s: writebehind cancelled for outside remove %s",
to_string(), local_email.id.to_string());
}
}
// execute them all at once
@ -302,7 +340,7 @@ private class Geary.ImapEngine.GenericFolder : Geary.AbstractFolder, Geary.Folde
// throw the first exception, if one occurred
batch.throw_first_exception();
// look for local additions (email not known to the local store)
// look for local additions (email not known to the local store) to signal
Gee.ArrayList<Geary.EmailIdentifier> locally_appended = new Gee.ArrayList<Geary.EmailIdentifier>();
foreach (int id in batch.get_ids()) {
CreateLocalEmailOperation? create_op = batch.get_operation(id) as CreateLocalEmailOperation;
@ -347,10 +385,10 @@ private class Geary.ImapEngine.GenericFolder : Geary.AbstractFolder, Geary.Folde
opened = true;
remote_semaphore = new Geary.NonblockingSemaphore();
remote_semaphore = new Geary.NonblockingReportingSemaphore<bool>(false);
// start the replay queue
replay_queue = new ReplayQueue(get_path().to_string());
replay_queue = new ReplayQueue(get_path().to_string(), remote_semaphore);
try {
yield local_folder.open_async(readonly, cancellable);
@ -431,7 +469,7 @@ private class Geary.ImapEngine.GenericFolder : Geary.AbstractFolder, Geary.Folde
// notify any threads of execution waiting for the remote folder to open that the result
// of that operation is ready
try {
remote_semaphore.notify();
remote_semaphore.notify_result(remote != null, null);
} catch (Error notify_err) {
debug("Unable to fire semaphore notifying remote folder ready/not ready: %s",
notify_err.message);
@ -453,21 +491,6 @@ private class Geary.ImapEngine.GenericFolder : Geary.AbstractFolder, Geary.Folde
count);
}
// Returns true if the remote folder is ready, false otherwise
internal async bool wait_for_remote_ready_async(Cancellable? cancellable) throws Error {
if (remote_folder != null)
return true;
yield remote_semaphore.wait_async(cancellable);
return (remote_folder != null);
}
internal async void throw_if_remote_not_ready_async(Cancellable? cancellable) throws Error {
if (!yield wait_for_remote_ready_async(cancellable))
throw new EngineError.SERVER_UNAVAILABLE("No connection to %s", remote.to_string());
}
public override async void close_async(Cancellable? cancellable = null) throws Error {
yield close_internal_async(CloseReason.LOCAL_CLOSE, CloseReason.REMOTE_CLOSE, cancellable);
}
@ -564,7 +587,7 @@ private class Geary.ImapEngine.GenericFolder : Geary.AbstractFolder, Geary.Folde
try {
// If remote doesn't fully open, then don't fire signal, as we'll be unable to
// normalize the folder
if (!yield wait_for_remote_ready_async(null)) {
if (!yield remote_semaphore.wait_for_result_async(null)) {
debug("do_replay_appended_messages: remote never opened for %s", to_string());
return;
@ -734,7 +757,7 @@ private class Geary.ImapEngine.GenericFolder : Geary.AbstractFolder, Geary.Folde
// if connected or connecting, use stashed remote count (which is always kept current once
// remote folder is opened)
if (opened) {
if (yield wait_for_remote_ready_async(cancellable))
if (yield remote_semaphore.wait_for_result_async(cancellable))
return remote_count;
}
@ -1011,7 +1034,8 @@ private class Geary.ImapEngine.GenericFolder : Geary.AbstractFolder, Geary.Folde
// EngineFolder as a member variable.
internal async void normalize_email_positions_async(int low, int count, out int local_count,
Cancellable? cancellable) throws Error {
yield throw_if_remote_not_ready_async(cancellable);
if (!yield remote_semaphore.wait_for_result_async(cancellable))
throw new EngineError.SERVER_UNAVAILABLE("no connection to %s", remote.to_string());
int mutex_token = yield normalize_email_positions_mutex.claim_async(cancellable);

View file

@ -9,6 +9,13 @@ private abstract class Geary.ImapEngine.ReceiveReplayOperation : Geary.ImapEngin
base (name, ReplayOperation.Scope.LOCAL_ONLY);
}
public override bool query_local_writebehind_operation(ReplayOperation.WritebehindOperation op,
EmailIdentifier id, Imap.EmailFlags? flags) {
debug("Warning: ReceiveReplayOperation.query_local_writebehind_operation() called");
return true;
}
public override async ReplayOperation.Status replay_remote_async() throws Error {
debug("Warning: ReceiveReplayOperation.replay_remote_async() called");

View file

@ -11,8 +11,11 @@ private abstract class Geary.ImapEngine.ReplayOperation {
*
* LOCAL_AND_REMOTE: replay_local_async() is called. If that method returns COMPLETED,
* no further calls are made. If it returns CONTINUE, replay_remote_async() is called.
* query_local_writebehind_operation() may be called before replay_remote_async().
* LOCAL_ONLY: replay_local_async() only. replay_remote_async() will never be called.
* query_local_writebehind_operation() will never be called.
* REMOTE_ONLY: replay_remote_async() only. replay_local_async() will never be called.
* query_local_writebehind_operation() may be called before replay_remote_async().
*
* See the various replay methods for how backout_local_async() may be called depending on
* this field and those methods' return values.
@ -29,6 +32,12 @@ private abstract class Geary.ImapEngine.ReplayOperation {
CONTINUE
}
public enum WritebehindOperation {
CREATE,
REMOVE,
UPDATE_FLAGS
}
private static int next_opnum = 0;
public string name { get; set; }
@ -47,7 +56,7 @@ private abstract class Geary.ImapEngine.ReplayOperation {
}
/**
* See Scope for conditions for this method to be called.
* See Scope for conditions where this method will be called.
*
* Returns:
* COMPLETED: the operation has completed and no further calls should be made.
@ -59,7 +68,30 @@ private abstract class Geary.ImapEngine.ReplayOperation {
public abstract async Status replay_local_async() throws Error;
/**
* See Scope for conditions for this method to be called.
* See Scope for conditions where this method will be called.
*
* This method is called only when the ReplayOperation is blocked waiting to execute a remote
* command and an exterior operation is going to occur that may alter the state on the local
* database (i.e. altering state behind the execution of this operation's replay_local_async()).
* This primarily happens during folder normalization (initial synchronization with the server
* when a folder is opened) where ReplayOperations are allowed to execute locally and enqueue
* for remote operation in preparation for the folder to open. (There may be other
* circumstances in the future where this method may be called.)
*
* The method should examine the supplied operation and return true if it's okay to proceed
* (and modifying its own operation to reflect the change that will occur before it's allowed to
* proceed, or merely not performing any operation in replay_remote_async()) or false if the
* supplied operation should *not* execute so that this ReplayOperation's command may execute
* shortly.
*
* flags will only be non-null when op is UPDATE_FLAGS. In that case, if this method returns
* true, it may also modify the EmailFlags. Those flags will be written to the local store.
*/
public abstract bool query_local_writebehind_operation(WritebehindOperation op, EmailIdentifier id,
Imap.EmailFlags? flags);
/**
* See Scope for conditions where this method will be called.
*
* Returns:
* COMPLETED: the operation has completed and no further calls should be made.
@ -70,8 +102,8 @@ private abstract class Geary.ImapEngine.ReplayOperation {
public abstract async Status replay_remote_async() throws Error;
/**
* See Scope, replay_local_async(), and replay_remote_async() for conditions for this to
* be called.
* See Scope, replay_local_async(), and replay_remote_async() for conditions for this where this
* will be called.
*/
public abstract async void backout_local_async() throws Error;

View file

@ -15,6 +15,12 @@ private class Geary.ImapEngine.ReplayQueue {
return Status.CONTINUE;
}
public override bool query_local_writebehind_operation(ReplayOperation.WritebehindOperation op,
EmailIdentifier id, Imap.EmailFlags? flags) {
// whatever, no problem, do what you will
return true;
}
public override async ReplayOperation.Status replay_remote_async() throws Error {
return Status.COMPLETED;
}
@ -38,6 +44,7 @@ private class Geary.ImapEngine.ReplayQueue {
return remote_queue.size;
} }
private NonblockingReportingSemaphore<bool> remote_reporting_semaphore;
private NonblockingMailbox<ReplayOperation> local_queue = new NonblockingMailbox<ReplayOperation>();
private NonblockingMailbox<ReplayOperation> remote_queue = new NonblockingMailbox<ReplayOperation>();
@ -101,8 +108,9 @@ private class Geary.ImapEngine.ReplayQueue {
Logging.debug(Logging.Flag.REPLAY, "[%s] ReplayQueue::closed", to_string());
}
public ReplayQueue(string name) {
public ReplayQueue(string name, NonblockingReportingSemaphore<bool> remote_reporting_semaphore) {
this.name = name;
this.remote_reporting_semaphore = remote_reporting_semaphore;
// fire off background queue processors
do_replay_local_async.begin();
@ -138,6 +146,30 @@ private class Geary.ImapEngine.ReplayQueue {
return true;
}
/**
* This is used by the folder normalization routine to handle a situation where replay
* operations have performed local work (and notified the client of changes) and are enqueued
* waiting to perform the same operation on the server. In normalization, the server reports
* changes that need to be synchronized on the client. If this change is written before the
* enqueued replay operations execute, the potential exists to be unsynchronized.
*
* This call gives all enqueued remote replay operations a chance to cancel or update their
* own state due to a writebehind operation. See
* ReplayOperation.query_local_writebehind_operation() for more information.
*/
public bool query_local_writebehind_operation(ReplayOperation.WritebehindOperation op,
Geary.EmailIdentifier id, Imap.EmailFlags? flags) {
// Although any replay operation can cancel the writebehind operation, give all a chance to
// see it as it may affect their internal state
bool proceed = true;
foreach (ReplayOperation replay_op in remote_queue.get_all()) {
if (!replay_op.query_local_writebehind_operation(op, id, flags))
proceed = false;
}
return proceed;
}
public async void close_async(Cancellable? cancellable = null) throws Error {
if (is_closed)
return;
@ -168,7 +200,7 @@ private class Geary.ImapEngine.ReplayQueue {
debug("Unable to receive next replay operation on local queue %s: %s", to_string(),
recv_err.message);
continue;
break;
}
// If this is a Close operation, shut down the queue after processing it
@ -261,6 +293,19 @@ private class Geary.ImapEngine.ReplayQueue {
}
private async void do_replay_remote_async() {
try {
if (!yield remote_reporting_semaphore.wait_for_result_async()) {
debug("Folder %s failed to open, remote replay queue closing", to_string());
return;
}
} catch (Error remote_err) {
debug("Error for remote queue waiting for remote %s to open, remote queue closing: %s", to_string(),
remote_err.message);
return;
}
bool queue_running = true;
while (queue_running) {
ReplayOperation op;
@ -270,7 +315,7 @@ private class Geary.ImapEngine.ReplayQueue {
debug("Unable to receive next replay operation on remote queue %s: %s", to_string(),
recv_err.message);
continue;
break;
}
if (op is ReplayClose)

View file

@ -6,17 +6,18 @@
private class Geary.ImapEngine.CopyEmail : Geary.ImapEngine.SendReplayOperation {
private GenericFolder engine;
private Gee.List<Geary.EmailIdentifier> to_copy;
private Gee.List<Geary.EmailIdentifier> to_copy = new Gee.ArrayList<Geary.EmailIdentifier>(
Equalable.equal_func);
private Geary.FolderPath destination;
private Cancellable? cancellable;
public CopyEmail(GenericFolder engine, Gee.List<Geary.EmailIdentifier> to_copy,
Geary.FolderPath destination, Cancellable? cancellable = null) {
base("CopyEmail");
this.engine = engine;
this.to_copy = to_copy;
this.to_copy.add_all(to_copy);
this.destination = destination;
this.cancellable = cancellable;
}
@ -29,13 +30,25 @@ private class Geary.ImapEngine.CopyEmail : Geary.ImapEngine.SendReplayOperation
// existing there.
return ReplayOperation.Status.CONTINUE;
}
public override async ReplayOperation.Status replay_remote_async() throws Error {
yield engine.throw_if_remote_not_ready_async(cancellable);
public override bool query_local_writebehind_operation(ReplayOperation.WritebehindOperation op,
EmailIdentifier id, Imap.EmailFlags? flags) {
// only interested in messages going away (i.e. can't be copied) ...
// note that this method operates exactly the same way whether the EmailIdentifer is in
// the to_copy list or not.
if (op == ReplayOperation.WritebehindOperation.REMOVE)
to_copy.remove(id);
return true;
}
public override async ReplayOperation.Status replay_remote_async() throws Error {
// perform_immediate_local_operation() may have removed all messages to copy
if (to_copy.size > 0) {
yield engine.remote_folder.copy_email_async(new Imap.MessageSet.email_id_collection(to_copy),
destination, cancellable);
}
yield engine.remote_folder.copy_email_async(new Imap.MessageSet.email_id_collection(to_copy),
destination, cancellable);
return ReplayOperation.Status.COMPLETED;
}

View file

@ -6,7 +6,8 @@
private class Geary.ImapEngine.ExpungeEmail : Geary.ImapEngine.SendReplayOperation {
private GenericFolder engine;
private Gee.List<Geary.EmailIdentifier> to_remove;
private Gee.List<Geary.EmailIdentifier> to_remove = new Gee.ArrayList<EmailIdentifier>(
Equalable.equal_func);
private Cancellable? cancellable;
private int original_count = 0;
@ -16,7 +17,7 @@ private class Geary.ImapEngine.ExpungeEmail : Geary.ImapEngine.SendReplayOperati
this.engine = engine;
this.to_remove = to_remove;
this.to_remove.add_all(to_remove);
this.cancellable = cancellable;
}
@ -42,9 +43,28 @@ private class Geary.ImapEngine.ExpungeEmail : Geary.ImapEngine.SendReplayOperati
return ReplayOperation.Status.CONTINUE;
}
public override async ReplayOperation.Status replay_remote_async() throws Error {
yield engine.throw_if_remote_not_ready_async(cancellable);
public override bool query_local_writebehind_operation(ReplayOperation.WritebehindOperation op,
EmailIdentifier id, Imap.EmailFlags? flags) {
if (!to_remove.contains(id))
return true;
switch (op) {
case ReplayOperation.WritebehindOperation.CREATE:
// don't allow for the message to be created, it will be removed on the server by
// this operation
return false;
case ReplayOperation.WritebehindOperation.REMOVE:
// removed locally, to be removed remotely, don't bother writing locally
return false;
default:
// ignored
return true;
}
}
public override async ReplayOperation.Status replay_remote_async() throws Error {
// Remove from server. Note that this causes the receive replay queue to kick into
// action, removing the e-mail but *NOT* firing a signal; the "remove marker" indicates
// that the signal has already been fired.

View file

@ -13,6 +13,7 @@ private class Geary.ImapEngine.FetchEmail : Geary.ImapEngine.SendReplayOperation
private Email.Field remaining_fields;
private Folder.ListFlags flags;
private Cancellable? cancellable;
private bool writebehind_removed = false;
public FetchEmail(GenericFolder engine, EmailIdentifier id, Email.Field required_fields,
Folder.ListFlags flags, Cancellable? cancellable) {
@ -66,8 +67,30 @@ private class Geary.ImapEngine.FetchEmail : Geary.ImapEngine.SendReplayOperation
return ReplayOperation.Status.CONTINUE;
}
public override bool query_local_writebehind_operation(ReplayOperation.WritebehindOperation op,
EmailIdentifier id, Imap.EmailFlags? flags) {
if (!this.id.equals(id))
return true;
switch (op) {
case ReplayOperation.WritebehindOperation.REMOVE:
writebehind_removed = true;
return true;
case ReplayOperation.WritebehindOperation.CREATE:
default:
// still need to do the full fetch for CREATE, since it's unknown (currently) what
// fields are available locally; otherwise, ignored
return true;
}
}
public override async ReplayOperation.Status replay_remote_async() throws Error {
yield engine.throw_if_remote_not_ready_async(cancellable);
if (writebehind_removed) {
throw new EngineError.NOT_FOUND("Unable to fetch %s in %s (removed with writebehind)",
id.to_string(), engine.to_string());
}
// fetch only the remaining fields from the remote folder (if only pulling partial information,
// will merge at end of this method)

View file

@ -47,8 +47,6 @@ private class Geary.ImapEngine.ListEmailBySparseID : Geary.ImapEngine.SendReplay
}
public override async Object? execute_async(Cancellable? cancellable) throws Error {
yield owner.throw_if_remote_not_ready_async(cancellable);
// fetch from remote folder
Gee.List<Geary.Email>? list = yield owner.remote_folder.list_email_async(msg_set,
unfulfilled_fields, cancellable);
@ -157,9 +155,37 @@ private class Geary.ImapEngine.ListEmailBySparseID : Geary.ImapEngine.SendReplay
return ReplayOperation.Status.CONTINUE;
}
public override async ReplayOperation.Status replay_remote_async() throws Error {
yield owner.throw_if_remote_not_ready_async(cancellable);
public override bool query_local_writebehind_operation(ReplayOperation.WritebehindOperation op,
EmailIdentifier id, Imap.EmailFlags? flags) {
// don't need to check if id is present here, all paths deal with this correctly
switch (op) {
case ReplayOperation.WritebehindOperation.REMOVE:
// remove email already picked up from local store ... for email reported via the
// callback, too late
if (accumulator != null) {
Gee.HashSet<Geary.Email> wb_removed = new Gee.HashSet<Geary.Email>();
foreach (Geary.Email email in accumulator) {
if (email.id.equals(id))
wb_removed.add(email);
}
accumulator.remove_all(wb_removed);
}
// remove from unfulfilled list, as there's nothing to fetch from the server
foreach (Geary.Email.Field field in unfulfilled.get_keys())
unfulfilled.remove(field, id);
return true;
default:
// ignored
return true;
}
}
public override async ReplayOperation.Status replay_remote_async() throws Error {
NonblockingBatch batch = new NonblockingBatch();
// schedule operations to remote for each set of email with unfulfilled fields and merge

View file

@ -55,7 +55,7 @@ private class Geary.ImapEngine.ListEmail : Geary.ImapEngine.SendReplayOperation
private Gee.List<Geary.Email>? local_list = null;
private int local_list_size = 0;
private Gee.HashMultiMap<Geary.Email.Field, Geary.EmailIdentifier> unfulfilled = new Gee.HashMultiMap<
Geary.Email.Field, Geary.EmailIdentifier>();
Geary.Email.Field, Geary.EmailIdentifier>(null, null, Hashable.hash_func, Equalable.equal_func);
public ListEmail(GenericFolder engine, int low, int count, Geary.Email.Field required_fields,
Folder.ListFlags flags, Gee.List<Geary.Email>? accumulator, EmailCallback? cb, Cancellable? cancellable) {
@ -181,9 +181,38 @@ private class Geary.ImapEngine.ListEmail : Geary.ImapEngine.SendReplayOperation
return ReplayOperation.Status.CONTINUE;
}
public override async ReplayOperation.Status replay_remote_async() throws Error {
yield engine.throw_if_remote_not_ready_async(cancellable);
public override bool query_local_writebehind_operation(ReplayOperation.WritebehindOperation op,
EmailIdentifier id, Imap.EmailFlags? flags) {
// don't need to check if id is present here, all paths deal with this possibility
// correctly
switch (op) {
case ReplayOperation.WritebehindOperation.REMOVE:
// remove email already picked up from local store ... for email reported via the
// callback, too late
if (accumulator != null) {
Gee.HashSet<Geary.Email> wb_removed = new Gee.HashSet<Geary.Email>();
foreach (Geary.Email email in accumulator) {
if (email.id.equals(id))
wb_removed.add(email);
}
accumulator.remove_all(wb_removed);
}
// remove from unfulfilled list, as there's nothing to fetch from the server
foreach (Geary.Email.Field field in unfulfilled.get_keys())
unfulfilled.remove(field, id);
return true;
default:
// ignored
return true;
}
}
public override async ReplayOperation.Status replay_remote_async() throws Error {
// normalize the email positions in the local store, so the positions being requested
// from the server are available in the database
int local_count;
@ -252,9 +281,6 @@ private class Geary.ImapEngine.ListEmail : Geary.ImapEngine.SendReplayOperation
}
private async void remote_list_positional(int[] needed_by_position) throws Error {
// possible to call remote multiple times, wait for it to open once and go
yield engine.throw_if_remote_not_ready_async(cancellable);
// pull in reverse order because callers to this method tend to order messages from oldest
// to newest, but for user satisfaction, should be fetched from newest to oldest
int remaining = needed_by_position.length;
@ -294,9 +320,6 @@ private class Geary.ImapEngine.ListEmail : Geary.ImapEngine.SendReplayOperation
private async void remote_list_partials(Gee.Collection<Geary.EmailIdentifier> ids,
Geary.Email.Field remaining_fields) throws Error {
// possible to call remote multiple times, wait for it to open once and go
yield engine.throw_if_remote_not_ready_async(cancellable);
Gee.List<Geary.Email>? remote_list = yield engine.remote_folder.list_email_async(
new Imap.MessageSet.email_id_collection(ids), remaining_fields, cancellable);
if (remote_list == null || remote_list.size == 0)

View file

@ -6,7 +6,8 @@
private class Geary.ImapEngine.MarkEmail : Geary.ImapEngine.SendReplayOperation {
private GenericFolder engine;
private Gee.List<Geary.EmailIdentifier> to_mark;
private Gee.List<Geary.EmailIdentifier> to_mark = new Gee.ArrayList<Geary.EmailIdentifier>(
Equalable.equal_func);
private Geary.EmailFlags? flags_to_add;
private Geary.EmailFlags? flags_to_remove;
private Gee.Map<Geary.EmailIdentifier, Geary.EmailFlags>? original_flags = null;
@ -19,7 +20,7 @@ private class Geary.ImapEngine.MarkEmail : Geary.ImapEngine.SendReplayOperation
this.engine = engine;
this.to_mark = to_mark;
this.to_mark.add_all(to_mark);
this.flags_to_add = flags_to_add;
this.flags_to_remove = flags_to_remove;
this.cancellable = cancellable;
@ -44,8 +45,40 @@ private class Geary.ImapEngine.MarkEmail : Geary.ImapEngine.SendReplayOperation
return ReplayOperation.Status.CONTINUE;
}
public override bool query_local_writebehind_operation(ReplayOperation.WritebehindOperation op,
EmailIdentifier id, Imap.EmailFlags? flags) {
if (!to_mark.contains(id))
return true;
switch (op) {
case ReplayOperation.WritebehindOperation.REMOVE:
// don't bother updating on server
to_mark.remove(id);
return true;
case ReplayOperation.WritebehindOperation.UPDATE_FLAGS:
// user's mark operation takes precedence over server's, update supplied flags
// and continue
if (flags_to_add != null && flags != null)
flags.add_all(flags_to_add);
if (flags_to_remove != null && flags != null)
flags.remove_all(flags_to_remove);
return true;
case ReplayOperation.WritebehindOperation.CREATE:
default:
// not interested in other operations
return true;
}
}
public override async ReplayOperation.Status replay_remote_async() throws Error {
yield engine.throw_if_remote_not_ready_async(cancellable);
// potentially empty due to writebehind operation
if (to_mark.size == 0)
return ReplayOperation.Status.COMPLETED;
yield engine.remote_folder.mark_email_async(new Imap.MessageSet.email_id_collection(to_mark),
flags_to_add, flags_to_remove, cancellable);

View file

@ -6,7 +6,8 @@
private class Geary.ImapEngine.MoveEmail : Geary.ImapEngine.SendReplayOperation {
private GenericFolder engine;
private Gee.List<Geary.EmailIdentifier> to_move;
private Gee.List<Geary.EmailIdentifier> to_move = new Gee.ArrayList<Geary.EmailIdentifier>(
Equalable.equal_func);
private Geary.FolderPath destination;
private Cancellable? cancellable;
private int original_count = 0;
@ -17,7 +18,7 @@ private class Geary.ImapEngine.MoveEmail : Geary.ImapEngine.SendReplayOperation
this.engine = engine;
this.to_move = to_move;
this.to_move.add_all(to_move);
this.destination = destination;
this.cancellable = cancellable;
}
@ -43,12 +44,33 @@ private class Geary.ImapEngine.MoveEmail : Geary.ImapEngine.SendReplayOperation
return ReplayOperation.Status.CONTINUE;
}
public override async ReplayOperation.Status replay_remote_async() throws Error {
yield engine.throw_if_remote_not_ready_async(cancellable);
public override bool query_local_writebehind_operation(ReplayOperation.WritebehindOperation op,
EmailIdentifier id, Imap.EmailFlags? flags) {
if (!to_move.contains(id))
return true;
switch (op) {
case ReplayOperation.WritebehindOperation.CREATE:
// don't allow for it to be created, it's already been marked for removal
return false;
case ReplayOperation.WritebehindOperation.REMOVE:
case ReplayOperation.WritebehindOperation.UPDATE_FLAGS:
// don't bother, already removed
return false;
default:
// ignored
return true;
}
}
public override async ReplayOperation.Status replay_remote_async() throws Error {
if (to_move.size > 0) {
yield engine.remote_folder.move_email_async(new Imap.MessageSet.email_id_collection(to_move),
destination, cancellable);
}
yield engine.remote_folder.move_email_async(new Imap.MessageSet.email_id_collection(to_move),
destination, cancellable);
return ReplayOperation.Status.COMPLETED;
}

View file

@ -9,29 +9,36 @@ public class Geary.Imap.EmailFlags : Geary.EmailFlags {
public EmailFlags(MessageFlags flags) {
message_flags = flags;
if (!flags.contains(MessageFlag.SEEN))
add(UNREAD);
if (flags.contains(MessageFlag.FLAGGED))
add(FLAGGED);
}
public override void add(EmailFlag flag) {
if (flag.equals(UNREAD))
message_flags.remove(MessageFlag.SEEN);
if (flag.equals(FLAGGED))
message_flags.add(MessageFlag.FLAGGED);
base.add(flag);
protected override void notify_added(Gee.Collection<EmailFlag> added) {
foreach (EmailFlag flag in added) {
if (flag.equals(UNREAD))
message_flags.remove(MessageFlag.SEEN);
if (flag.equals(FLAGGED))
message_flags.add(MessageFlag.FLAGGED);
}
base.notify_added(added);
}
public override bool remove(EmailFlag flag) {
if (flag.equals(UNREAD))
message_flags.add(MessageFlag.SEEN);
if (flag.equals(FLAGGED))
message_flags.remove(MessageFlag.FLAGGED);
return base.remove(flag);
protected override void notify_removed(Gee.Collection<EmailFlag> removed) {
foreach (EmailFlag flag in removed) {
if (flag.equals(UNREAD))
message_flags.add(MessageFlag.SEEN);
if (flag.equals(FLAGGED))
message_flags.remove(MessageFlag.FLAGGED);
}
base.notify_removed(removed);
}
}

View file

@ -46,6 +46,9 @@ public abstract class Geary.NonblockingAbstractSemaphore {
private bool passed = false;
private Gee.List<Pending> pending_queue = new Gee.LinkedList<Pending>();
public virtual signal void at_reset() {
}
protected NonblockingAbstractSemaphore(bool broadcast, bool autoreset, Cancellable? cancellable = null) {
this.broadcast = broadcast;
this.autoreset = autoreset;
@ -67,6 +70,10 @@ public abstract class Geary.NonblockingAbstractSemaphore {
cancellable.cancelled.disconnect(on_cancelled);
}
protected virtual void notify_at_reset() {
at_reset();
}
private void trigger(bool all) {
if (pending_queue.size == 0)
return;
@ -84,7 +91,7 @@ public abstract class Geary.NonblockingAbstractSemaphore {
}
}
public void notify() throws Error {
public virtual void notify() throws Error {
check_cancelled();
passed = true;
@ -106,7 +113,7 @@ public abstract class Geary.NonblockingAbstractSemaphore {
}
}
public async void wait_async(Cancellable? cancellable = null) throws Error {
public virtual async void wait_async(Cancellable? cancellable = null) throws Error {
for (;;) {
check_user_cancelled(cancellable);
check_cancelled();
@ -130,8 +137,10 @@ public abstract class Geary.NonblockingAbstractSemaphore {
}
}
public void reset() {
public virtual void reset() {
passed = false;
notify_at_reset();
}
public bool is_cancelled() {

View file

@ -19,6 +19,13 @@ public class Geary.NonblockingMailbox<G> : Object {
spinlock.notify();
}
/**
* Returns true if the message was revoked.
*/
public bool revoke(G msg) throws Error {
return queue.remove(msg);
}
public async G recv_async(Cancellable? cancellable = null) throws Error {
for (;;) {
if (queue.size > 0)
@ -27,5 +34,16 @@ public class Geary.NonblockingMailbox<G> : Object {
yield spinlock.wait_async(cancellable);
}
}
/**
* Since the queue could potentially alter when the main loop runs, it's important to only
* examine the queue when not allowing other operations to process.
*
* This returns a read-only list in queue-order. Altering will not affect the queue. Use
* revoke() to remove enqueued operations.
*/
public Gee.List<G> get_all() {
return queue.read_only_view;
}
}

View file

@ -0,0 +1,52 @@
/* Copyright 2012 Yorba Foundation
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
public class Geary.NonblockingReportingSemaphore<G> : Geary.NonblockingSemaphore {
public G result { get; private set; }
public Error? err { get; private set; default = null; }
private G default_result;
public NonblockingReportingSemaphore(G default_result, Cancellable? cancellable = null) {
base (cancellable);
this.default_result = default_result;
result = default_result;
}
protected override void notify_at_reset() {
result = default_result;
err = null;
base.notify_at_reset();
}
public void notify_result(G result, Error? err) throws Error {
this.result = result;
this.err = err;
notify();
}
public void throw_if_error() throws Error {
if (err != null)
throw err;
}
public async G wait_for_result_async(Cancellable? cancellable = null) throws Error {
// check before waiting
throw_if_error();
// wait
yield base.wait_async(cancellable);
// if notified of error while waiting, throw that
throw_if_error();
return result;
}
}