Merge branch 'wip/778276-better-flag-updates'. Fixes Bug 778276.

This commit is contained in:
Michael James Gratton 2018-01-12 11:56:57 +11:00
commit 6c5a7d5868
36 changed files with 3534 additions and 2373 deletions

View file

@ -230,6 +230,8 @@ src/engine/imap-engine/gmail/imap-engine-gmail-drafts-folder.vala
src/engine/imap-engine/gmail/imap-engine-gmail-folder.vala
src/engine/imap-engine/gmail/imap-engine-gmail-search-folder.vala
src/engine/imap-engine/gmail/imap-engine-gmail-spam-trash-folder.vala
src/engine/imap-engine/imap-engine-account-operation.vala
src/engine/imap-engine/imap-engine-account-processor.vala
src/engine/imap-engine/imap-engine-account-synchronizer.vala
src/engine/imap-engine/imap-engine-batch-operations.vala
src/engine/imap-engine/imap-engine-contact-store.vala
@ -339,8 +341,8 @@ src/engine/nonblocking/nonblocking-batch.vala
src/engine/nonblocking/nonblocking-concurrent.vala
src/engine/nonblocking/nonblocking-counting-semaphore.vala
src/engine/nonblocking/nonblocking-error.vala
src/engine/nonblocking/nonblocking-mailbox.vala
src/engine/nonblocking/nonblocking-mutex.vala
src/engine/nonblocking/nonblocking-queue.vala
src/engine/nonblocking/nonblocking-reporting-semaphore.vala
src/engine/nonblocking/nonblocking-variants.vala
src/engine/rfc822/rfc822-error.vala

1791
po/id.po

File diff suppressed because it is too large Load diff

View file

@ -190,10 +190,11 @@ engine/imap-db/outbox/smtp-outbox-folder-properties.vala
engine/imap-db/outbox/smtp-outbox-folder-root.vala
engine/imap-engine/imap-engine.vala
engine/imap-engine/imap-engine-account-operation.vala
engine/imap-engine/imap-engine-account-processor.vala
engine/imap-engine/imap-engine-account-synchronizer.vala
engine/imap-engine/imap-engine-batch-operations.vala
engine/imap-engine/imap-engine-contact-store.vala
engine/imap-engine/imap-engine-email-flag-watcher.vala
engine/imap-engine/imap-engine-email-prefetcher.vala
engine/imap-engine/imap-engine-generic-account.vala
engine/imap-engine/imap-engine-generic-folder.vala
@ -229,6 +230,7 @@ engine/imap-engine/replay-ops/imap-engine-remove-email.vala
engine/imap-engine/replay-ops/imap-engine-replay-append.vala
engine/imap-engine/replay-ops/imap-engine-replay-disconnect.vala
engine/imap-engine/replay-ops/imap-engine-replay-removal.vala
engine/imap-engine/replay-ops/imap-engine-replay-update.vala
engine/imap-engine/replay-ops/imap-engine-server-search-email.vala
engine/imap-engine/replay-ops/imap-engine-user-close.vala
engine/imap-engine/yahoo/imap-engine-yahoo-account.vala
@ -258,8 +260,8 @@ engine/nonblocking/nonblocking-batch.vala
engine/nonblocking/nonblocking-concurrent.vala
engine/nonblocking/nonblocking-counting-semaphore.vala
engine/nonblocking/nonblocking-error.vala
engine/nonblocking/nonblocking-mailbox.vala
engine/nonblocking/nonblocking-mutex.vala
engine/nonblocking/nonblocking-queue.vala
engine/nonblocking/nonblocking-reporting-semaphore.vala
engine/nonblocking/nonblocking-variants.vala

View file

@ -41,6 +41,14 @@ public abstract class Geary.Account : BaseObject {
public signal void email_sent(Geary.RFC822.Message rfc822);
/**
* Emitted to notify the client that some problem has occurred.
*
* The engine uses this signal to report internal errors and other
* issues that the client should notify the user about. The {@link
* ProblemReport} class provides context about the nature of the
* problem itself.
*/
public signal void report_problem(Geary.ProblemReport problem);
public signal void contacts_loaded();
@ -62,17 +70,35 @@ public abstract class Geary.Account : BaseObject {
Gee.List<Geary.Folder>? unavailable);
/**
* Fired when folders are created or deleted.
* Fired when new folders have been created.
*
* Folders are ordered for the convenience of the caller from the top of the hierarchy to
* lower in the hierarchy. In other words, parents are listed before children, assuming the
* lists are traversed in natural order.
* This is fired in response to new folders appearing, for example
* the user created a new folder. It will be fired after {@link
* folders_available_unavailable} has been fired to mark the
* folders as having been made available.
*
* @see sort_by_path
* Folders are ordered for the convenience of the caller from the
* top of the hierarchy to lower in the hierarchy. In other
* words, parents are listed before children, assuming the lists
* are traversed in natural order.
*/
public signal void folders_added_removed(Gee.List<Geary.Folder>? added,
Gee.List<Geary.Folder>? removed);
public signal void folders_created(Gee.List<Geary.Folder> created);
/**
* Fired when existing folders are deleted.
*
* This is fired in response to existing folders being removed,
* for example if the user deleted a folder. it will be fired
* after {@link folders_available_unavailable} has been fired to
* mark the folders as having been made unavailable.
*
* Folders are ordered for the convenience of the caller from the
* top of the hierarchy to lower in the hierarchy. In other
* words, parents are listed before children, assuming the lists
* are traversed in natural order.
*/
public signal void folders_deleted(Gee.List<Geary.Folder> deleted);
/**
* Fired when a Folder's contents is detected having changed.
*/
@ -369,10 +395,14 @@ public abstract class Geary.Account : BaseObject {
folders_available_unavailable(available, unavailable);
}
/** Fires a {@link folders_added_removed} signal. */
protected virtual void notify_folders_added_removed(Gee.List<Geary.Folder>? added,
Gee.List<Geary.Folder>? removed) {
folders_added_removed(added, removed);
/** Fires a {@link folders_created} signal. */
protected virtual void notify_folders_created(Gee.List<Geary.Folder> created) {
folders_created(created);
}
/** Fires a {@link folders_deleted} signal. */
protected virtual void notify_folders_deleted(Gee.List<Geary.Folder> deleted) {
folders_deleted(deleted);
}
/** Fires a {@link folders_contents_altered} signal. */

View file

@ -15,7 +15,7 @@ public abstract class Geary.ContactStore : BaseObject {
public signal void contacts_updated(Gee.Collection<Contact> contacts);
internal ContactStore() {
protected ContactStore() {
contact_map = new Gee.HashMap<string, Contact>();
}

View file

@ -125,7 +125,8 @@ public class Geary.App.DraftManager : BaseObject {
private Folder? drafts_folder = null;
private FolderSupport.Create? create_support = null;
private FolderSupport.Remove? remove_support = null;
private Nonblocking.Mailbox<Operation?> mailbox = new Nonblocking.Mailbox<Operation?>();
private Nonblocking.Queue<Operation?> mailbox =
new Nonblocking.Queue<Operation?>.fifo();
private bool was_opened = false;
private Error? fatal_err = null;
@ -380,16 +381,15 @@ public class Geary.App.DraftManager : BaseObject {
// reporting it again
if (fatal_err != null)
break;
Operation op;
try {
op = yield mailbox.recv_async(null);
op = yield mailbox.receive(null);
} catch (Error err) {
fatal(err);
break;
}
bool continue_loop = yield operation_loop_iteration_async(op);
// fire semaphore, if present

View file

@ -179,9 +179,6 @@ public class Geary.App.EmailStore : BaseObject {
bool open = false;
Gee.Collection<Geary.EmailIdentifier>? used_ids = null;
try {
debug("EmailStore opening %s for %s on %d emails", folder.to_string(),
operation.get_type().name(), ids.size);
yield folder.open_async(Geary.Folder.OpenFlags.FAST_OPEN, cancellable);
open = true;
used_ids = yield operation.execute_async(folder, ids, cancellable);
@ -193,12 +190,6 @@ public class Geary.App.EmailStore : BaseObject {
// Don't use the cancellable here, if it's been
// opened we need to try to close it.
yield folder.close_async(null);
debug(
"EmailStore closed %s after %s on %d emails",
folder.to_string(),
operation.get_type().name(),
ids.size
);
} catch (Error e) {
debug("Error closing folder %s: %s",
folder.to_string(), e.message);
@ -216,10 +207,7 @@ public class Geary.App.EmailStore : BaseObject {
// And we don't want to operate on the same folder twice.
folders_to_ids.remove_all(path);
}
debug("EmailStore %s done running %s on %d emails", account.to_string(),
operation.get_type().name(), emails.size);
if (folders_to_ids.size > 0) {
debug("Couldn't perform %s on some messages in %s", operation.get_type().name(),
account.to_string());

View file

@ -9,8 +9,8 @@ private class Geary.App.ConversationOperationQueue : BaseObject {
public Geary.SimpleProgressMonitor progress_monitor { get; private set; default =
new Geary.SimpleProgressMonitor(Geary.ProgressType.ACTIVITY); }
private Geary.Nonblocking.Mailbox<ConversationOperation> mailbox
= new Geary.Nonblocking.Mailbox<ConversationOperation>();
private Geary.Nonblocking.Queue<ConversationOperation> mailbox
= new Geary.Nonblocking.Queue<ConversationOperation>.fifo();
private Geary.Nonblocking.Spinlock processing_done_spinlock
= new Geary.Nonblocking.Spinlock();
@ -61,7 +61,7 @@ private class Geary.App.ConversationOperationQueue : BaseObject {
for (;;) {
ConversationOperation op;
try {
op = yield mailbox.recv_async();
op = yield mailbox.receive();
} catch (Error e) {
debug("Error processing in conversation operation mailbox: %s", e.message);
break;

View file

@ -371,10 +371,15 @@ private class Geary.ImapDB.Account : BaseObject {
public async void clone_folder_async(Geary.Imap.Folder imap_folder, Cancellable? cancellable = null)
throws Error {
check_open();
Geary.Imap.FolderProperties properties = imap_folder.properties;
Geary.FolderPath path = imap_folder.path;
// XXX this should really be a db table constraint
Geary.ImapDB.Folder? folder = get_local_folder(path);
if (folder != null)
throw new EngineError.ALREADY_EXISTS(path.to_string());
yield db.exec_transaction_async(Db.TransactionType.RW, (cx) => {
// get the parent of this folder, creating parents if necessary ... ok if this fails,
// that just means the folder has no parents

View file

@ -81,7 +81,7 @@ private class Geary.SmtpOutboxFolder :
private ImapDB.Database db;
private Cancellable? queue_cancellable = null;
private Nonblocking.Mailbox<OutboxRow> outbox_queue = new Nonblocking.Mailbox<OutboxRow>();
private Nonblocking.Queue<OutboxRow> outbox_queue = new Nonblocking.Queue<OutboxRow>.fifo();
private Geary.ProgressMonitor sending_monitor;
private SmtpOutboxFolderProperties _properties = new SmtpOutboxFolderProperties(0, 0);
private int64 next_ordering = 0;
@ -129,7 +129,7 @@ private class Geary.SmtpOutboxFolder :
OutboxRow? row = null;
bool row_handled = false;
try {
row = yield this.outbox_queue.recv_async(cancellable);
row = yield this.outbox_queue.receive(cancellable);
row_handled = yield postman_send(row, cancellable);
} catch (SmtpError err) {
ProblemType problem = ProblemType.GENERIC_ERROR;

View file

@ -39,30 +39,30 @@ private class Geary.ImapEngine.GmailAccount : Geary.ImapEngine.GenericAccount {
return SUPPORTED_SPECIAL_FOLDERS;
}
protected override MinimalFolder new_folder(Geary.FolderPath path, Imap.Account remote_account,
ImapDB.Account local_account, ImapDB.Folder local_folder) {
protected override MinimalFolder new_folder(ImapDB.Folder local_folder) {
Geary.FolderPath path = local_folder.get_path();
SpecialFolderType special_folder_type;
if (Imap.MailboxSpecifier.folder_path_is_inbox(path))
special_folder_type = SpecialFolderType.INBOX;
else
special_folder_type = local_folder.get_properties().attrs.get_special_folder_type();
switch (special_folder_type) {
case SpecialFolderType.ALL_MAIL:
return new GmailAllMailFolder(this, remote_account, local_account, local_folder,
return new GmailAllMailFolder(this, this.remote, this.local, local_folder,
special_folder_type);
case SpecialFolderType.DRAFTS:
return new GmailDraftsFolder(this, remote_account, local_account, local_folder,
return new GmailDraftsFolder(this, this.remote, this.local, local_folder,
special_folder_type);
case SpecialFolderType.SPAM:
case SpecialFolderType.TRASH:
return new GmailSpamTrashFolder(this, remote_account, local_account, local_folder,
return new GmailSpamTrashFolder(this, this.remote, this.local, local_folder,
special_folder_type);
default:
return new GmailFolder(this, remote_account, local_account, local_folder, special_folder_type);
return new GmailFolder(this, this.remote, this.local, local_folder, special_folder_type);
}
}

View file

@ -0,0 +1,169 @@
/*
* Copyright 2017 Michael Gratton <mike@vee.net>
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
/**
* A unit of work to be executed by {@link GenericAccount}.
*
* It is important that account operations are idempotent in that they
* can be safely re-executed multiple times, and perform the same task
* each time. This means that in practice instance properties should
* only be used to store state passed to the operation via its
* constructor (e.g. a target folder to be updated) and this state
* should not be modified when the operation is executed (e.g. the
* target folder should not be changed or set to `null` during or
* after execution), any state needed to be maintained when executing
* should be passed as arguments to internal methods (e.g. the list of
* messages to be checked in the target folder should be passed around
* as arguments), and the operation should perform any needed sanity
* checks before proceeding (e.g. check the target folder sill exists
* before updating it).
*
* To queue an operation for execution, pass an instance to {@link
* GenericAccount.queue_operation} after the account has been
* opened. It will added to the accounts queue and executed
* asynchronously when it reaches the front.
*
* Execution of the operation is managed by {@link
* AccountProcessor}. Since the processor will not en-queue duplicate
* operations, implementations of this class may override the {@link
* equal_to} method to specify which instances are considered to be
* duplicates.
*/
public abstract class Geary.ImapEngine.AccountOperation : Geary.BaseObject {
/** The account this operation applies to. */
protected weak Geary.Account account { get; private set; }
/**
* Constructs a new account operation.
*
* The passed in `account` will be the account the operation will
* apply to.
*/
protected AccountOperation(Geary.Account account) {
this.account = account;
}
/**
* Fired by after processing when the operation has completed.
*
* This is fired regardless of if an error was thrown after {@link
* execute} is called. It is always fired after either {@link
* succeeded} or {@link failed} is fired.
*
* Implementations should not fire this themselves, the
* processor will do it for them.
*/
public signal void completed();
/**
* Fired by the processor if the operation completes successfully.
*
* This is fired only after {@link execute} was called and did
* not raise an error.
*
* Implementations should not fire this themselves, the
* processor will do it for them.
*/
public signal void succeeded();
/**
* Fired by the processor if the operation throws an error.
*
* This is fired only after {@link execute} was called and
* threw an error. The argument is the error that was thrown.
*
* Implementations should not fire this themselves, the
* processor will do it for them.
*/
public signal void failed(Error err);
/**
* Called by the processor to execute this operation.
*/
public abstract async void execute(Cancellable cancellable) throws Error;
/**
* Determines if this operation is equal to another.
*
* By default assumes that the same instance or two different
* instances of the exact same type are equal. Implementations
* should override it if they wish to guard against different
* instances of the same high-level operation from being executed
* twice.
*/
public virtual bool equal_to(AccountOperation op) {
return (op != null && (this == op || this.get_type() == op.get_type()));
}
/**
* Provides a representation of this operation for debugging.
*
* By default simply returns the name of the class.
*/
public virtual string to_string() {
return this.get_type().name();
}
}
/**
* An account operation that applies to a specific folder.
*
* By default, instances of this class require that another operation
* applies to the same folder as well as having the same type to be
* considered equal, for the purpose of not en-queuing duplicate
* operations.
*/
public abstract class Geary.ImapEngine.FolderOperation : AccountOperation {
/** The folder this operation applies to. */
protected weak Geary.Folder folder { get; private set; }
/**
* Constructs a new folder operation.
*
* The passed in `folder` and `account` will be the objects the
* operation will apply to.
*/
protected FolderOperation(Geary.Account account, Geary.Folder folder) {
base(account);
this.folder = folder;
}
/**
* Determines if another operation is equal to this.
*
* This method compares both chain's up to {@link
* AccountOperation.equal_to} and if equal, compares the paths of
* both operation's folders to determine if `op` is equal to this
* operation.
*/
public override bool equal_to(AccountOperation op) {
return (
base.equal_to(op) &&
this.folder.path.equal_to(((FolderOperation) op).folder.path)
);
}
/**
* Provides a representation of this operation for debugging.
*
* The return value will include its folder's path and the name of
* the class.
*/
public override string to_string() {
return "%s(%s)".printf(base.to_string(), folder.path.to_string());
}
}

View file

@ -0,0 +1,116 @@
/*
* Copyright 2017-2018 Michael Gratton <mike@vee.net>
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
/**
* Queues and asynchronously executes {@link AccountOperation} instances.
*
* Operations that are equal to any currently executing or currently
* in the queue will not be re-queued.
*
* Errors thrown are reported to the user via the account's
* `problem_report` signal. Normally if an operation throws an error
* it will not be re-queued, however if a network connection error
* occurs the error will be suppressed and it will be re-attempted
* once, to allow for the network dropping out mid-execution.
*/
internal class Geary.ImapEngine.AccountProcessor : Geary.BaseObject {
// Retry ops after network failures at least once before giving up
private const int MAX_NETWORK_ERRORS = 1;
private static bool op_equal(AccountOperation a, AccountOperation b) {
return a.equal_to(b);
}
/** Determines an operation is currently being executed. */
public bool is_executing { get { return this.current_op != null; } }
/** Returns the number of operations currently waiting in the queue. */
public uint waiting { get { return this.queue.size; } }
/** Fired when an error occurs processing an operation. */
public signal void operation_error(AccountOperation op, Error error);
private string id;
private Nonblocking.Queue<AccountOperation> queue =
new Nonblocking.Queue<AccountOperation>.fifo(op_equal);
private AccountOperation? current_op = null;
private Cancellable cancellable = new Cancellable();
public AccountProcessor(string id) {
this.id = id;
this.queue.allow_duplicates = false;
this.run.begin();
}
public void enqueue(AccountOperation op) {
if (this.current_op == null || !op.equal_to(this.current_op)) {
this.queue.send(op);
}
}
public void stop() {
this.cancellable.cancel();
this.queue.clear();
}
private async void run() {
while (!this.cancellable.is_cancelled()) {
AccountOperation? op = null;
try {
op = yield this.queue.receive(this.cancellable);
} catch (Error err) {
// we've been cancelled, so bail out
return;
}
if (op != null) {
debug("%s: Executing operation: %s", id, op.to_string());
this.current_op = op;
Error? op_error = null;
int network_errors = 0;
while (op_error == null) {
try {
yield op.execute(this.cancellable);
op.succeeded();
break;
} catch (ImapError err) {
if (err is ImapError.NOT_CONNECTED &&
++network_errors <= MAX_NETWORK_ERRORS) {
debug(
"Retrying operation due to network error: %s",
err.message
);
} else {
op_error = err;
}
} catch (Error err) {
op_error = err;
}
}
if (op_error != null) {
op.failed(op_error);
operation_error(op, op_error);
}
op.completed();
this.current_op = null;
}
}
}
}

View file

@ -1,481 +1,141 @@
/* Copyright 2016 Software Freedom Conservancy Inc.
/*
* Copyright 2016 Software Freedom Conservancy Inc.
* Copyright 2017 Michael Gratton <mike@vee.net>
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
private class Geary.ImapEngine.AccountSynchronizer : Geary.BaseObject {
private const int FETCH_DATE_RECEIVED_CHUNK_COUNT = 25;
private const int SYNC_DELAY_SEC = 10;
private const int RETRY_SYNC_DELAY_SEC = 60;
private weak GenericAccount account { get; private set; }
private weak Imap.Account remote { get; private set; }
private Nonblocking.Mailbox<MinimalFolder> bg_queue = new Nonblocking.Mailbox<MinimalFolder>(bg_queue_comparator);
private Gee.HashSet<MinimalFolder> made_available = new Gee.HashSet<MinimalFolder>();
private Gee.HashSet<FolderPath> unavailable_paths = new Gee.HashSet<FolderPath>();
private MinimalFolder? current_folder = null;
private Cancellable? bg_cancellable = null;
private DateTime max_epoch = new DateTime(new TimeZone.local(), 2000, 1, 1, 0, 0, 0.0);
private TimeoutManager prefetch_timer;
private DateTime max_epoch = new DateTime(
new TimeZone.local(), 2000, 1, 1, 0, 0, 0.0
);
public AccountSynchronizer(GenericAccount account, Imap.Account remote) {
public AccountSynchronizer(GenericAccount account) {
this.account = account;
this.remote = remote;
// don't allow duplicates because it's possible for a Folder to change several times
// before finally opened and synchronized, which we only want to do once
this.bg_queue.allow_duplicates = false;
this.bg_queue.requeue_duplicate = false;
this.prefetch_timer = new TimeoutManager.seconds(
10, do_prefetch_changed
);
this.account.information.notify["prefetch-period-days"].connect(on_account_prefetch_changed);
this.account.folders_available_unavailable.connect(on_folders_available_unavailable);
this.account.folders_available_unavailable.connect(on_folders_updated);
this.account.folders_contents_altered.connect(on_folders_contents_altered);
this.account.email_sent.connect(on_email_sent);
this.remote.ready.connect(on_account_ready);
}
public void stop() {
Cancellable? cancellable = this.bg_cancellable;
if (cancellable != null) {
debug("%s: Stopping...", this.account.to_string());
cancellable.cancel();
this.bg_queue.clear();
this.made_available.clear();
this.unavailable_paths.clear();
this.current_folder = null;
}
}
private void on_account_prefetch_changed() {
try {
// treat as an availability check (i.e. as if the account had just opened) because
// just because this value has changed doesn't mean the contents in the folders
// have changed
if (this.account.is_open()) {
delayed_send_all(account.list_folders(), true, SYNC_DELAY_SEC);
}
} catch (Error err) {
debug("Unable to schedule re-sync for %s due to prefetch time changing: %s",
account.to_string(), err.message);
}
}
private void on_folders_available_unavailable(Gee.Collection<Folder>? available,
Gee.Collection<Folder>? unavailable) {
if (available != null) {
foreach (Folder folder in available)
unavailable_paths.remove(folder.path);
delayed_send_all(available, true, SYNC_DELAY_SEC);
}
if (unavailable != null) {
foreach (Folder folder in unavailable)
unavailable_paths.add(folder.path);
revoke_all(unavailable);
}
}
private void on_folders_contents_altered(Gee.Collection<Folder> altered) {
delayed_send_all(altered, false, SYNC_DELAY_SEC);
}
private void on_email_sent() {
try {
Folder? sent_mail = account.get_special_folder(SpecialFolderType.SENT);
if (sent_mail != null)
send_all(iterate<Folder>(sent_mail).to_array_list(), false);
} catch (Error err) {
debug("Unable to retrieve Sent Mail from %s: %s", account.to_string(), err.message);
}
}
private void delayed_send_all(Gee.Collection<Folder> folders, bool reason_available, int sec) {
Timeout.add_seconds(sec, () => {
// remove any unavailable folders
Gee.ArrayList<Folder> trimmed_folders = new Gee.ArrayList<Folder>();
foreach (Folder folder in folders) {
if (!unavailable_paths.contains(folder.path))
trimmed_folders.add(folder);
}
send_all(trimmed_folders, reason_available);
return false;
});
}
private void send_all(Gee.Collection<Folder> folders, bool reason_available) {
private void send_all(Gee.Collection<Folder> folders, bool became_available) {
foreach (Folder folder in folders) {
MinimalFolder? imap_folder = folder as MinimalFolder;
// only deal with ImapEngine.MinimalFolder
if (imap_folder == null)
continue;
// if considering folder not because it's available (i.e. because its contents changed),
// and the folder is open, don't process it; MinimalFolder will take care of changes as
// they occur, in order to remain synchronized
if (!reason_available &&
if (!became_available &&
imap_folder.get_open_state() != Folder.OpenState.CLOSED) {
continue;
}
// don't requeue the currently processing folder
if (imap_folder != current_folder)
bg_queue.send(imap_folder);
// If adding because now available, make sure it's flagged as such, since there's an
// additional check for available folders ... if not, remove from the map so it's
// not treated as such, in case both of these come in back-to-back
if (reason_available && imap_folder != current_folder)
made_available.add(imap_folder);
else
made_available.remove(imap_folder);
}
}
private void revoke_all(Gee.Collection<Folder> folders) {
foreach (Folder folder in folders) {
MinimalFolder? generic_folder = folder as MinimalFolder;
if (generic_folder != null) {
bg_queue.revoke(generic_folder);
made_available.remove(generic_folder);
}
}
}
// This is used to ensure that certain special folders get prioritized over others, so folders
// important to the user (i.e. Inbox) go first while less-used folders (Spam) are fetched last
private static int bg_queue_comparator(MinimalFolder a, MinimalFolder b) {
if (a == b)
return 0;
int cmp = score_folder(a) - score_folder(b);
if (cmp != 0)
return cmp;
// sort by path to stabilize the sort
return a.path.compare_to(b.path);
}
// Lower the score, the higher the importance.
//
// Some explanation is due here. It may seem odd to place TRASH, SENT, and DRAFTS so high, but
// there's a method to the madness. In particular, because Geary can produce a lot of drafts
// during composition, it's important to synchronize with Trash so discarded drafts don't wind
// up included in conversations until, eventually, the Trash is synchronized. (Recall that
// Spam and Trash are blacklisted in conversations and searching.) Since Drafts is open while
// writing them, it's not vital to keep it absolutely high, but Trash is usually not open,
// so it should be.
//
// All Mail is important, but synchronizing with it can be hard on the system because of the
// sheer amount of messages, and so it's placed lower to put it off until the more active
// folders are finished.
private static int score_folder(Folder a) {
switch (a.special_folder_type) {
case SpecialFolderType.INBOX:
return -70;
case SpecialFolderType.TRASH:
return -60;
case SpecialFolderType.SENT:
return -50;
case SpecialFolderType.DRAFTS:
return -40;
case SpecialFolderType.FLAGGED:
return -30;
case SpecialFolderType.IMPORTANT:
return -20;
case SpecialFolderType.ALL_MAIL:
case SpecialFolderType.ARCHIVE:
return -10;
case SpecialFolderType.SPAM:
return 10;
default:
return 0;
}
}
private async void process_queue_async() {
if (this.bg_cancellable != null) {
return;
}
Cancellable cancellable = this.bg_cancellable = new Cancellable();
AccountOperation op = became_available
? new CheckFolderSync(this.account, imap_folder, this.max_epoch)
: new RefreshFolderSync(this.account, imap_folder);
debug("%s: Starting background sync", this.account.to_string());
while (!cancellable.is_cancelled()) {
MinimalFolder folder;
try {
folder = yield bg_queue.recv_async(bg_cancellable);
this.account.queue_operation(op);
} catch (Error err) {
if (!(err is IOError.CANCELLED))
debug("Failed to receive next folder for background sync: %s", err.message);
break;
debug("Failed to queue sync operation: %s", err.message);
}
}
}
// generate the current epoch for synchronization (could cache this value, obviously, but
// doesn't seem like this biggest win in this class)
DateTime epoch;
if (account.information.prefetch_period_days >= 0) {
epoch = new DateTime.now_local();
epoch = epoch.add_days(0 - account.information.prefetch_period_days);
} else {
epoch = max_epoch;
}
bool availability_check = false;
private void do_prefetch_changed() {
// treat as an availability check (i.e. as if the account had
// just opened) because just because this value has changed
// doesn't mean the contents in the folders have changed
if (this.account.is_open()) {
try {
// mark as current folder to prevent requeues while processing
this.current_folder = folder;
availability_check = this.made_available.remove(folder);
yield process_folder_async(folder, availability_check, epoch, cancellable);
send_all(this.account.list_folders(), true);
} catch (Error err) {
// retry the folder later
delayed_send_all(
iterate<Folder>(folder).to_array_list(),
availability_check,
RETRY_SYNC_DELAY_SEC
);
if (!(err is IOError.CANCELLED)) {
debug("%s: Error synchronising %s: %s",
this.account.to_string(), folder.to_string(), err.message);
}
break;
} finally {
this.current_folder = null;
debug("Failed to list account folders for sync: %s", err.message);
}
}
this.bg_cancellable = null;
}
// Returns false if IOError.CANCELLED received
private async void process_folder_async(MinimalFolder folder,
bool availability_check,
DateTime epoch,
Cancellable cancellable)
throws Error {
Logging.debug(
Logging.Flag.PERIODIC,
"Background sync'ing %s to %s",
folder.to_string(),
epoch.to_string()
);
// If we aren't checking the folder because it became
// available, then it has changed and we need to check it.
// Otherwise compare the oldest mail in the local store and
// see if it is before the epoch; if so, no need to
// synchronize simply because this Folder is available; wait
// for its contents to change instead.
//
// Note we can't compare the local and remote folder counts
// here, since the folder may not have opened yet to determine
// what the actual remote count is, which is particularly
// problematic when an existing folder is seen for the first
// time, e.g. when the account was just added.
DateTime? oldest_local = null;
Geary.EmailIdentifier? oldest_local_id = null;
bool do_sync = true;
if (!availability_check) {
// Folder already available, so it must have changed
Logging.debug(
Logging.Flag.PERIODIC,
"Folder %s changed, synchronizing...",
folder.to_string()
);
} else {
// get oldest local email and its time, as well as number
// of messages in local store
Gee.List<Geary.Email>? list =yield folder.local_folder.list_email_by_id_async(
null,
1,
Email.Field.PROPERTIES,
ImapDB.Folder.ListFlags.NONE | ImapDB.Folder.ListFlags.OLDEST_TO_NEWEST,
cancellable
);
if (list != null && list.size > 0) {
oldest_local = list[0].properties.date_received;
oldest_local_id = list[0].id;
}
if (oldest_local == null) {
// No oldest message found, so we haven't seen the folder
// before or it has no messages. Either way we need to
// open it to check, so sync it.
Logging.debug(
Logging.Flag.PERIODIC,
"No oldest message found for %s, synchronizing...",
folder.to_string()
);
} else if (oldest_local.compare(epoch) < 0) {
// Oldest local email before epoch, don't sync from network
do_sync = false;
Logging.debug(
Logging.Flag.PERIODIC,
"Oldest local message is older than the epoch for %s",
folder.to_string()
);
}
}
if (do_sync) {
bool opened = false;
try {
yield folder.open_async(Folder.OpenFlags.FAST_OPEN, cancellable);
opened = true;
yield sync_folder_async(folder, epoch, oldest_local, oldest_local_id, cancellable);
} finally {
if (opened) {
try {
// don't pass Cancellable; really need this to complete in all cases
yield folder.close_async();
} catch (Error err) {
debug("%s: Error closing folder %s: %s",
this.account.to_string(), folder.to_string(), err.message);
}
}
}
}
Logging.debug(
Logging.Flag.PERIODIC, "Background sync of %s completed",
folder.to_string()
);
private void on_account_prefetch_changed() {
this.prefetch_timer.start();
}
private async void sync_folder_async(MinimalFolder folder,
DateTime epoch,
DateTime? oldest_local,
Geary.EmailIdentifier? oldest_local_id,
Cancellable cancellable)
throws Error {
// wait for the folder to be fully opened to be sure we have all the most current
// information
yield folder.wait_for_open_async(cancellable);
// only perform vector expansion if oldest isn't old enough
if (oldest_local == null || oldest_local.compare(epoch) > 0) {
// go back three months at a time to the epoch, performing a little vector expansion at a
// time rather than all at once (which will stall the replay queue)
DateTime current_epoch = (oldest_local != null) ? oldest_local : new DateTime.now_local();
do {
// look for complete synchronization of UIDs (i.e. complete vector normalization)
// no need to keep searching once this happens
int local_count = yield folder.local_folder.get_email_count_async(ImapDB.Folder.ListFlags.NONE,
cancellable);
int remote_count = folder.properties.email_total;
if (local_count >= remote_count) {
Logging.debug(
Logging.Flag.PERIODIC,
"Final vector normalization for %s: %d/%d emails",
folder.to_string(),
local_count,
remote_count
);
break;
}
current_epoch = current_epoch.add_months(-3);
// if past max_epoch, then just pull in everything and be done with it
if (current_epoch.compare(max_epoch) < 0) {
Logging.debug(
Logging.Flag.PERIODIC,
"Synchronization reached max epoch of %s, fetching all mail from %s (already got %d of %d emails)",
max_epoch.to_string(),
folder.to_string(),
local_count,
remote_count
);
// Per the contract for list_email_by_id_async, we
// need to specify int.MAX count and ensure that
// ListFlags.OLDEST_TO_NEWEST is *not* specified
// to get all messages listed.
//
// XXX This is expensive, but should only usually
// happen once per folder - at the end of a full
// sync.
yield folder.list_email_by_id_async(
null,
int.MAX,
Geary.Email.Field.NONE,
Geary.Folder.ListFlags.NONE,
cancellable
);
} else {
// don't go past proscribed epoch
if (current_epoch.compare(epoch) < 0)
current_epoch = epoch;
Logging.debug(
Logging.Flag.PERIODIC,
"Synchronizing %s to %s (already got %d of %d emails)",
folder.to_string(),
current_epoch.to_string(),
local_count,
remote_count
);
Geary.EmailIdentifier? earliest_span_id = yield folder.find_earliest_email_async(current_epoch,
oldest_local_id, cancellable);
if (earliest_span_id == null && current_epoch.compare(epoch) <= 0) {
Logging.debug(
Logging.Flag.PERIODIC,
"Unable to locate epoch messages on remote folder %s%s, fetching one past oldest...",
folder.to_string(),
(oldest_local_id != null) ? " earlier than oldest local" : ""
);
// if there's nothing between the oldest local and the epoch, that means the
// mail just prior to our local oldest is oldest than the epoch; rather than
// continually thrashing looking for something that's just out of reach, add it
// to the folder and be done with it ... note that this even works if oldest_local_id
// is null, as that means the local folder is empty and so we should at least
// pull the first one to get a marker of age
yield folder.list_email_by_id_async(oldest_local_id, 1, Geary.Email.Field.NONE,
Geary.Folder.ListFlags.NONE, cancellable);
} else if (earliest_span_id != null) {
// use earliest email from that span for the next round
oldest_local_id = earliest_span_id;
}
}
yield Scheduler.sleep_ms_async(200);
} while (current_epoch.compare(epoch) > 0);
} else {
Logging.debug(
Logging.Flag.PERIODIC,
"No expansion necessary for %s, oldest local (%s) is before epoch (%s)",
folder.to_string(),
oldest_local.to_string(),
epoch.to_string()
);
private void on_folders_updated(Gee.Collection<Folder>? available,
Gee.Collection<Folder>? unavailable) {
if (available != null) {
send_all(available, true);
}
}
// always give email prefetcher time to finish its work
Logging.debug(
Logging.Flag.PERIODIC,
"Waiting for email prefetcher to complete %s...",
folder.to_string()
);
private void on_folders_contents_altered(Gee.Collection<Folder> altered) {
send_all(altered, false);
}
}
/**
* Synchronises a folder after its contents have changed.
*
* This synchronisation process simply opens the remote folder, waits
* for it to finish opening for normalisation and pre-fetching to
* complete, then closes it again.
*/
private class Geary.ImapEngine.RefreshFolderSync : FolderOperation {
internal RefreshFolderSync(GenericAccount account,
MinimalFolder folder) {
base(account, folder);
}
public override async void execute(Cancellable cancellable)
throws Error {
bool opened = false;
try {
yield folder.email_prefetcher.active_sem.wait_async(cancellable);
yield this.folder.open_async(Folder.OpenFlags.FAST_OPEN, cancellable);
opened = true;
yield this.folder.wait_for_open_async(cancellable);
yield sync_folder(cancellable);
} finally {
if (opened) {
try {
// don't pass in the Cancellable; really need this
// to complete in all cases
yield this.folder.close_async();
} catch (Error err) {
debug(
"%s: Error closing folder %s: %s",
this.account.to_string(),
this.folder.to_string(),
err.message
);
}
}
}
}
protected virtual async void sync_folder(Cancellable cancellable)
throws Error {
yield wait_for_prefetcher(cancellable);
}
protected async void wait_for_prefetcher(Cancellable cancellable)
throws Error {
MinimalFolder minimal = (MinimalFolder) this.folder;
try {
yield minimal.email_prefetcher.active_sem.wait_async(cancellable);
} catch (Error err) {
Logging.debug(
Logging.Flag.PERIODIC,
@ -484,16 +144,180 @@ private class Geary.ImapEngine.AccountSynchronizer : Geary.BaseObject {
err.message
);
}
Logging.debug(
Logging.Flag.PERIODIC,
"Done background sync'ing %s",
folder.to_string()
);
}
private void on_account_ready() {
this.process_queue_async.begin();
}
}
/**
* Synchronises a folder after first checking if it needs to be sync'ed.
*
* This synchronisation process performs the same work as its base
* class, but also ensures enough mail has been fetched to satisfy the
* account's prefetch period, by checking the earliest mail in the
* folder and if later than the maximum prefetch epoch, expands the
* folder's vector until it does.
*/
private class Geary.ImapEngine.CheckFolderSync : RefreshFolderSync {
private DateTime sync_max_epoch;
internal CheckFolderSync(GenericAccount account,
MinimalFolder folder,
DateTime sync_max_epoch) {
base(account, folder);
this.sync_max_epoch = sync_max_epoch;
}
protected override async void sync_folder(Cancellable cancellable)
throws Error {
// Determine the earliest date we should be synchronising back to
DateTime prefetch_max_epoch;
if (this.account.information.prefetch_period_days >= 0) {
prefetch_max_epoch = new DateTime.now_local();
prefetch_max_epoch = prefetch_max_epoch.add_days(
0 - account.information.prefetch_period_days
);
} else {
prefetch_max_epoch = this.sync_max_epoch;
}
// get oldest local email and its time, as well as number
// of messages in local store
ImapDB.Folder local_folder = ((MinimalFolder) this.folder).local_folder;
Gee.List<Geary.Email>? list = yield local_folder.list_email_by_id_async(
null,
1,
Email.Field.PROPERTIES,
ImapDB.Folder.ListFlags.NONE | ImapDB.Folder.ListFlags.OLDEST_TO_NEWEST,
cancellable
);
Geary.Email? current_oldest = null;
if (list != null && list.size > 0) {
current_oldest = list[0];
}
DateTime? oldest_date = (current_oldest != null)
? current_oldest.properties.date_received : null;
if (oldest_date == null) {
oldest_date = new DateTime.now_local();
}
DateTime? next_epoch = oldest_date;
while (next_epoch.compare(prefetch_max_epoch) > 0) {
int local_count = yield local_folder.get_email_count_async(
ImapDB.Folder.ListFlags.NONE, cancellable
);
next_epoch = next_epoch.add_months(-3);
if (next_epoch.compare(prefetch_max_epoch) < 0) {
next_epoch = prefetch_max_epoch;
}
debug("%s *** syncing to: %s", this.account.to_string(), next_epoch.to_string());
if (local_count < this.folder.properties.email_total &&
next_epoch.compare(prefetch_max_epoch) >= 0) {
if (next_epoch.compare(this.sync_max_epoch) > 0) {
current_oldest = yield expand_vector(
next_epoch, current_oldest, cancellable
);
if (current_oldest == null &&
next_epoch.equal(prefetch_max_epoch)) {
yield expand_to_previous(
current_oldest, cancellable
);
// Exit next time around
next_epoch = prefetch_max_epoch.add_days(-1);
}
} else {
yield expand_complete_vector(cancellable);
// Exit next time around
next_epoch = prefetch_max_epoch.add_days(-1);
}
} else {
// Exit next time around
next_epoch = prefetch_max_epoch.add_days(-1);
}
// let the prefetcher catch up
yield wait_for_prefetcher(cancellable);
}
}
private async Geary.Email? expand_vector(DateTime next_epoch,
Geary.Email? current_oldest,
Cancellable cancellable)
throws Error {
// Expand the vector up until the given epoch
Logging.debug(
Logging.Flag.PERIODIC,
"Synchronizing %s:%s to %s",
this.account.to_string(),
this.folder.to_string(),
next_epoch.to_string()
);
return yield ((MinimalFolder) this.folder).find_earliest_email_async(
next_epoch,
(current_oldest != null) ? current_oldest.id : null,
cancellable
);
}
private async void expand_to_previous(Geary.Email? current_oldest,
Cancellable cancellable)
throws Error {
// there's nothing between the oldest local and the epoch,
// which means the mail just prior to our local oldest is
// oldest than the epoch; rather than continually thrashing
// looking for something that's just out of reach, add it to
// the folder and be done with it ... note that this even
// works if id is null, as that means the local folder is
// empty and so we should at least pull the first one to get a
// marker of age
Geary.EmailIdentifier? id =
(current_oldest != null) ? current_oldest.id : null;
Logging.debug(
Logging.Flag.PERIODIC,
"Unable to locate epoch messages on remote folder %s:%s%s, fetching one past oldest...",
this.account.to_string(),
this.folder.to_string(),
(id != null) ? " earlier than oldest local" : ""
);
yield this.folder.list_email_by_id_async(
id, 1,
Geary.Email.Field.NONE,
Geary.Folder.ListFlags.NONE, cancellable
);
}
private async void expand_complete_vector(Cancellable cancellable)
throws Error {
// past max_epoch, so just pull in everything and be done with it
Logging.debug(
Logging.Flag.PERIODIC,
"Synchronization reached max epoch of %s, fetching all mail from %s:%s",
this.sync_max_epoch.to_string(),
this.account.to_string(),
this.folder.to_string()
);
// Per the contract for list_email_by_id_async, we need to
// specify int.MAX count and ensure that
// ListFlags.OLDEST_TO_NEWEST is *not* specified to get all
// messages listed.
//
// XXX This is expensive, but should only usually happen once
// per folder - at the end of a full sync.
yield this.folder.list_email_by_id_async(
null,
int.MAX,
Geary.Email.Field.NONE,
Geary.Folder.ListFlags.NONE,
cancellable
);
}
}

View file

@ -1,158 +0,0 @@
/* Copyright 2016 Software Freedom Conservancy Inc.
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
/**
* Monitor an open {@link ImapEngine.GenericFolder} for changes to {@link EmailFlags}.
*
* Because IMAP doesn't offer a standard mechanism for server notifications of email flags changing,
* have to poll for changes. This class performs this task by monitoring the supplied
* folder for its "opened" and "closed" signals and periodically polling for changes.
*
* Note that EmailFlagWatcher doesn't maintain a reference to the Geary.Folder it's watching.
*/
private class Geary.ImapEngine.EmailFlagWatcher : BaseObject {
public const int DEFAULT_FLAG_WATCH_SEC = 3 * 60;
private const int PULL_CHUNK_COUNT = 100;
private unowned Geary.Folder folder;
private int seconds;
private uint watch_id = 0;
private bool in_flag_watch = false;
private Cancellable cancellable = new Cancellable();
public signal void email_flags_changed(Gee.Map<Geary.EmailIdentifier, Geary.EmailFlags> changed);
public EmailFlagWatcher(Geary.Folder folder, int seconds = DEFAULT_FLAG_WATCH_SEC) {
assert(seconds > 0);
this.folder = folder;
this.seconds = seconds;
folder.opened.connect(on_opened);
folder.closed.connect(on_closed);
}
~EmailFlagWatcher() {
if (watch_id != 0)
message("Warning: Geary.FlagWatcher destroyed before folder closed");
folder.opened.disconnect(on_opened);
folder.closed.disconnect(on_closed);
}
private void on_opened(Geary.Folder.OpenState open_state) {
if (open_state != Geary.Folder.OpenState.BOTH)
return;
cancellable = new Cancellable();
if (watch_id == 0)
watch_id = Idle.add(on_opened_update_flags);
}
private void on_closed(Geary.Folder.CloseReason close_reason) {
if (close_reason != Geary.Folder.CloseReason.FOLDER_CLOSED)
return;
cancellable.cancel();
if (watch_id != 0)
Source.remove(watch_id);
watch_id = 0;
}
private bool on_opened_update_flags() {
flag_watch_async.begin();
// this callback was immediately called due to open, schedule next ones for here on out
// on a timer
watch_id = Timeout.add_seconds(seconds, on_flag_watch);
return false;
}
private bool on_flag_watch() {
flag_watch_async.begin();
watch_id = 0;
// return false and reschedule when finished
return false;
}
private async void flag_watch_async() {
// prevent reentrancy and don't run if folder is closed
if (!in_flag_watch && !cancellable.is_cancelled()) {
in_flag_watch = true;
try {
yield do_flag_watch_async();
} catch (Error err) {
if (!(err is IOError.CANCELLED))
debug("%s flag watch error: %s", folder.to_string(), err.message);
else
debug("%s flag watch cancelled", folder.to_string());
}
in_flag_watch = false;
}
// reschedule if not already, and if not cancelled (folder is closed)
if (watch_id == 0 && !cancellable.is_cancelled())
watch_id = Timeout.add_seconds(seconds, on_flag_watch);
}
private async void do_flag_watch_async() throws Error {
Logging.debug(Logging.Flag.PERIODIC, "do_flag_watch_async begin %s", folder.to_string());
Geary.EmailIdentifier? lowest = null;
int total = 0;
for (;;) {
Gee.List<Geary.Email>? list_local = yield folder.list_email_by_id_async(lowest,
PULL_CHUNK_COUNT, Geary.Email.Field.FLAGS, Geary.Folder.ListFlags.LOCAL_ONLY, cancellable);
if (list_local == null || list_local.is_empty)
break;
total += list_local.size;
// find the lowest for the next iteration
lowest = Geary.EmailIdentifier.sort_emails(list_local).first().id;
// Get all email identifiers in the local folder mapped to their EmailFlags
Gee.HashMap<Geary.EmailIdentifier, Geary.EmailFlags> local_map = new Gee.HashMap<
Geary.EmailIdentifier, Geary.EmailFlags>();
foreach (Geary.Email e in list_local)
local_map.set(e.id, e.email_flags);
// Fetch e-mail from folder using force update, which will cause the cache to be bypassed
// and the latest to be gotten from the server (updating the cache in the process)
Logging.debug(Logging.Flag.PERIODIC, "do_flag_watch_async: fetching %d flags for %s",
local_map.keys.size, folder.to_string());
Gee.List<Geary.Email>? list_remote = yield folder.list_email_by_sparse_id_async(local_map.keys,
Email.Field.FLAGS, Geary.Folder.ListFlags.FORCE_UPDATE, cancellable);
if (list_remote == null || list_remote.is_empty)
break;
// Build map of emails that have changed.
Gee.HashMap<Geary.EmailIdentifier, Geary.EmailFlags> changed_map =
new Gee.HashMap<Geary.EmailIdentifier, Geary.EmailFlags>();
foreach (Geary.Email e in list_remote) {
if (!local_map.has_key(e.id))
continue;
if (!local_map.get(e.id).equal_to(e.email_flags))
changed_map.set(e.id, e.email_flags);
}
if (!cancellable.is_cancelled() && changed_map.size > 0)
email_flags_changed(changed_map);
}
Logging.debug(Logging.Flag.PERIODIC, "do_flag_watch_async: completed %s, %d messages updates",
folder.to_string(), total);
}
}

View file

@ -21,60 +21,49 @@ private class Geary.ImapEngine.EmailPrefetcher : Object {
default = new Nonblocking.CountingSemaphore(null); }
private unowned ImapEngine.MinimalFolder folder;
private int start_delay_sec;
private Nonblocking.Mutex mutex = new Nonblocking.Mutex();
private Gee.TreeSet<Geary.Email> prefetch_emails = new Gee.TreeSet<Geary.Email>(
Email.compare_recv_date_descending);
private uint schedule_id = 0;
private Cancellable cancellable = new Cancellable();
private TimeoutManager prefetch_timer;
private Cancellable? cancellable = null;
public EmailPrefetcher(ImapEngine.MinimalFolder folder, int start_delay_sec = PREFETCH_DELAY_SEC) {
assert(start_delay_sec > 0);
this.folder = folder;
this.start_delay_sec = start_delay_sec;
folder.opened.connect(on_opened);
folder.closed.connect(on_closed);
folder.email_locally_appended.connect(on_local_expansion);
folder.email_locally_inserted.connect(on_local_expansion);
}
~EmailPrefetcher() {
if (schedule_id != 0)
message("Warning: Geary.EmailPrefetcher destroyed before folder closed");
folder.opened.disconnect(on_opened);
folder.closed.disconnect(on_closed);
folder.email_locally_appended.disconnect(on_local_expansion);
folder.email_locally_inserted.disconnect(on_local_expansion);
}
private void on_opened(Geary.Folder.OpenState open_state) {
if (open_state != Geary.Folder.OpenState.BOTH)
return;
cancellable = new Cancellable();
// acquire here since .begin() only schedules for later
active_sem.acquire();
do_prepare_all_local_async.begin();
}
private void on_closed(Geary.Folder.CloseReason close_reason) {
// cancel for any reason ... this will be called multiple times, but the following operations
// can be executed any number of times and still get the desired results
cancellable.cancel();
if (schedule_id != 0) {
Source.remove(schedule_id);
schedule_id = 0;
// since an acquire was done when scheduled, need to notify when cancelled
active_sem.blind_notify();
if (start_delay_sec <= 0) {
start_delay_sec = PREFETCH_DELAY_SEC;
}
this.prefetch_timer = new TimeoutManager.seconds(
start_delay_sec, () => { do_prefetch_async.begin(); }
);
}
public void open() {
this.cancellable = new Cancellable();
this.folder.email_locally_appended.connect(on_local_expansion);
this.folder.email_locally_inserted.connect(on_local_expansion);
// acquire here since .begin() only schedules for later
this.active_sem.acquire();
this.do_prepare_all_local_async.begin();
}
public void close() {
if (this.prefetch_timer.is_running) {
this.prefetch_timer.reset();
// since an acquire was done when scheduled, need to
// notify when cancelled
this.active_sem.blind_notify();
}
this.folder.email_locally_appended.disconnect(on_local_expansion);
this.folder.email_locally_inserted.disconnect(on_local_expansion);
this.cancellable = null;
}
private void on_local_expansion(Gee.Collection<Geary.EmailIdentifier> ids) {
// it's possible to be notified of an append prior to remote open; don't prefetch until
// that occurs
@ -85,30 +74,24 @@ private class Geary.ImapEngine.EmailPrefetcher : Object {
active_sem.acquire();
do_prepare_new_async.begin(ids);
}
// emails should include PROPERTIES
private void schedule_prefetch(Gee.Collection<Geary.Email>? emails) {
if (emails == null || emails.size == 0)
return;
debug("%s: scheduling %d emails for prefetching", folder.to_string(), emails.size);
prefetch_emails.add_all(emails);
debug("%s: scheduling %d emails for prefetching",
folder.to_string(), emails.size);
this.prefetch_emails.add_all(emails);
// only increment active state if not rescheduling
if (schedule_id != 0)
Source.remove(schedule_id);
else
active_sem.acquire();
schedule_id = Timeout.add_seconds(start_delay_sec, () => {
schedule_id = 0;
do_prefetch_async.begin();
return false;
});
if (!this.prefetch_timer.is_running) {
this.active_sem.acquire();
}
this.prefetch_timer.start();
}
private async void do_prepare_all_local_async() {
Gee.List<Geary.Email>? list = null;
try {

File diff suppressed because it is too large Load diff

View file

@ -26,7 +26,13 @@
private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport.Copy,
Geary.FolderSupport.Mark, Geary.FolderSupport.Move {
private const int FLAG_UPDATE_TIMEOUT_SEC = 2;
private const int FLAG_UPDATE_START_CHUNK = 20;
private const int FLAG_UPDATE_MAX_CHUNK = 100;
private const int FORCE_OPEN_REMOTE_TIMEOUT_SEC = 10;
private const int REFRESH_UNSEEN_TIMEOUT_SEC = 1;
public override Account account { get { return _account; } }
@ -51,8 +57,9 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
internal ImapDB.Folder local_folder { get; protected set; }
internal Imap.Folder? remote_folder { get; protected set; default = null; }
internal EmailPrefetcher email_prefetcher { get; private set; }
internal EmailFlagWatcher email_flag_watcher;
internal int remote_count { get; private set; default = -1; }
internal ReplayQueue replay_queue { get; private set; }
private weak GenericAccount _account;
private Geary.AggregatedFolderProperties _properties = new Geary.AggregatedFolderProperties(
false, false);
@ -65,12 +72,13 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
private Nonblocking.ReportingSemaphore<bool> remote_semaphore =
new Nonblocking.ReportingSemaphore<bool>(false);
private Nonblocking.Semaphore closed_semaphore = new Nonblocking.Semaphore();
private ReplayQueue replay_queue;
private int remote_count = -1;
private Nonblocking.Mutex open_mutex = new Nonblocking.Mutex();
private Nonblocking.Mutex close_mutex = new Nonblocking.Mutex();
private TimeoutManager update_flags_timer;
private TimeoutManager refresh_unseen_timer;
private Cancellable? open_cancellable = null;
/**
* Called when the folder is closing (and not reestablishing a connection) and will be flushing
* the replay queue. Subscribers may add ReplayOperations to the list, which will be enqueued
@ -93,6 +101,10 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
*/
public signal void marked_email_removed(Gee.Collection<Geary.EmailIdentifier> removed);
/** Emitted to notify the account that some problem has occurred. */
internal signal void report_problem(Geary.ProblemReport problem);
public MinimalFolder(GenericAccount account, Imap.Account remote, ImapDB.Account local,
ImapDB.Folder local_folder, SpecialFolderType special_folder_type) {
this._account = account;
@ -102,16 +114,20 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
);
this.local = local;
this.local_folder = local_folder;
_special_folder_type = special_folder_type;
_properties.add(local_folder.get_properties());
replay_queue = new ReplayQueue(this);
email_flag_watcher = new EmailFlagWatcher(this);
email_flag_watcher.email_flags_changed.connect(on_email_flags_changed);
email_prefetcher = new EmailPrefetcher(this);
local_folder.email_complete.connect(on_email_complete);
this.local_folder.email_complete.connect(on_email_complete);
this._special_folder_type = special_folder_type;
this._properties.add(local_folder.get_properties());
this.replay_queue = new ReplayQueue(this);
this.email_prefetcher = new EmailPrefetcher(this);
this.update_flags_timer = new TimeoutManager.seconds(
FLAG_UPDATE_TIMEOUT_SEC, () => { on_update_flags.begin(); }
);
this.refresh_unseen_timer = new TimeoutManager.seconds(
REFRESH_UNSEEN_TIMEOUT_SEC, on_refresh_unseen
);
// Notify now to ensure that wait_for_close_async does not
// block if never opened.
@ -569,13 +585,22 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
// first open gets to name the flags, but see note above
this.open_flags = open_flags;
// reset to force waiting in wait_for_open_async()
remote_semaphore.reset();
this.remote_semaphore.reset();
// reset to force waiting in wait_for_close_async()
closed_semaphore.reset();
this.closed_semaphore.reset();
// reset unseen count refresh since it will be updated when
// the remote opens
this.refresh_unseen_timer.reset();
this.open_cancellable = new Cancellable();
// Notify the email prefetcher
this.email_prefetcher.open();
// Unless NO_DELAY is set, do NOT open the remote side here; wait for the ReplayQueue to
// require a remote connection or wait_for_open_async() to be called ... this allows for
// fast local-only operations to occur, local-only either because (a) the folder has all
@ -680,6 +705,7 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
// signals
opening_folder.appended.connect(on_remote_appended);
opening_folder.updated.connect(on_remote_updated);
opening_folder.removed.connect(on_remote_removed);
opening_folder.disconnected.connect(on_remote_disconnected);
@ -789,11 +815,16 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
}
_properties.add(remote_folder.properties);
// notify any subscribers with similar information
notify_opened(Geary.Folder.OpenState.BOTH, remote_count);
// Update flags once the folder has opened. We will receive
// notifications of changes as long as it remains open, so
// only need to do this once
this.update_flags_timer.start();
}
public override async bool close_async(Cancellable? cancellable = null) throws Error {
// Check open_count but only decrement inside of replay queue
if (open_count <= 0)
@ -815,10 +846,13 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
// decrement open_count and, if zero, continue closing Folder
if (open_count == 0 || --open_count > 0)
return false;
// Close the prefetcher early so it stops using the remote ASAP
this.email_prefetcher.close();
if (remote_folder != null)
_properties.remove(remote_folder.properties);
// block anyone from wait_until_open_async(), as this is no longer open
remote_semaphore.reset();
@ -958,9 +992,14 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
// Returns the remote_folder, if it was set
private Imap.Folder? clear_remote_folder() {
// Cancel any internal pending operations before unhooking
this.open_cancellable.cancel();
this.open_cancellable = null;
if (remote_folder != null) {
// disconnect signals before ripping out reference
remote_folder.appended.disconnect(on_remote_appended);
remote_folder.updated.disconnect(on_remote_updated);
remote_folder.removed.disconnect(on_remote_removed);
remote_folder.disconnected.disconnect(on_remote_disconnected);
}
@ -1044,95 +1083,29 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
Gee.List<Imap.SequenceNumber> positions = new Gee.ArrayList<Imap.SequenceNumber>();
for (int pos = remote_count + 1; pos <= reported_remote_count; pos++)
positions.add(new Imap.SequenceNumber(pos));
// store the remote count NOW, as further appended messages could arrive before the
// ReplayAppend executes
remote_count = reported_remote_count;
if (positions.size > 0)
replay_queue.schedule_server_notification(new ReplayAppend(this, reported_remote_count, positions));
}
// Need to prefetch at least an EmailIdentifier (and duplicate detection fields) to create a
// normalized placeholder in the local database of the message, so all positions are
// properly relative to the end of the message list; once this is done, notify user of new
// messages. If duplicates, create_email_async() will fall through to an updated merge,
// which is exactly what we want.
//
// This MUST only be called from ReplayAppend.
internal async void do_replay_appended_messages(int reported_remote_count,
Gee.List<Imap.SequenceNumber> remote_positions) {
StringBuilder positions_builder = new StringBuilder("( ");
foreach (Imap.SequenceNumber remote_position in remote_positions)
positions_builder.append_printf("%s ", remote_position.to_string());
positions_builder.append(")");
debug("%s do_replay_appended_message: current remote_count=%d reported_remote_count=%d remote_positions=%s",
to_string(), remote_count, reported_remote_count, positions_builder.str);
if (remote_positions.size == 0)
return;
Gee.HashSet<Geary.EmailIdentifier> created = new Gee.HashSet<Geary.EmailIdentifier>();
Gee.HashSet<Geary.EmailIdentifier> appended = new Gee.HashSet<Geary.EmailIdentifier>();
try {
Gee.List<Imap.MessageSet> msg_sets = Imap.MessageSet.sparse(remote_positions);
foreach (Imap.MessageSet msg_set in msg_sets) {
Gee.List<Geary.Email>? list = yield remote_folder.list_email_async(msg_set,
ImapDB.Folder.REQUIRED_FIELDS, null);
if (list != null && list.size > 0) {
debug("%s do_replay_appended_message: %d new messages in %s", to_string(),
list.size, msg_set.to_string());
// need to report both if it was created (not known before) and appended (which
// could mean created or simply a known email associated with this folder)
Gee.Map<Geary.Email, bool> created_or_merged =
yield local_folder.create_or_merge_email_async(list, null);
foreach (Geary.Email email in created_or_merged.keys) {
// true means created
if (created_or_merged.get(email)) {
debug("%s do_replay_appended_message: appended email ID %s added",
to_string(), email.id.to_string());
created.add(email.id);
} else {
debug("%s do_replay_appended_message: appended email ID %s associated",
to_string(), email.id.to_string());
}
appended.add(email.id);
}
} else {
debug("%s do_replay_appended_message: no new messages in %s", to_string(),
msg_set.to_string());
}
}
} catch (Error err) {
debug("%s do_replay_appended_message: Unable to process: %s",
to_string(), err.message);
this.remote_count = reported_remote_count;
if (positions.size > 0) {
ReplayAppend op = new ReplayAppend(this, reported_remote_count, positions);
op.email_appended.connect(notify_email_appended);
op.email_locally_appended.connect(notify_email_locally_appended);
op.email_count_changed.connect(notify_email_count_changed);
this.replay_queue.schedule_server_notification(op);
}
// store the reported count, *not* the current count (which is updated outside the of
// the queue) to ensure that updates happen serially and reflect committed local changes
try {
yield local_folder.update_remote_selected_message_count(reported_remote_count, null);
} catch (Error err) {
debug("%s do_replay_appended_message: Unable to save appended remote count %d: %s",
to_string(), reported_remote_count, err.message);
}
if (appended.size > 0)
notify_email_appended(appended);
if (created.size > 0)
notify_email_locally_appended(created);
notify_email_count_changed(reported_remote_count, CountChangeReason.APPENDED);
debug("%s do_replay_appended_message: completed, current remote_count=%d reported_remote_count=%d",
to_string(), remote_count, reported_remote_count);
}
private void on_remote_updated(Imap.SequenceNumber position, Imap.FetchedData data) {
debug("%s on_remote_updated: remote_count=%d position=%s", to_string(),
this.remote_count, position.to_string());
this.replay_queue.schedule_server_notification(
new ReplayUpdate(this, this.remote_count, position, data)
);
}
private void on_remote_removed(Imap.SequenceNumber position, int reported_remote_count) {
debug("%s on_remote_removed: remote_count=%d position=%s reported_remote_count=%d", to_string(),
remote_count, position.to_string(), reported_remote_count);
@ -1151,109 +1124,15 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
// remote_count is *not* updated, which is why it's safe to do that here without worry.
// similarly, signals are only fired here if marked, so the same EmailIdentifier isn't
// reported twice
remote_count = reported_remote_count;
replay_queue.schedule_server_notification(new ReplayRemoval(this, reported_remote_count, position));
this.remote_count = reported_remote_count;
ReplayRemoval op = new ReplayRemoval(this, reported_remote_count, position);
op.email_removed.connect(notify_email_removed);
op.marked_email_removed.connect(notify_marked_email_removed);
op.email_count_changed.connect(notify_email_count_changed);
this.replay_queue.schedule_server_notification(op);
}
// This MUST only be called from ReplayRemoval.
internal async void do_replay_removed_message(int reported_remote_count, Imap.SequenceNumber remote_position) {
debug("%s do_replay_removed_message: current remote_count=%d remote_position=%s reported_remote_count=%d",
to_string(), remote_count, remote_position.value.to_string(), reported_remote_count);
if (!remote_position.is_valid()) {
debug("%s do_replay_removed_message: ignoring, invalid remote position or count",
to_string());
return;
}
int local_count = -1;
int64 local_position = -1;
ImapDB.EmailIdentifier? owned_id = null;
try {
// need total count, including those marked for removal, to accurately calculate position
// from server's point of view, not client's
local_count = yield local_folder.get_email_count_async(
ImapDB.Folder.ListFlags.INCLUDE_MARKED_FOR_REMOVE, null);
local_position = remote_position.value - (reported_remote_count + 1 - local_count);
// zero or negative means the message exists beyond the local vector's range, so
// nothing to do there
if (local_position > 0) {
debug("%s do_replay_removed_message: local_count=%d local_position=%s", to_string(),
local_count, local_position.to_string());
owned_id = yield local_folder.get_id_at_async(local_position, null);
} else {
debug("%s do_replay_removed_message: message not stored locally (local_count=%d local_position=%s)",
to_string(), local_count, local_position.to_string());
}
} catch (Error err) {
debug("%s do_replay_removed_message: unable to determine ID of removed message %s: %s",
to_string(), remote_position.to_string(), err.message);
}
bool marked = false;
if (owned_id != null) {
debug("%s do_replay_removed_message: detaching from local store Email ID %s", to_string(),
owned_id.to_string());
try {
// Reflect change in the local store and notify subscribers
yield local_folder.detach_single_email_async(owned_id, out marked, null);
} catch (Error err) {
debug("%s do_replay_removed_message: unable to remove message #%s: %s", to_string(),
remote_position.to_string(), err.message);
}
// Notify queued replay operations that the email has been removed (by EmailIdentifier)
replay_queue.notify_remote_removed_ids(
Geary.iterate<ImapDB.EmailIdentifier>(owned_id).to_array_list());
} else {
debug("%s do_replay_removed_message: remote_position=%ld unknown in local store "
+ "(reported_remote_count=%d local_position=%ld local_count=%d)",
to_string(), remote_position.value, reported_remote_count, local_position, local_count);
}
// for debugging
int new_local_count = -1;
try {
new_local_count = yield local_folder.get_email_count_async(
ImapDB.Folder.ListFlags.INCLUDE_MARKED_FOR_REMOVE, null);
} catch (Error err) {
debug("%s do_replay_removed_message: error fetching new local count: %s", to_string(),
err.message);
}
// as with on_remote_appended(), only update in local store inside a queue operation, to
// ensure serial commits
try {
yield local_folder.update_remote_selected_message_count(reported_remote_count, null);
} catch (Error err) {
debug("%s do_replay_removed_message: unable to save removed remote count: %s", to_string(),
err.message);
}
// notify of change ... use "marked-email-removed" for marked email to allow internal code
// to be notified when a removed email is "really" removed
if (owned_id != null) {
Gee.List<EmailIdentifier> removed = Geary.iterate<Geary.EmailIdentifier>(owned_id).to_array_list();
if (!marked)
notify_email_removed(removed);
else
marked_email_removed(removed);
}
if (!marked)
notify_email_count_changed(reported_remote_count, CountChangeReason.REMOVED);
debug("%s do_replay_remove_message: completed, current remote_count=%d "
+ "(reported_remote_count=%d local_count=%d starting local_count=%d remote_position=%ld local_position=%ld marked=%s)",
to_string(), remote_count, reported_remote_count, new_local_count, local_count, remote_position.value,
local_position, marked.to_string());
}
private void on_remote_disconnected(Imap.ClientSession.DisconnectReason reason) {
debug("on_remote_disconnected: reason=%s", reason.to_string());
@ -1398,16 +1277,20 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
yield mark.wait_for_ready_async(cancellable);
}
public virtual async void copy_email_async(Gee.List<Geary.EmailIdentifier> to_copy,
Geary.FolderPath destination, Cancellable? cancellable = null) throws Error {
Geary.FolderPath destination,
Cancellable? cancellable = null)
throws Error {
Geary.Folder target = yield this._account.fetch_folder_async(destination);
yield copy_email_uids_async(to_copy, destination, cancellable);
this._account.update_folder(target);
}
/**
* Returns the destination folder's UIDs for the copied messages.
*/
public async Gee.Set<Imap.UID>? copy_email_uids_async(Gee.List<Geary.EmailIdentifier> to_copy,
protected async Gee.Set<Imap.UID>? copy_email_uids_async(Gee.List<Geary.EmailIdentifier> to_copy,
Geary.FolderPath destination, Cancellable? cancellable = null) throws Error {
check_open("copy_email_uids_async");
check_ids("copy_email_uids_async", to_copy);
@ -1441,10 +1324,13 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
if (prepare.prepared_for_move == null || prepare.prepared_for_move.size == 0)
return null;
return new RevokableMove(_account, this, destination, prepare.prepared_for_move);
Geary.Folder target = yield this._account.fetch_folder_async(destination);
return new RevokableMove(
_account, this, target, prepare.prepared_for_move
);
}
public void schedule_op(ReplayOperation op) throws Error {
check_open("schedule_op");
@ -1455,14 +1341,27 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
schedule_op(op);
yield op.wait_for_ready_async(cancellable);
}
private void on_email_flags_changed(Gee.Map<Geary.EmailIdentifier, Geary.EmailFlags> changed) {
notify_email_flags_changed(changed);
public override string to_string() {
return "%s (open_count=%d remote_opened=%s)".printf(base.to_string(), open_count,
remote_opened.to_string());
}
/**
* Schedules a refresh of the unseen count for the folder.
*
* This will only refresh folders that are not open, since if they
* are open or opening, they will already be updated. Hence it is safe to be called on closed folders.
*/
internal void refresh_unseen() {
if (this.open_count == 0) {
this.refresh_unseen_timer.start();
}
}
// TODO: A proper public search mechanism; note that this always round-trips to the remote,
// doesn't go through the replay queue, and doesn't deal with messages marked for deletion
internal async Geary.EmailIdentifier? find_earliest_email_async(DateTime datetime,
internal async Geary.Email? find_earliest_email_async(DateTime datetime,
Geary.EmailIdentifier? before_id, Cancellable? cancellable) throws Error {
check_open("find_earliest_email_async");
if (before_id != null)
@ -1493,19 +1392,20 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
replay_queue.schedule(op);
yield op.wait_for_ready_async(cancellable);
// find earliest ID; because all Email comes from Folder, UID should always be present
Geary.Email? earliest = null;
ImapDB.EmailIdentifier? earliest_id = null;
foreach (Geary.Email email in op.accumulator) {
ImapDB.EmailIdentifier email_id = (ImapDB.EmailIdentifier) email.id;
if (earliest_id == null || email_id.uid.compare_to(earliest_id.uid) < 0)
if (earliest_id == null || email_id.uid.compare_to(earliest_id.uid) < 0) {
earliest = email;
earliest_id = email_id;
}
}
return earliest_id;
return earliest;
}
protected async Geary.EmailIdentifier? create_email_async(RFC822.Message rfc822,
Geary.EmailFlags? flags, DateTime? date_received, Geary.EmailIdentifier? id,
Cancellable? cancellable = null) throws Error {
@ -1542,13 +1442,95 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
// safely back out.
if (cancellable != null && cancellable.is_cancelled() && ret != null && remove_folder != null)
yield remove_folder.remove_email_async(iterate<EmailIdentifier>(ret).to_array_list());
this._account.update_folder(this);
return ret;
}
public override string to_string() {
return "%s (open_count=%d remote_opened=%s)".printf(base.to_string(), open_count,
remote_opened.to_string());
/** Fires a {@link report_problem}} signal for a service for this folder. */
protected virtual void notify_service_problem(ProblemType type, Service service_type, Error? err) {
report_problem(new ServiceProblemReport(
type, this._account.information, service_type, err
));
}
/** Fires a {@link marked_email_removed}} signal for this folder. */
protected virtual void notify_marked_email_removed(Gee.Collection<Geary.EmailIdentifier> removed) {
marked_email_removed(removed);
}
/**
* Checks for changes to {@link EmailFlags} after a folder opens.
*/
private async void on_update_flags() throws Error {
// Update this to use CHANGEDSINCE FETCH when available, when
// we support IMAP CONDSTORE (Bug 713117).
int chunk_size = FLAG_UPDATE_START_CHUNK;
Geary.EmailIdentifier? lowest = null;
while (!this.open_cancellable.is_cancelled() && this.remote.is_ready) {
Gee.List<Geary.Email>? list_local = yield list_email_by_id_async(
lowest, chunk_size,
Geary.Email.Field.FLAGS,
Geary.Folder.ListFlags.LOCAL_ONLY,
this.open_cancellable
);
if (list_local == null || list_local.is_empty)
break;
// find the lowest for the next iteration
lowest = Geary.EmailIdentifier.sort_emails(list_local).first().id;
// Get all email identifiers in the local folder mapped to their EmailFlags
Gee.HashMap<Geary.EmailIdentifier, Geary.EmailFlags> local_map =
new Gee.HashMap<Geary.EmailIdentifier, Geary.EmailFlags>();
foreach (Geary.Email e in list_local)
local_map.set(e.id, e.email_flags);
// Fetch e-mail from folder using force update, which will cause the cache to be bypassed
// and the latest to be gotten from the server (updating the cache in the process)
debug("%s: fetching %d flags", this.to_string(), local_map.keys.size);
Gee.List<Geary.Email>? list_remote = yield list_email_by_sparse_id_async(
local_map.keys,
Email.Field.FLAGS,
Geary.Folder.ListFlags.FORCE_UPDATE,
this.open_cancellable
);
if (list_remote == null || list_remote.is_empty)
break;
// Build map of emails that have changed.
Gee.HashMap<Geary.EmailIdentifier, Geary.EmailFlags> changed_map =
new Gee.HashMap<Geary.EmailIdentifier, Geary.EmailFlags>();
foreach (Geary.Email e in list_remote) {
if (!local_map.has_key(e.id))
continue;
if (!local_map.get(e.id).equal_to(e.email_flags))
changed_map.set(e.id, e.email_flags);
}
if (!this.open_cancellable.is_cancelled() && changed_map.size > 0)
notify_email_flags_changed(changed_map);
chunk_size *= 2;
if (chunk_size > FLAG_UPDATE_MAX_CHUNK) {
chunk_size = FLAG_UPDATE_MAX_CHUNK;
}
}
}
private void on_refresh_unseen() {
// We queue an account operation since the folder itself is
// closed and hence does not have a connection to use for it.
RefreshFolderUnseen op = new RefreshFolderUnseen(
this, this._account, this.remote, this.local
);
try {
this._account.queue_operation(op);
} catch (Error err) {
// oh well
}
}
private void on_remote_ready() {
@ -1556,4 +1538,5 @@ private class Geary.ImapEngine.MinimalFolder : Geary.Folder, Geary.FolderSupport
start_open_remote();
}
}
}

View file

@ -4,6 +4,13 @@
* (version 2.1 or later). See the COPYING file in this distribution.
*/
/**
* Interleaves IMAP operations to maintain consistent sequence numbering.
*
* The replay queue manages and executes operations originating both
* locally and from the server for a specific IMAP mailbox so as to
* ensure the execution of the operations maintains consistent.
*/
private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject {
// this value is high because delays between back-to-back unsolicited notifications have been
// see as high as 250ms
@ -56,8 +63,10 @@ private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject {
} }
private weak MinimalFolder owner;
private Nonblocking.Mailbox<ReplayOperation> local_queue = new Nonblocking.Mailbox<ReplayOperation>();
private Nonblocking.Mailbox<ReplayOperation> remote_queue = new Nonblocking.Mailbox<ReplayOperation>();
private Nonblocking.Queue<ReplayOperation> local_queue =
new Nonblocking.Queue<ReplayOperation>.fifo();
private Nonblocking.Queue<ReplayOperation> remote_queue =
new Nonblocking.Queue<ReplayOperation>.fifo();
private ReplayOperation? local_op_active = null;
private ReplayOperation? remote_op_active = null;
private Gee.ArrayList<ReplayOperation> notification_queue = new Gee.ArrayList<ReplayOperation>();
@ -363,7 +372,7 @@ private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject {
while (queue_running) {
ReplayOperation op;
try {
op = yield local_queue.recv_async();
op = yield local_queue.receive();
} catch (Error recv_err) {
debug("Unable to receive next replay operation on local queue %s: %s", to_string(),
recv_err.message);
@ -463,7 +472,7 @@ private class Geary.ImapEngine.ReplayQueue : Geary.BaseObject {
// wait for the next operation ... do this *before* waiting for remote
ReplayOperation op;
try {
op = yield remote_queue.recv_async();
op = yield remote_queue.receive();
} catch (Error recv_err) {
debug("Unable to receive next replay operation on remote queue %s: %s", to_string(),
recv_err.message);

View file

@ -42,6 +42,9 @@ private class Geary.ImapEngine.RevokableCommittedMove : Revokable {
}
notify_revoked();
Geary.Folder target = yield this.account.fetch_folder_async(this.destination);
this.account.update_folder(target);
} finally {
if (detached_destination != null) {
try {

View file

@ -14,14 +14,14 @@
*/
private class Geary.ImapEngine.RevokableMove : Revokable {
private const int COMMIT_TIMEOUT_SEC = 60;
private const int COMMIT_TIMEOUT_SEC = 5;
private GenericAccount account;
private ImapEngine.MinimalFolder source;
private FolderPath destination;
private MinimalFolder source;
private Geary.Folder destination;
private Gee.Set<ImapDB.EmailIdentifier> move_ids;
public RevokableMove(GenericAccount account, ImapEngine.MinimalFolder source, FolderPath destination,
public RevokableMove(GenericAccount account, MinimalFolder source, Geary.Folder destination,
Gee.Set<ImapDB.EmailIdentifier> move_ids) {
base (COMMIT_TIMEOUT_SEC);
@ -48,7 +48,7 @@ private class Geary.ImapEngine.RevokableMove : Revokable {
source.path.to_string(), destination.to_string());
try {
source.schedule_op(new MoveEmailCommit(source, move_ids, destination, null));
source.schedule_op(new MoveEmailCommit(source, move_ids, destination.path, null));
} catch (Error err) {
debug("Move from %s to %s failed: %s", source.path.to_string(), destination.to_string(),
err.message);
@ -58,26 +58,34 @@ private class Geary.ImapEngine.RevokableMove : Revokable {
source.path.to_string(), source.get_open_state().to_string());
}
}
protected override async void internal_revoke_async(Cancellable? cancellable) throws Error {
try {
yield source.exec_op_async(new MoveEmailRevoke(source, move_ids, cancellable),
cancellable);
MoveEmailRevoke op = new MoveEmailRevoke(
source, move_ids, cancellable
);
yield source.exec_op_async(op, cancellable);
// valid must still be true before firing
notify_revoked();
yield op.wait_for_ready_async(cancellable);
this.account.update_folder(this.destination);
} finally {
set_invalid();
}
}
protected override async void internal_commit_async(Cancellable? cancellable) throws Error {
try {
MoveEmailCommit op = new MoveEmailCommit(source, move_ids, destination, cancellable);
MoveEmailCommit op = new MoveEmailCommit(source, move_ids, destination.path, cancellable);
yield source.exec_op_async(op, cancellable);
// valid must still be true before firing
notify_committed(new RevokableCommittedMove(account, source.path, destination, op.destination_uids));
notify_committed(new RevokableCommittedMove(account, source.path, destination.path, op.destination_uids));
yield op.wait_for_ready_async(cancellable);
this.account.update_folder(this.destination);
} finally {
set_invalid();
}
@ -87,7 +95,7 @@ private class Geary.ImapEngine.RevokableMove : Revokable {
// look for either of the folders going away
if (unavailable != null) {
foreach (Folder folder in unavailable) {
if (folder.path.equal_to(source.path) || folder.path.equal_to(destination)) {
if (folder.path.equal_to(source.path) || folder.path.equal_to(destination.path)) {
set_invalid();
break;
@ -107,13 +115,23 @@ private class Geary.ImapEngine.RevokableMove : Revokable {
if (move_ids.size <= 0)
set_invalid();
}
private void on_source_closing(Gee.List<ReplayOperation> final_ops) {
if (!valid)
return;
final_ops.add(new MoveEmailCommit(source, move_ids, destination, null));
MoveEmailCommit op = new MoveEmailCommit(
source, move_ids, destination.path, null
);
final_ops.add(op);
set_invalid();
op.wait_for_ready_async.begin(null, (obj, res) => {
try {
op.wait_for_ready_async.end(res);
this.account.update_folder(this.destination);
} catch (Error err) {
// Oh well
}
});
}
}

View file

@ -4,21 +4,25 @@
* (version 2.1 or later). See the COPYING file in this distribution.
*/
/**
* A replay operation for a user-initiated operation.
*/
private abstract class Geary.ImapEngine.SendReplayOperation : Geary.ImapEngine.ReplayOperation {
protected SendReplayOperation(string name, ReplayOperation.OnError on_remote_error = OnError.THROW) {
base (name, ReplayOperation.Scope.LOCAL_AND_REMOTE, on_remote_error);
}
protected SendReplayOperation.only_local(string name, ReplayOperation.OnError on_remote_error = OnError.THROW) {
base (name, ReplayOperation.Scope.LOCAL_ONLY, on_remote_error);
}
protected SendReplayOperation.only_remote(string name, ReplayOperation.OnError on_remote_error = OnError.THROW) {
base (name, ReplayOperation.Scope.REMOTE_ONLY, on_remote_error);
}
public override void notify_remote_removed_position(Imap.SequenceNumber removed) {
// we've worked very hard to keep positional addressing out of the SendReplayOperations
}
}
}

View file

@ -10,15 +10,14 @@ private class Geary.ImapEngine.OtherAccount : Geary.ImapEngine.GenericAccount {
base (name, account_information, remote, local);
}
protected override MinimalFolder new_folder(Geary.FolderPath path, Imap.Account remote_account,
ImapDB.Account local_account, ImapDB.Folder local_folder) {
protected override MinimalFolder new_folder(ImapDB.Folder local_folder) {
Geary.FolderPath path = local_folder.get_path();
SpecialFolderType type;
if (Imap.MailboxSpecifier.folder_path_is_inbox(path))
type = SpecialFolderType.INBOX;
else
type = local_folder.get_properties().attrs.get_special_folder_type();
return new OtherFolder(this, remote_account, local_account, local_folder, type);
return new OtherFolder(this, this.remote, this.local, local_folder, type);
}
}

View file

@ -35,20 +35,20 @@ private class Geary.ImapEngine.OutlookAccount : Geary.ImapEngine.GenericAccount
base (name, account_information, remote, local);
}
protected override MinimalFolder new_folder(Geary.FolderPath path, Imap.Account remote_account,
ImapDB.Account local_account, ImapDB.Folder local_folder) {
protected override MinimalFolder new_folder(ImapDB.Folder local_folder) {
// use the Folder's attributes to determine if it's a special folder type, unless it's
// INBOX; that's determined by name
Geary.FolderPath path = local_folder.get_path();
SpecialFolderType special_folder_type;
if (Imap.MailboxSpecifier.folder_path_is_inbox(path))
special_folder_type = SpecialFolderType.INBOX;
else
special_folder_type = local_folder.get_properties().attrs.get_special_folder_type();
if (special_folder_type == Geary.SpecialFolderType.DRAFTS)
return new OutlookDraftsFolder(this, remote_account, local_account, local_folder, special_folder_type);
return new OutlookFolder(this, remote_account, local_account, local_folder, special_folder_type);
}
}
if (special_folder_type == Geary.SpecialFolderType.DRAFTS)
return new OutlookDraftsFolder(this, this.remote, this.local, local_folder, special_folder_type);
return new OutlookFolder(this, this.remote, this.local, local_folder, special_folder_type);
}
}

View file

@ -5,10 +5,16 @@
*/
private class Geary.ImapEngine.ReplayAppend : Geary.ImapEngine.ReplayOperation {
private MinimalFolder owner;
private int remote_count;
private Gee.List<Imap.SequenceNumber> positions;
public signal void email_appended(Gee.Collection<Geary.EmailIdentifier> ids);
public signal void email_locally_appended(Gee.Collection<Geary.EmailIdentifier> ids);
public signal void email_count_changed(int count, Folder.CountChangeReason reason);
public ReplayAppend(MinimalFolder owner, int remote_count, Gee.List<Imap.SequenceNumber> positions) {
// IGNORE remote errors because the reconnect will re-normalize the folder, making this
// append moot
@ -51,16 +57,90 @@ private class Geary.ImapEngine.ReplayAppend : Geary.ImapEngine.ReplayOperation {
public override async void backout_local_async() throws Error {
}
public override async ReplayOperation.Status replay_remote_async() {
if (positions.size > 0)
yield owner.do_replay_appended_messages(remote_count, positions);
if (this.positions.size > 0)
yield do_replay_appended_messages();
return ReplayOperation.Status.COMPLETED;
}
public override string describe_state() {
return "remote_count=%d positions.size=%d".printf(remote_count, positions.size);
}
}
// Need to prefetch at least an EmailIdentifier (and duplicate detection fields) to create a
// normalized placeholder in the local database of the message, so all positions are
// properly relative to the end of the message list; once this is done, notify user of new
// messages. If duplicates, create_email_async() will fall through to an updated merge,
// which is exactly what we want.
private async void do_replay_appended_messages() {
StringBuilder positions_builder = new StringBuilder("( ");
foreach (Imap.SequenceNumber remote_position in this.positions)
positions_builder.append_printf("%s ", remote_position.to_string());
positions_builder.append(")");
debug("%s do_replay_appended_message: current remote_count=%d this.remote_count=%d this.positions=%s",
to_string(), remote_count, this.remote_count, positions_builder.str);
Gee.HashSet<Geary.EmailIdentifier> created = new Gee.HashSet<Geary.EmailIdentifier>();
Gee.HashSet<Geary.EmailIdentifier> appended = new Gee.HashSet<Geary.EmailIdentifier>();
try {
Gee.List<Imap.MessageSet> msg_sets = Imap.MessageSet.sparse(this.positions);
foreach (Imap.MessageSet msg_set in msg_sets) {
Gee.List<Geary.Email>? list = yield this.owner.remote_folder.list_email_async(msg_set,
ImapDB.Folder.REQUIRED_FIELDS, null);
if (list != null && list.size > 0) {
debug("%s do_replay_appended_message: %d new messages in %s", to_string(),
list.size, msg_set.to_string());
// need to report both if it was created (not known before) and appended (which
// could mean created or simply a known email associated with this folder)
Gee.Map<Geary.Email, bool> created_or_merged =
yield this.owner.local_folder.create_or_merge_email_async(list, null);
foreach (Geary.Email email in created_or_merged.keys) {
// true means created
if (created_or_merged.get(email)) {
debug("%s do_replay_appended_message: appended email ID %s added",
to_string(), email.id.to_string());
created.add(email.id);
} else {
debug("%s do_replay_appended_message: appended email ID %s associated",
to_string(), email.id.to_string());
}
appended.add(email.id);
}
} else {
debug("%s do_replay_appended_message: no new messages in %s", to_string(),
msg_set.to_string());
}
}
} catch (Error err) {
debug("%s do_replay_appended_message: Unable to process: %s",
to_string(), err.message);
}
// store the reported count, *not* the current count (which is updated outside the of
// the queue) to ensure that updates happen serially and reflect committed local changes
try {
yield this.owner.local_folder.update_remote_selected_message_count(this.remote_count, null);
} catch (Error err) {
debug("%s do_replay_appended_message: Unable to save appended remote count %d: %s",
to_string(), this.remote_count, err.message);
}
if (appended.size > 0)
email_appended(appended);
if (created.size > 0)
email_locally_appended(created);
email_count_changed(this.remote_count, Folder.CountChangeReason.APPENDED);
debug("%s do_replay_appended_message: completed, current remote_count=%d this.remote_count=%d",
to_string(), remote_count, this.remote_count);
}
}

View file

@ -5,10 +5,16 @@
*/
private class Geary.ImapEngine.ReplayRemoval : Geary.ImapEngine.ReplayOperation {
private MinimalFolder owner;
private int remote_count;
private Imap.SequenceNumber position;
public signal void email_removed(Gee.Collection<Geary.EmailIdentifier> ids);
public signal void marked_email_removed(Gee.Collection<Geary.EmailIdentifier> ids);
public signal void email_count_changed(int count, Folder.CountChangeReason reason);
public ReplayRemoval(MinimalFolder owner, int remote_count, Imap.SequenceNumber position) {
// remote error will cause folder to reconnect and re-normalize, making this remove moot
base ("Removal", Scope.LOCAL_AND_REMOTE, OnError.IGNORE);
@ -39,15 +45,117 @@ private class Geary.ImapEngine.ReplayRemoval : Geary.ImapEngine.ReplayOperation
public override async void backout_local_async() throws Error {
}
public override async ReplayOperation.Status replay_remote_async() throws Error {
yield owner.do_replay_removed_message(remote_count, position);
debug("%s: ReplayRemoval current remote_count=%d this.position=%s reported_remote_count=%d",
this.owner.to_string(), this.owner.remote_count,
this.position.value.to_string(), this.remote_count);
if (this.position.is_valid()) {
yield do_replay_removed_message();
} else {
debug("%s do_replay_removed_message: ignoring, invalid remote position or count",
to_string());
}
return ReplayOperation.Status.COMPLETED;
}
public override string describe_state() {
return "position=%s".printf(position.to_string());
}
}
private async void do_replay_removed_message() {
int local_count = -1;
int64 local_position = -1;
ImapDB.EmailIdentifier? owned_id = null;
try {
// need total count, including those marked for removal,
// to accurately calculate position from server's point of
// view, not client's. The extra 1 taken off is due to the
// remote count already being decremented in MinimalFolder
// when this op was queued.
local_count = yield this.owner.local_folder.get_email_count_async(
ImapDB.Folder.ListFlags.INCLUDE_MARKED_FOR_REMOVE, null);
local_position = this.position.value - (this.remote_count + 1 - local_count);
// zero or negative means the message exists beyond the local vector's range, so
// nothing to do there
if (local_position > 0) {
debug("%s do_replay_removed_message: local_count=%d local_position=%s", to_string(),
local_count, local_position.to_string());
owned_id = yield this.owner.local_folder.get_id_at_async(local_position, null);
} else {
debug("%s do_replay_removed_message: message not stored locally (local_count=%d local_position=%s)",
to_string(), local_count, local_position.to_string());
}
} catch (Error err) {
debug("%s do_replay_removed_message: unable to determine ID of removed message %s: %s",
to_string(), this.position.to_string(), err.message);
}
bool marked = false;
if (owned_id != null) {
debug("%s do_replay_removed_message: detaching from local store Email ID %s", to_string(),
owned_id.to_string());
try {
// Reflect change in the local store and notify subscribers
yield this.owner.local_folder.detach_single_email_async(owned_id, out marked, null);
} catch (Error err) {
debug("%s do_replay_removed_message: unable to remove message #%s: %s", to_string(),
this.position.to_string(), err.message);
}
// Notify queued replay operations that the email has been removed (by EmailIdentifier)
this.owner.replay_queue.notify_remote_removed_ids(
Geary.iterate<ImapDB.EmailIdentifier>(owned_id).to_array_list());
} else {
debug("%s do_replay_removed_message: this.position=%ld unknown in local store "
+ "(this.remote_count=%d local_position=%ld local_count=%d)",
to_string(), this.position.value, this.remote_count, local_position, local_count);
}
// for debugging
int new_local_count = -1;
try {
new_local_count = yield this.owner.local_folder.get_email_count_async(
ImapDB.Folder.ListFlags.INCLUDE_MARKED_FOR_REMOVE, null);
} catch (Error err) {
debug("%s do_replay_removed_message: error fetching new local count: %s", to_string(),
err.message);
}
// as with on_remote_appended(), only update in local store inside a queue operation, to
// ensure serial commits
try {
yield this.owner.local_folder.update_remote_selected_message_count(this.remote_count, null);
} catch (Error err) {
debug("%s do_replay_removed_message: unable to save removed remote count: %s", to_string(),
err.message);
}
// notify of change ... use "marked-email-removed" for marked email to allow internal code
// to be notified when a removed email is "really" removed
if (owned_id != null) {
Gee.List<EmailIdentifier> removed = Geary.iterate<Geary.EmailIdentifier>(owned_id).to_array_list();
if (!marked)
email_removed(removed);
else
marked_email_removed(removed);
}
if (!marked) {
this.owner.replay_notify_email_count_changed(
this.remote_count, Folder.CountChangeReason.REMOVED
);
}
debug("%s ReplayRemoval: completed, current remote_count=%d "
+ "(this.remote_count=%d local_count=%d starting local_count=%d this.position=%ld local_position=%ld marked=%s)",
this.owner.to_string(), this.owner.remote_count,
this.remote_count, new_local_count, local_count,
this.position.value, local_position, marked.to_string());
}
}

View file

@ -0,0 +1,96 @@
/*
* Copyright 2016 Software Freedom Conservancy Inc.
* Copyright 2017 Michael Gratton <mike@vee.net>
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
/**
* Updates an existing message in the local store after un unsolicited FETCH
*/
private class Geary.ImapEngine.ReplayUpdate : Geary.ImapEngine.ReplayOperation {
private MinimalFolder owner;
private int remote_count;
private Imap.SequenceNumber position;
private Imap.FetchedData data;
public ReplayUpdate(MinimalFolder owner,
int remote_count,
Imap.SequenceNumber position,
Imap.FetchedData data) {
base ("Update", Scope.LOCAL_ONLY, OnError.RETRY);
this.owner = owner;
this.remote_count = remote_count;
this.position = position;
this.data = data;
}
public override void notify_remote_removed_position(Imap.SequenceNumber removed) {
}
public override void notify_remote_removed_ids(Gee.Collection<ImapDB.EmailIdentifier> ids) {
}
public override void get_ids_to_be_remote_removed(Gee.Collection<ImapDB.EmailIdentifier> ids) {
}
public override async ReplayOperation.Status replay_local_async()
throws Error {
Imap.MessageFlags? message_flags =
this.data.data_map.get(Imap.FetchDataSpecifier.FLAGS) as Imap.MessageFlags;
if (message_flags != null) {
int local_count = -1;
int64 local_position = -1;
// need total count, including those marked for removal, to accurately calculate position
// from server's point of view, not client's
local_count = yield this.owner.local_folder.get_email_count_async(
ImapDB.Folder.ListFlags.INCLUDE_MARKED_FOR_REMOVE, null);
local_position = this.position.value - (this.remote_count - local_count);
ImapDB.EmailIdentifier? id = null;
if (local_position > 0) {
id = yield this.owner.local_folder.get_id_at_async(
local_position, null
);
}
if (id != null) {
Gee.Map<Geary.ImapDB.EmailIdentifier, Geary.EmailFlags> changed_map =
new Gee.HashMap<Geary.ImapDB.EmailIdentifier, Geary.EmailFlags>();
changed_map.set(id, new Imap.EmailFlags(message_flags));
yield this.owner.local_folder.set_email_flags_async(changed_map, null);
this.owner.replay_notify_email_flags_changed(changed_map);
} else {
debug("%s replay_local_async id is null!", to_string());
}
} else {
debug("%s Don't know what to do without any FLAGS: %s",
to_string(), this.data.to_string());
}
return ReplayOperation.Status.COMPLETED;
}
public override async void backout_local_async() throws Error {
}
public override async ReplayOperation.Status replay_remote_async() {
return ReplayOperation.Status.CONTINUE;
}
public override string describe_state() {
Imap.MessageData fetch_flags =
this.data.data_map.get(Imap.FetchDataSpecifier.FLAGS);
return "position.value=%lld, flags=%s".printf(
this.position.value, fetch_flags.to_string()
);
}
}

View file

@ -37,13 +37,11 @@ private class Geary.ImapEngine.YahooAccount : Geary.ImapEngine.GenericAccount {
special_map.set(new Imap.FolderRoot("Trash"), Geary.SpecialFolderType.TRASH);
}
}
protected override MinimalFolder new_folder(Geary.FolderPath path, Imap.Account remote_account,
ImapDB.Account local_account, ImapDB.Folder local_folder) {
protected override MinimalFolder new_folder(ImapDB.Folder local_folder) {
Geary.FolderPath path = local_folder.get_path();
SpecialFolderType special_folder_type = special_map.has_key(path) ? special_map.get(path)
: Geary.SpecialFolderType.NONE;
return new YahooFolder(this, remote_account, local_account, local_folder,
special_folder_type);
return new YahooFolder(this, this.remote, this.local, local_folder, special_folder_type);
}
}

View file

@ -28,7 +28,7 @@
* When new STATUS information comes in, this object's status_messages, unseen, recent, and attrs
* fields are updated.
*
* When a SELECT/EXAMINE occurs on this folder, this object's select_examine_messages, unseen,
* When a SELECT/EXAMINE occurs on this folder, this object's select_examine_messages,
* recent, uid_validity, and uid_next are updated.
*
* Over time, this object accumulates information depending on what operation was last

View file

@ -41,41 +41,45 @@ private class Geary.Imap.Folder : BaseObject {
private ClientSessionManager session_mgr;
private ClientSession? session = null;
private Nonblocking.Mutex cmd_mutex = new Nonblocking.Mutex();
private Gee.HashMap<SequenceNumber, FetchedData> fetch_accumulator = new Gee.HashMap<
SequenceNumber, FetchedData>();
private Gee.Set<Imap.UID> search_accumulator = new Gee.HashSet<Imap.UID>();
private Gee.HashMap<SequenceNumber, FetchedData>? fetch_accumulator = null;
private Gee.Set<Imap.UID>? search_accumulator = null;
/**
* A (potentially unsolicited) response from the server.
*
* See [[http://tools.ietf.org/html/rfc3501#section-7.3.1]]
*/
public signal void exists(int total);
/**
* A (potentially unsolicited) response from the server.
*
* See [[http://tools.ietf.org/html/rfc3501#section-7.4.1]]
*/
public signal void expunge(SequenceNumber position);
/**
* A (potentially unsolicited) response from the server.
*
* See [[http://tools.ietf.org/html/rfc3501#section-7.3.2]]
*/
public signal void recent(int total);
/**
* A (potentially unsolicited) response from the server.
*
* See [[http://tools.ietf.org/html/rfc3501#section-7.4.1]]
*/
public signal void expunge(SequenceNumber position);
/**
* Fabricated from the IMAP signals and state obtained at open_async().
*/
public signal void appended(int total);
/**
* Fabricated from the IMAP signals and state obtained at open_async().
*/
public signal void updated(SequenceNumber pos, FetchedData data);
/**
* Fabricated from the IMAP signals and state obtained at open_async().
*/
public signal void removed(SequenceNumber pos, int total);
/**
* Note that close_async() still needs to be called after this signal is fired.
*/
@ -91,11 +95,9 @@ private class Geary.Imap.Folder : BaseObject {
public async void open_async(Cancellable? cancellable) throws Error {
if (is_open)
throw new EngineError.ALREADY_OPEN("%s already open", to_string());
fetch_accumulator.clear();
session = yield session_mgr.claim_authorized_session_async(cancellable);
// connect to interesting signals *before* selecting
session.exists.connect(on_exists);
session.expunge.connect(on_expunge);
@ -148,33 +150,32 @@ private class Geary.Imap.Folder : BaseObject {
public async void close_async(Cancellable? cancellable) throws Error {
if (!is_open)
return;
yield release_session_async(cancellable);
fetch_accumulator.clear();
readonly = Trillian.UNKNOWN;
accepts_user_flags = Trillian.UNKNOWN;
is_open = false;
this.fetch_accumulator = null;
this.search_accumulator = null;
this.readonly = Trillian.UNKNOWN;
this.accepts_user_flags = Trillian.UNKNOWN;
this.is_open = false;
}
private async void release_session_async(Cancellable? cancellable) {
if (session == null)
if (this.session == null)
return;
// set this.session to null before yielding to ClientSessionManager
ClientSession release_session = session;
session = null;
release_session.exists.disconnect(on_exists);
release_session.expunge.disconnect(on_expunge);
release_session.fetch.disconnect(on_fetch);
release_session.recent.disconnect(on_recent);
release_session.search.disconnect(on_search);
release_session.status_response_received.disconnect(on_status_response);
release_session.disconnected.disconnect(on_disconnected);
this.session.exists.disconnect(on_exists);
this.session.expunge.disconnect(on_expunge);
this.session.fetch.disconnect(on_fetch);
this.session.recent.disconnect(on_recent);
this.session.search.disconnect(on_search);
this.session.status_response_received.disconnect(on_status_response);
this.session.disconnected.disconnect(on_disconnected);
ClientSession release_session = this.session;
this.session = null;
try {
yield session_mgr.release_session_async(release_session, cancellable);
} catch (Error err) {
@ -209,14 +210,22 @@ private class Geary.Imap.Folder : BaseObject {
expunge(pos);
removed(pos, properties.select_examine_messages);
}
private void on_fetch(FetchedData fetched_data) {
private void on_fetch(FetchedData data) {
// add if not found, merge if already received data for this email
FetchedData? already_present = fetch_accumulator.get(fetched_data.seq_num);
fetch_accumulator.set(fetched_data.seq_num,
(already_present != null) ? fetched_data.combine(already_present) : fetched_data);
if (this.fetch_accumulator != null) {
FetchedData? existing = this.fetch_accumulator.get(data.seq_num);
this.fetch_accumulator.set(
data.seq_num, (existing != null) ? data.combine(existing) : data
);
} else {
debug("%s: FETCH (unsolicited): %s:",
to_string(),
data.to_string());
updated(data.seq_num, data);
}
}
private void on_recent(int total) {
debug("%s RECENT %d", to_string(), total);
@ -230,15 +239,19 @@ private class Geary.Imap.Folder : BaseObject {
private void on_search(int64[] seq_or_uid) {
// All SEARCH from this class are UID SEARCH, so can reliably convert and add to
// accumulator
foreach (int64 uid in seq_or_uid) {
try {
search_accumulator.add(new UID.checked(uid));
} catch (ImapError imaperr) {
debug("%s Unable to process SEARCH UID result: %s", to_string(), imaperr.message);
if (this.search_accumulator != null) {
foreach (int64 uid in seq_or_uid) {
try {
this.search_accumulator.add(new UID.checked(uid));
} catch (ImapError imaperr) {
debug("%s Unable to process SEARCH UID result: %s", to_string(), imaperr.message);
}
}
} else {
debug("%s Not handling unsolicited SEARCH response", to_string());
}
}
private void on_status_response(StatusResponse status_response) {
// only interested in ResponseCodes here
ResponseCode? response_code = status_response.response_code;
@ -304,51 +317,39 @@ private class Geary.Imap.Folder : BaseObject {
// FETCH commands can generate a FolderError.RETRY. State will be updated to accomodate retry,
// but all Commands must be regenerated to ensure new state is reflected in requests.
private async Gee.Map<Command, StatusResponse>? exec_commands_async(Gee.Collection<Command> cmds,
out Gee.HashMap<SequenceNumber, FetchedData>? fetched, out Gee.Set<Imap.UID>? search_results,
Cancellable? cancellable) throws Error {
int token = yield cmd_mutex.claim_async(cancellable);
Gee.HashMap<SequenceNumber, FetchedData>? fetch_results,
Gee.Set<Imap.UID>? search_results,
Cancellable? cancellable)
throws Error {
Gee.Map<Command, StatusResponse>? responses = null;
// execute commands with mutex locked
Error? err = null;
int token = yield cmd_mutex.claim_async(cancellable);
Error? thrown = null;
try {
// check open after acquiring mutex, so that if an error is thrown it's caught and
// mutex can be closed
check_open();
this.fetch_accumulator = fetch_results;
this.search_accumulator = search_results;
responses = yield session.send_multiple_commands_async(cmds, cancellable);
} catch (Error store_fetch_err) {
err = store_fetch_err;
} catch (Error err) {
thrown = err;
}
// swap out results and clear accumulators
if (fetch_accumulator.size > 0) {
fetched = fetch_accumulator;
fetch_accumulator = new Gee.HashMap<SequenceNumber, FetchedData>();
} else {
fetched = null;
}
if (search_accumulator.size > 0) {
search_results = search_accumulator;
search_accumulator = new Gee.HashSet<Imap.UID>();
} else {
search_results = null;
}
// unlock after clearing accumulators
this.fetch_accumulator = null;
this.search_accumulator = null;
cmd_mutex.release(ref token);
if (err != null)
throw err;
// process response stati after unlocking and clearing accumulators
assert(responses != null);
foreach (Command cmd in responses.keys)
if (thrown != null) {
throw thrown;
}
foreach (Command cmd in responses.keys) {
throw_on_failed_status(responses.get(cmd), cmd);
}
return responses;
}
// HACK: See https://bugzilla.gnome.org/show_bug.cgi?id=714902
//
// Detect when a server has returned a BAD response to FETCH BODY[HEADER.FIELDS (HEADER-LIST)]
@ -421,12 +422,16 @@ private class Geary.Imap.Folder : BaseObject {
// which is all we're interested in here
SearchCriteria criteria = new SearchCriteria(SearchCriterion.message_set(msg_set));
SearchCommand cmd = new SearchCommand.uid(criteria);
Gee.Set<Imap.UID>? search_results;
yield exec_commands_async(Geary.iterate<Command>(cmd).to_array_list(), null, out search_results,
cancellable);
return (search_results != null && search_results.size > 0) ? search_results : null;
Gee.Set<Imap.UID> search_results = new Gee.HashSet<Imap.UID>();
yield exec_commands_async(
Geary.iterate<Command>(cmd).to_array_list(),
null,
search_results,
cancellable
);
return (search_results.size > 0) ? search_results : null;
}
private Gee.Collection<FetchCommand> assemble_list_commands(Imap.MessageSet msg_set,
@ -520,8 +525,8 @@ private class Geary.Imap.Folder : BaseObject {
public async Gee.List<Geary.Email>? list_email_async(MessageSet msg_set, Geary.Email.Field fields,
Cancellable? cancellable) throws Error {
check_open();
Gee.HashMap<SequenceNumber, FetchedData>? fetched = null;
Gee.HashMap<SequenceNumber, FetchedData> fetched =
new Gee.HashMap<SequenceNumber, FetchedData>();
FetchBodyDataSpecifier? header_specifier = null;
FetchBodyDataSpecifier? body_specifier = null;
FetchBodyDataSpecifier? preview_specifier = null;
@ -537,7 +542,7 @@ private class Geary.Imap.Folder : BaseObject {
// Commands prepped, do the fetch and accumulate all the responses
try {
yield exec_commands_async(cmds, out fetched, null, cancellable);
yield exec_commands_async(cmds, fetched, null, cancellable);
} catch (Error err) {
if (err is FolderError.RETRY) {
debug("Retryable server failure detected for %s: %s", to_string(), err.message);
@ -550,10 +555,10 @@ private class Geary.Imap.Folder : BaseObject {
break;
}
if (fetched == null || fetched.size == 0)
if (fetched.size == 0)
return null;
// Convert fetched data into Geary.Email objects
// because this could be for a lot of email, do in a background thread
Gee.List<Geary.Email> email_list = new Gee.ArrayList<Geary.Email>();
@ -609,11 +614,12 @@ private class Geary.Imap.Folder : BaseObject {
Gee.List<Command> cmds = new Gee.ArrayList<Command>();
cmds.add(new FetchCommand.data_type(msg_set, FetchDataSpecifier.UID));
Gee.HashMap<SequenceNumber, FetchedData>? fetched;
yield exec_commands_async(cmds, out fetched, null, cancellable);
if (fetched == null || fetched.is_empty) {
Gee.HashMap<SequenceNumber, FetchedData> fetched =
new Gee.HashMap<SequenceNumber, FetchedData>();
yield exec_commands_async(cmds, fetched, null, cancellable);
if (fetched.is_empty) {
throw new ImapError.INVALID("Server returned no sequence numbers");
}
@ -626,7 +632,7 @@ private class Geary.Imap.Folder : BaseObject {
}
return map;
}
public async void remove_email_async(Gee.List<MessageSet> msg_sets, Cancellable? cancellable)
throws Error {
check_open();
@ -741,18 +747,18 @@ private class Geary.Imap.Folder : BaseObject {
// always perform a UID SEARCH
Gee.Collection<Command> cmds = new Gee.ArrayList<Command>();
cmds.add(new SearchCommand.uid(criteria));
Gee.Set<Imap.UID>? search_results;
yield exec_commands_async(cmds, null, out search_results, cancellable);
if (search_results == null || search_results.size == 0)
return null;
Gee.SortedSet<Imap.UID> tree = new Gee.TreeSet<Imap.UID>();
tree.add_all(search_results);
Gee.Set<Imap.UID> search_results = new Gee.HashSet<Imap.UID>();
yield exec_commands_async(cmds, null, search_results, cancellable);
Gee.SortedSet<Imap.UID> tree = null;
if (search_results.size > 0) {
tree = new Gee.TreeSet<Imap.UID>();
tree.add_all(search_results);
}
return tree;
}
// NOTE: If fields are added or removed from this method, BASIC_FETCH_FIELDS *must* be updated
// as well
private void fields_to_fetch_data_types(Geary.Email.Field fields,

View file

@ -1,109 +0,0 @@
/* Copyright 2016 Software Freedom Conservancy Inc.
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
public class Geary.Nonblocking.Mailbox<G> : BaseObject {
public int size { get { return queue.size; } }
public bool allow_duplicates { get; set; default = true; }
public bool requeue_duplicate { get; set; default = false; }
private bool _is_paused = false;
public bool is_paused {
get { return _is_paused; }
set {
// if no longer paused, wake up any waiting recipients
if (_is_paused && !value)
spinlock.blind_notify();
_is_paused = value;
}
}
private Gee.Queue<G> queue;
private Nonblocking.Spinlock spinlock = new Nonblocking.Spinlock();
public Mailbox(owned CompareDataFunc<G>? comparator = null) {
if (comparator == null && !typeof(G).is_a(typeof(Gee.Comparable)))
queue = new Gee.LinkedList<G>();
else
queue = new Gee.PriorityQueue<G>((owned) comparator);
}
public bool send(G msg) {
if (!allow_duplicates && queue.contains(msg)) {
if (requeue_duplicate)
queue.remove(msg);
else
return false;
}
if (!queue.offer(msg))
return false;
if (!is_paused)
spinlock.blind_notify();
return true;
}
/**
* Returns true if the message was revoked.
*/
public bool revoke(G msg) {
return queue.remove(msg);
}
/**
* Returns number of removed items.
*/
public int clear() {
int count = queue.size;
if (count != 0)
queue.clear();
return count;
}
/**
* Remove messages matching the given predicate. Return the removed messages.
*/
public Gee.Collection<G> revoke_matching(owned Gee.Predicate<G> predicate) {
Gee.ArrayList<G> removed = new Gee.ArrayList<G>();
// Iterate over a copy so we can modify the original.
foreach (G msg in queue.to_array()) {
if (predicate(msg)) {
queue.remove(msg);
removed.add(msg);
}
}
return removed;
}
public async G recv_async(Cancellable? cancellable = null) throws Error {
for (;;) {
if (queue.size > 0 && !is_paused)
return queue.poll();
yield spinlock.wait_async(cancellable);
}
}
/**
* Returns a read-only version of the mailbox queue that can be iterated in queue-order.
*
* Since the queue could potentially alter when the main loop runs, it's important to only
* examine the queue when not allowing other operations to process.
*
* Altering will not affect the actual queue. Use {@link revoke} to remove enqueued operations.
*/
public Gee.Collection<G> get_all() {
return queue.read_only_view;
}
}

View file

@ -0,0 +1,171 @@
/*
* Copyright 2016 Software Freedom Conservancy Inc.
* Copyright 2017 Michael Gratton <mike@vee.net>
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
/**
* An asynchronous queue, first-in first-out (FIFO) or priority.
*
* This class can be used to asynchronously wait for items to be added
* to the queue, the asynchronous call blocking until an item is
* ready.
*/
public class Geary.Nonblocking.Queue<G> : BaseObject {
/** Returns the number of items currently in the queue. */
public int size { get { return queue.size; } }
/**
* Determines if duplicate items can be added to the queue.
*
* If a priory queue, this applies to items of the same priority,
* otherwise uses the item's natural identity.
*/
public bool allow_duplicates { get; set; default = true; }
/**
* Determines if duplicate items will be added to the queue.
*
* If {@link allow_duplicates} is `true` and an item is already in
* the queue, this determines if it will be added again.
*/
public bool requeue_duplicate { get; set; default = false; }
/**
* Determines if the queue is currently running.
*/
public bool is_paused {
get { return _is_paused; }
set {
// if no longer paused, wake up any waiting recipients
if (_is_paused && !value)
spinlock.blind_notify();
_is_paused = value;
}
}
private bool _is_paused = false;
private Gee.Queue<G> queue;
private Nonblocking.Spinlock spinlock = new Nonblocking.Spinlock();
/**
* Constructs a new first-in first-out (FIFO) queue.
*
* If `equalator` is not null it will be used to determine the
* identity of objects in the queue, else the items' natural
* identity will be used.
*/
public Queue.fifo(owned Gee.EqualDataFunc<G>? equalator = null) {
this(new Gee.LinkedList<G>(equalator));
}
/**
* Constructs a new priority queue.
*
* If `comparator` is not null it will be used to determine the
* ordering of objects in the queue, else the items' natural
* ordering will be used.
*/
public Queue.priority(owned CompareDataFunc<G>? comparator = null) {
this(new Gee.PriorityQueue<G>(comparator));
}
/**
* Constructs a new queue.
*/
protected Queue(Gee.Queue<G> queue) {
this.queue = queue;
}
/**
* Adds an item to the queue.
*
* If the queue is a priority queue, it is added according to its
* relative priority, else it is added to the end.
*
* Returns `true` if the item was added to the queue.
*/
public bool send(G msg) {
if (!allow_duplicates && queue.contains(msg)) {
if (requeue_duplicate)
queue.remove(msg);
else
return false;
}
if (!queue.offer(msg))
return false;
if (!is_paused)
spinlock.blind_notify();
return true;
}
/**
* Retrieves the next item from the queue, blocking until available.
*
* If the queue is paused, this will continue to wait until
* unpaused and an item is ready. If `cancellable` is non-null,
* when used will cancel this call.
*/
public async G receive(Cancellable? cancellable = null) throws Error {
for (;;) {
if (queue.size > 0 && !is_paused)
return queue.poll();
yield spinlock.wait_async(cancellable);
}
}
/**
* Removes all items in queue, returning the number of removed items.
*/
public int clear() {
int count = queue.size;
if (count != 0)
queue.clear();
return count;
}
/**
* Removes an item from the queue, returning `true` if removed.
*/
public bool revoke(G msg) {
return queue.remove(msg);
}
/**
* Remove items matching the given predicate, returning those removed.
*/
public Gee.Collection<G> revoke_matching(owned Gee.Predicate<G> predicate) {
Gee.ArrayList<G> removed = new Gee.ArrayList<G>();
// Iterate over a copy so we can modify the original.
foreach (G msg in queue.to_array()) {
if (predicate(msg)) {
queue.remove(msg);
removed.add(msg);
}
}
return removed;
}
/**
* Returns a read-only version of the queue queue.
*
* Since the queue could potentially alter when the main loop
* runs, it's important to only examine the queue when not
* allowing other operations to process.
*/
public Gee.Collection<G> get_all() {
return queue.read_only_view;
}
}

View file

@ -6,6 +6,7 @@ set(TEST_ENGINE_SRC
test-engine.vala
testcase.vala # Based on same file in libgee, courtesy Julien Peeters
engine/api/geary-account-test.vala
engine/api/geary-attachment-test.vala
engine/api/geary-engine-test.vala
engine/api/geary-email-identifier-test.vala
@ -16,6 +17,7 @@ set(TEST_ENGINE_SRC
engine/imap/command/imap-create-command-test.vala
engine/imap/response/imap-namespace-response-test.vala
engine/imap/transport/imap-deserializer-test.vala
engine/imap-engine/account-processor-test.vala
engine/mime-content-type-test.vala
engine/rfc822-mailbox-address-test.vala
engine/rfc822-message-test.vala

View file

@ -0,0 +1,130 @@
/*
* Copyright 2017 Michael Gratton <mike@vee.net>
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
public class Geary.MockAccount : Account {
public class MockSearchQuery : SearchQuery {
internal MockSearchQuery() {
base("", SearchQuery.Strategy.EXACT);
}
}
public class MockContactStore : ContactStore {
internal MockContactStore() {
}
public override async void
mark_contacts_async(Gee.Collection<Contact> contacts,
ContactFlags? to_add,
ContactFlags? to_remove) throws Error {
throw new EngineError.UNSUPPORTED("Mock method");
}
}
public MockAccount(string name, AccountInformation information) {
base(name, information);
}
public override async void open_async(Cancellable? cancellable = null) throws Error {
throw new EngineError.UNSUPPORTED("Mock method");
}
public override async void close_async(Cancellable? cancellable = null) throws Error {
throw new EngineError.UNSUPPORTED("Mock method");
}
public override bool is_open() {
return false;
}
public override async void rebuild_async(Cancellable? cancellable = null) throws Error {
throw new EngineError.UNSUPPORTED("Mock method");
}
public override async void start_outgoing_client()
throws Error {
throw new EngineError.UNSUPPORTED("Mock method");
}
public override async void start_incoming_client()
throws Error {
throw new EngineError.UNSUPPORTED("Mock method");
}
public override Gee.Collection<Geary.Folder> list_matching_folders(Geary.FolderPath? parent)
throws Error {
throw new EngineError.UNSUPPORTED("Mock method");
}
public override Gee.Collection<Geary.Folder> list_folders() throws Error {
throw new EngineError.UNSUPPORTED("Mock method");
}
public override Geary.ContactStore get_contact_store() {
return new MockContactStore();
}
public override async bool folder_exists_async(Geary.FolderPath path, Cancellable? cancellable = null)
throws Error {
throw new EngineError.UNSUPPORTED("Mock method");
}
public override async Geary.Folder fetch_folder_async(Geary.FolderPath path,
Cancellable? cancellable = null) throws Error {
throw new EngineError.UNSUPPORTED("Mock method");
}
public override async Geary.Folder get_required_special_folder_async(Geary.SpecialFolderType special,
Cancellable? cancellable = null) throws Error {
throw new EngineError.UNSUPPORTED("Mock method");
}
public override async void send_email_async(Geary.ComposedEmail composed, Cancellable? cancellable = null)
throws Error {
throw new EngineError.UNSUPPORTED("Mock method");
}
public override async Gee.MultiMap<Geary.Email, Geary.FolderPath?>? local_search_message_id_async(
Geary.RFC822.MessageID message_id, Geary.Email.Field requested_fields, bool partial_ok,
Gee.Collection<Geary.FolderPath?>? folder_blacklist, Geary.EmailFlags? flag_blacklist,
Cancellable? cancellable = null) throws Error {
throw new EngineError.UNSUPPORTED("Mock method");
}
public override async Geary.Email local_fetch_email_async(Geary.EmailIdentifier email_id,
Geary.Email.Field required_fields, Cancellable? cancellable = null) throws Error {
throw new EngineError.UNSUPPORTED("Mock method");
}
public override Geary.SearchQuery open_search(string query, Geary.SearchQuery.Strategy strategy) {
return new MockSearchQuery();
}
public override async Gee.Collection<Geary.EmailIdentifier>? local_search_async(Geary.SearchQuery query,
int limit = 100, int offset = 0, Gee.Collection<Geary.FolderPath?>? folder_blacklist = null,
Gee.Collection<Geary.EmailIdentifier>? search_ids = null, Cancellable? cancellable = null) throws Error {
throw new EngineError.UNSUPPORTED("Mock method");
}
public override async Gee.Set<string>? get_search_matches_async(Geary.SearchQuery query,
Gee.Collection<Geary.EmailIdentifier> ids, Cancellable? cancellable = null) throws Error {
throw new EngineError.UNSUPPORTED("Mock method");
}
public override async Gee.MultiMap<EmailIdentifier, FolderPath>?
get_containing_folders_async(Gee.Collection<EmailIdentifier> ids,
Cancellable? cancellable) throws Error {
throw new EngineError.UNSUPPORTED("Mock method");
}
}

View file

@ -0,0 +1,175 @@
/*
* Copyright 2017 Michael Gratton <mike@vee.net>
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
errordomain AccountProcessorTestError {
TEST;
}
public class Geary.ImapEngine.AccountProcessorTest : Gee.TestCase {
public class TestOperation : AccountOperation {
public bool throw_error = false;
public bool wait_for_cancel = false;
public bool execute_called = false;
private Nonblocking.Spinlock spinlock = new Nonblocking.Spinlock();
internal TestOperation(Geary.Account account) {
base(account);
}
public override async void execute(Cancellable cancellable)
throws Error {
print("Test op/");
this.execute_called = true;
if (this.wait_for_cancel) {
yield this.spinlock.wait_async(cancellable);
}
if (this.throw_error) {
throw new AccountProcessorTestError.TEST("Failed");
}
}
}
public class OtherOperation : TestOperation {
internal OtherOperation(Geary.Account account) {
base(account);
}
}
private AccountProcessor? processor = null;
private Geary.Account? account = null;
private Geary.AccountInformation? info = null;
private uint succeeded;
private uint failed;
private uint completed;
public AccountProcessorTest() {
base("Geary.ImapEngine.AccountProcessorTest");
add_test("test_success", test_success);
add_test("test_failure", test_failure);
add_test("test_duplicate", test_duplicate);
add_test("test_stop", test_stop);
// XXX this has to be here instead of in set_up for some
// reason...
this.processor = new AccountProcessor("processor");
}
public override void set_up() {
this.info = new Geary.AccountInformation(
"test-info",
File.new_for_path("."),
File.new_for_path(".")
);
this.account = new Geary.MockAccount("test-account", this.info);
this.succeeded = 0;
this.failed = 0;
this.completed = 0;
}
public void test_success() {
TestOperation op = setup_operation(new TestOperation(this.account));
this.processor.enqueue(op);
assert(this.processor.waiting == 1);
execute_all();
assert(op.execute_called);
assert(this.succeeded == 1);
assert(this.failed == 0);
assert(this.completed == 1);
}
public void test_failure() {
TestOperation op = setup_operation(new TestOperation(this.account));
op.throw_error = true;
AccountOperation? error_op = null;
Error? error = null;
this.processor.operation_error.connect((proc, op, err) => {
error_op = op;
error = err;
});
this.processor.enqueue(op);
execute_all();
assert(this.succeeded == 0);
assert(this.failed == 1);
assert(this.completed == 1);
assert(error_op == op);
assert(error is AccountProcessorTestError.TEST);
}
public void test_duplicate() {
TestOperation op1 = setup_operation(new TestOperation(this.account));
TestOperation op2 = setup_operation(new TestOperation(this.account));
TestOperation op3 = setup_operation(new OtherOperation(this.account));
this.processor.enqueue(op1);
this.processor.enqueue(op2);
assert(this.processor.waiting == 1);
this.processor.enqueue(op3);
assert(this.processor.waiting == 2);
}
public void test_stop() {
TestOperation op1 = setup_operation(new TestOperation(this.account));
op1.wait_for_cancel = true;
TestOperation op2 = setup_operation(new OtherOperation(this.account));
this.processor.enqueue(op1);
this.processor.enqueue(op2);
while (!this.processor.is_executing) {
this.main_loop.iteration(true);
}
this.processor.stop();
while (this.main_loop.pending()) {
this.main_loop.iteration(true);
}
assert(!this.processor.is_executing);
assert(this.processor.waiting == 0);
assert(this.succeeded == 0);
assert(this.failed == 1);
assert(this.completed == 1);
}
private TestOperation setup_operation(TestOperation op) {
op.succeeded.connect(() => {
this.succeeded++;
});
op.failed.connect(() => {
this.failed++;
});
op.completed.connect(() => {
this.completed++;
});
return op;
}
private void execute_all() {
while (this.processor.is_executing || this.processor.waiting > 0) {
this.main_loop.iteration(true);
}
}
}

View file

@ -32,6 +32,7 @@ int main(string[] args) {
engine.add_suite(new Geary.Imap.DeserializerTest().get_suite());
engine.add_suite(new Geary.Imap.CreateCommandTest().get_suite());
engine.add_suite(new Geary.Imap.NamespaceResponseTest().get_suite());
engine.add_suite(new Geary.ImapEngine.AccountProcessorTest().get_suite());
engine.add_suite(new Geary.Inet.Test().get_suite());
engine.add_suite(new Geary.JS.Test().get_suite());
engine.add_suite(new Geary.Mime.ContentTypeTest().get_suite());