Add an operations queue to GenericAccount for server operations.

This generalises the approach used to execute the flag watcher and
background sync, provides a high-level means of managing local and remote
operations, and provides a means of compartmentalising operation-specific
code.

* src/engine/imap-engine/imap-engine-account-operation.vala
  (AccountProcessor): Interface for denoting classes that implements some
  account-specific operation.

* src/engine/imap-engine/imap-engine-account-processor.vala
  (AccountProcessor): Class to manage the operation queue and execute
  operations.

* src/engine/imap-engine/imap-engine-generic-account.vala
  (GenericAccount): Create and manage an instance of AccountProcessor,
  add queue_operation method to allow operations to be queued.
This commit is contained in:
Michael James Gratton 2017-11-23 10:11:30 +11:00
parent 5ed7de55dd
commit b24c554f51
8 changed files with 355 additions and 0 deletions

View file

@ -230,6 +230,8 @@ src/engine/imap-engine/gmail/imap-engine-gmail-drafts-folder.vala
src/engine/imap-engine/gmail/imap-engine-gmail-folder.vala
src/engine/imap-engine/gmail/imap-engine-gmail-search-folder.vala
src/engine/imap-engine/gmail/imap-engine-gmail-spam-trash-folder.vala
src/engine/imap-engine/imap-engine-account-operation.vala
src/engine/imap-engine/imap-engine-account-processor.vala
src/engine/imap-engine/imap-engine-account-synchronizer.vala
src/engine/imap-engine/imap-engine-batch-operations.vala
src/engine/imap-engine/imap-engine-contact-store.vala

View file

@ -190,6 +190,8 @@ engine/imap-db/outbox/smtp-outbox-folder-properties.vala
engine/imap-db/outbox/smtp-outbox-folder-root.vala
engine/imap-engine/imap-engine.vala
engine/imap-engine/imap-engine-account-operation.vala
engine/imap-engine/imap-engine-account-processor.vala
engine/imap-engine/imap-engine-account-synchronizer.vala
engine/imap-engine/imap-engine-batch-operations.vala
engine/imap-engine/imap-engine-contact-store.vala

View file

@ -0,0 +1,86 @@
/*
* Copyright 2017 Michael Gratton <mike@vee.net>
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
/**
* A unit of work to be executed by {@link GenericAccount}.
*
* To queue an operation for execution, pass an instance to {@link
* GenericAccount.queue_operation} when the account is opened. It will
* added to the accounts queue and executed asynchronously when it
* reaches the front.
*
* Execution of the operation is managed by {@link
* AccountProcessor}. Since the processor will not en-queue duplicate
* operations, implementations may override the {@link equal_to}
* method to ensure that the same operation is not queued twice.
*/
public abstract class Geary.ImapEngine.AccountOperation : Geary.BaseObject {
/**
* Fired by after processing when the operation has completed.
*
* This is fired regardless of if an error was thrown after {@link
* execute} is called. It is always fired after either {@link
* succeeded} or {@link failed} is fired.
*
* Implementations should not fire this themselves, the
* processor will do it for them.
*/
public signal void completed();
/**
* Fired by the processor if the operation completes successfully.
*
* This is fired only after {@link execute} was called and did
* not raise an error.
*
* Implementations should not fire this themselves, the
* processor will do it for them.
*/
public signal void succeeded();
/**
* Fired by the processor if the operation throws an error.
*
* This is fired only after {@link execute} was called and
* threw an error. The argument is the error that was thrown.
*
* Implementations should not fire this themselves, the
* processor will do it for them.
*/
public signal void failed(Error err);
/**
* Called by the processor to execute this operation.
*/
public abstract async void execute(Cancellable cancellable) throws Error;
/**
* Determines if this operation is equal to another.
*
* By default assumes that the same instance or two different
* instances of the exact same type are equal. Implementations
* should override it if they wish to guard against different
* instances of the same high-level operation from being executed
* twice.
*/
public virtual bool equal_to(AccountOperation op) {
return (op != null && (this == op || this.get_type() == op.get_type()));
}
/**
* Provides a representation of this operation for debugging.
*
* By default simply returns the name of the class.
*/
public virtual string to_string() {
return this.get_type().name();
}
}

View file

@ -0,0 +1,88 @@
/*
* Copyright 2017 Michael Gratton <mike@vee.net>
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
/**
* Queues and asynchronously executes {@link AccountOperation} instances.
*
* Operations that are equal to any currently executing or currently
* in the queue will not be re-queued.
*
* Errors thrown are reported to the user via the account's
* `problem_report` signal.
*/
internal class Geary.ImapEngine.AccountProcessor : Geary.BaseObject {
private static bool op_equal(AccountOperation a, AccountOperation b) {
return a.equal_to(b);
}
/** Determines an operation is currently being executed. */
public bool is_executing { get { return this.current_op != null; } }
/** Returns the number of operations currently waiting in the queue. */
public uint waiting { get { return this.queue.size; } }
/** Fired when an error occurs processing an operation. */
public signal void operation_error(AccountOperation op, Error error);
private string id;
private Nonblocking.Queue<AccountOperation> queue =
new Nonblocking.Queue<AccountOperation>.fifo(op_equal);
private AccountOperation? current_op = null;
private Cancellable cancellable = new Cancellable();
public AccountProcessor(string id) {
this.id = id;
this.queue.allow_duplicates = false;
this.run.begin();
}
public void enqueue(AccountOperation op) {
if (this.current_op == null || !op.equal_to(this.current_op)) {
this.queue.send(op);
}
}
public void stop() {
this.cancellable.cancel();
this.queue.clear();
}
private async void run() {
while (!this.cancellable.is_cancelled()) {
AccountOperation? op = null;
try {
op = yield this.queue.receive(this.cancellable);
} catch (Error err) {
// we've been cancelled, so bail out
return;
}
if (op != null) {
debug("%s: Executing operation: %s", id, op.to_string());
this.current_op = op;
try {
yield op.execute(this.cancellable);
op.succeeded();
} catch (Error err) {
op.failed(err);
operation_error(op, err);
}
op.completed();
this.current_op = null;
}
}
}
}

View file

@ -28,6 +28,7 @@ private abstract class Geary.ImapEngine.GenericAccount : Geary.Account {
private Gee.HashMap<FolderPath, uint> refresh_unseen_timeout_ids
= new Gee.HashMap<FolderPath, uint>();
private Gee.HashSet<Geary.Folder> in_refresh_unseen = new Gee.HashSet<Geary.Folder>();
private AccountProcessor? processor;
private AccountSynchronizer sync;
private Cancellable? enumerate_folder_cancellable = null;
private TimeoutManager refresh_folder_timer;
@ -72,6 +73,19 @@ private abstract class Geary.ImapEngine.GenericAccount : Geary.Account {
compile_special_search_names();
}
/**
* Queues an operation for execution by this account.
*
* The operation will added to the account's {@link
* AccountProcessor} and executed asynchronously by that when it
* reaches the front.
*/
public void queue_operation(AccountOperation op)
throws EngineError {
check_open();
this.processor.enqueue(op);
}
protected override void notify_folders_available_unavailable(Gee.List<Geary.Folder>? available,
Gee.List<Geary.Folder>? unavailable) {
base.notify_folders_available_unavailable(available, unavailable);
@ -141,6 +155,8 @@ private abstract class Geary.ImapEngine.GenericAccount : Geary.Account {
}
private async void internal_open_async(Cancellable? cancellable) throws Error {
this.processor = new AccountProcessor(this.to_string());
try {
yield local.open_async(information.data_dir, Engine.instance.resource_dir.get_child("sql"),
cancellable);
@ -195,6 +211,9 @@ private abstract class Geary.ImapEngine.GenericAccount : Geary.Account {
if (!open)
return;
// Halt internal tasks early so they stop using local and
// remote connections.
this.processor.stop();
this.sync.stop();
Cancellable folder_cancellable = this.enumerate_folder_cancellable;

View file

@ -11,6 +11,7 @@ set(TEST_ENGINE_SRC
engine/imap/command/imap-create-command-test.vala
engine/imap/response/imap-namespace-response-test.vala
engine/imap/transport/imap-deserializer-test.vala
engine/imap-engine/account-processor-test.vala
engine/mime-content-type-test.vala
engine/rfc822-mailbox-address-test.vala
engine/rfc822-message-test.vala

View file

@ -0,0 +1,156 @@
/*
* Copyright 2017 Michael Gratton <mike@vee.net>
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
errordomain AccountProcessorTestError {
TEST;
}
public class Geary.ImapEngine.AccountProcessorTest : Gee.TestCase {
public class TestOperation : AccountOperation {
public bool throw_error = false;
public bool wait_for_cancel = false;
public bool execute_called = false;
private Nonblocking.Spinlock spinlock = new Nonblocking.Spinlock();
public override async void execute(Cancellable cancellable)
throws Error {
print("Test op/");
this.execute_called = true;
if (this.wait_for_cancel) {
yield this.spinlock.wait_async(cancellable);
}
if (this.throw_error) {
throw new AccountProcessorTestError.TEST("Failed");
}
}
}
public class OtherOperation : TestOperation {
}
private AccountProcessor processor;
private uint succeeded;
private uint failed;
private uint completed;
public AccountProcessorTest() {
base("Geary.ImapEngine.AccountProcessorTest");
add_test("test_success", test_success);
add_test("test_failure", test_failure);
add_test("test_duplicate", test_duplicate);
add_test("test_stop", test_stop);
this.processor = new AccountProcessor("processor");
}
public override void set_up() {
this.succeeded = 0;
this.failed = 0;
this.completed = 0;
}
public void test_success() {
TestOperation op = setup_operation(new TestOperation());
this.processor.enqueue(op);
assert(this.processor.waiting == 1);
execute_all();
assert(op.execute_called);
assert(this.succeeded == 1);
assert(this.failed == 0);
assert(this.completed == 1);
}
public void test_failure() {
TestOperation op = setup_operation(new TestOperation());
op.throw_error = true;
AccountOperation? error_op = null;
Error? error = null;
this.processor.operation_error.connect((proc, op, err) => {
error_op = op;
error = err;
});
this.processor.enqueue(op);
execute_all();
assert(this.succeeded == 0);
assert(this.failed == 1);
assert(this.completed == 1);
assert(error_op == op);
assert(error is AccountProcessorTestError.TEST);
}
public void test_duplicate() {
TestOperation op1 = setup_operation(new TestOperation());
TestOperation op2 = setup_operation(new TestOperation());
TestOperation op3 = setup_operation(new OtherOperation());
this.processor.enqueue(op1);
this.processor.enqueue(op2);
assert(this.processor.waiting == 1);
this.processor.enqueue(op3);
assert(this.processor.waiting == 2);
}
public void test_stop() {
TestOperation op1 = setup_operation(new TestOperation());
op1.wait_for_cancel = true;
TestOperation op2 = setup_operation(new OtherOperation());
this.processor.enqueue(op1);
this.processor.enqueue(op2);
while (!this.processor.is_executing) {
this.main_loop.iteration(true);
}
this.processor.stop();
while (this.main_loop.pending()) {
this.main_loop.iteration(true);
}
assert(!this.processor.is_executing);
assert(this.processor.waiting == 0);
assert(this.succeeded == 0);
assert(this.failed == 1);
assert(this.completed == 1);
}
private TestOperation setup_operation(TestOperation op) {
op.succeeded.connect(() => {
this.succeeded++;
});
op.failed.connect(() => {
this.failed++;
});
op.completed.connect(() => {
this.completed++;
});
return op;
}
private void execute_all() {
while (this.processor.is_executing || this.processor.waiting > 0) {
this.main_loop.iteration(true);
}
}
}

View file

@ -29,6 +29,7 @@ int main(string[] args) {
engine.add_suite(new Geary.Imap.DeserializerTest().get_suite());
engine.add_suite(new Geary.Imap.CreateCommandTest().get_suite());
engine.add_suite(new Geary.Imap.NamespaceResponseTest().get_suite());
engine.add_suite(new Geary.ImapEngine.AccountProcessorTest().get_suite());
engine.add_suite(new Geary.Inet.Test().get_suite());
engine.add_suite(new Geary.JS.Test().get_suite());
engine.add_suite(new Geary.Mime.ContentTypeTest().get_suite());