Further optimizations on the database side

Merging in additions to an email's fields in MessageTable could result in many
serialized SQLite operations that were independent of each other.  This change
(a) parallelizes them via NonblockingBatch, and (b) reads the messages existing
fields and strips out writes to fields it already holds.  This is legal because
none of the current fields in MessageTable are mutable on the server, but is not
allowable for ImapMessageProperties because they are.

Also, there were some naming inconsistencies in NonblockingBatch that are
corrected here.
This commit is contained in:
Jim Nelson 2011-11-16 20:02:29 -08:00
parent b8bc1325e8
commit 9dfcb26597
7 changed files with 65 additions and 38 deletions

View file

@ -22,7 +22,7 @@ public class MainWindow : Gtk.Window {
this.index = index;
}
public override async Object? execute(Cancellable? cancellable) throws Error {
public override async Object? execute_async(Cancellable? cancellable) throws Error {
Geary.Email? preview = yield folder.fetch_email_async(email_id,
MessageListStore.WITH_PREVIEW_FIELDS, cancellable);
@ -41,7 +41,7 @@ public class MainWindow : Gtk.Window {
this.path = path;
}
public override async Object? execute(Cancellable? cancellable) throws Error {
public override async Object? execute_async(Cancellable? cancellable) throws Error {
return yield account.list_folders_async(path, cancellable);
}
}
@ -55,7 +55,7 @@ public class MainWindow : Gtk.Window {
this.special_folder = special_folder;
}
public override async Object? execute(Cancellable? cancellable) throws Error {
public override async Object? execute_async(Cancellable? cancellable) throws Error {
return yield account.fetch_folder_async(special_folder.path);
}
}
@ -117,7 +117,7 @@ public class MainWindow : Gtk.Window {
batch.add(new FetchSpecialFolderOperation(account, special_folder));
debug("Listing special folders");
yield batch.execute_all();
yield batch.execute_all_async();
debug("Completed list of special folders");
foreach (int id in batch.get_ids()) {
@ -345,7 +345,7 @@ public class MainWindow : Gtk.Window {
}
debug("Fetching %d previews", count);
yield batch.execute_all(cancellable);
yield batch.execute_all_async(cancellable);
debug("Completed fetching %d previews", count);
batch.throw_first_exception();
@ -434,7 +434,7 @@ public class MainWindow : Gtk.Window {
debug("Listing folder children");
try {
yield batch.execute_all();
yield batch.execute_all_async();
} catch (Error err) {
debug("Unable to execute batch: %s", err.message);

View file

@ -22,7 +22,7 @@ private class Geary.Imap.Account : Geary.AbstractAccount, Geary.RemoteAccount {
this.path = path;
}
public override async Object? execute(Cancellable? cancellable) throws Error {
public override async Object? execute_async(Cancellable? cancellable) throws Error {
return yield session_mgr.status_async(path.get_fullpath(), StatusDataType.all(), cancellable);
}
}
@ -80,7 +80,7 @@ private class Geary.Imap.Account : Geary.AbstractAccount, Geary.RemoteAccount {
folders.add(new Geary.Imap.Folder(session_mgr, path, null, mbox));
}
yield batch.execute_all(cancellable);
yield batch.execute_all_async(cancellable);
foreach (int id in batch.get_ids()) {
StatusOperation op = (StatusOperation) batch.get_operation(id);

View file

@ -14,7 +14,7 @@ public class Geary.Imap.Mailbox : Geary.SmartReference {
this.cmd = cmd;
}
public override async Object? execute(Cancellable? cancellable) throws Error {
public override async Object? execute_async(Cancellable? cancellable) throws Error {
return yield context.session.send_command_async(cmd, cancellable);
}
}
@ -101,7 +101,7 @@ public class Geary.Imap.Mailbox : Geary.SmartReference {
preview_id = batch.add(new MailboxOperation(context, preview_cmd));
}
yield batch.execute_all(cancellable);
yield batch.execute_all_async(cancellable);
// process "plain" FETCH results ... these are fetched every time for, if nothing else,
// the UID which provides a position -> UID mapping that is kept in the map.

View file

@ -61,7 +61,7 @@ private class Geary.EngineFolder : Geary.AbstractFolder {
this.email = email;
}
public override async Object? execute(Cancellable? cancellable) throws Error {
public override async Object? execute_async(Cancellable? cancellable) throws Error {
yield folder.create_email_async(email, cancellable);
return null;
@ -790,7 +790,7 @@ private class Geary.EngineFolder : Geary.AbstractFolder {
foreach (Geary.Email email in remote_list)
batch.add(new CommitOperation(local_folder, email));
yield batch.execute_all(cancellable);
yield batch.execute_all_async(cancellable);
batch.throw_first_exception();

View file

@ -9,7 +9,7 @@
* a single task of asynchronous work. NonblockingBatch will execute it one time only.
*/
public abstract class Geary.NonblockingBatchOperation : Object {
public abstract async Object? execute(Cancellable? cancellable) throws Error;
public abstract async Object? execute_async(Cancellable? cancellable) throws Error;
}
/**
@ -29,15 +29,15 @@ public abstract class Geary.NonblockingBatchOperation : Object {
* a thrown exception. Other results should be stored by the subclass.
*
* To use, create a NonblockingBatch and populate it via the add() method. When all
* NonblockingBatchOperations have been added, call execute_all(). NonblockingBatch will fire off
* all at once and only complete execute_all() when all of them have finished. As mentioned
* NonblockingBatchOperations have been added, call execute_all_async(). NonblockingBatch will fire off
* all at once and only complete execute_all_async() when all of them have finished. As mentioned
* earlier, it's also gather their returned objects and thrown exceptions while they run. See
* get_result() and throw_first_exception() for more information.
*
* The caller will want to call *either* get_result() or throw_first_exception() to ensure that
* errors are propagated. It's not necessary to call both.
*
* After execute_all() has completed, the results may be examined. The NonblockingBatch object
* After execute_all_async() has completed, the results may be examined. The NonblockingBatch object
* can *not* be reused.
*
* Currently NonblockingBatch will fire off all operations at once and let them complete. It does
@ -64,15 +64,15 @@ public class Geary.NonblockingBatch : Object {
this.op = op;
}
public void execute(Cancellable? cancellable) {
op.execute.begin(cancellable, on_op_completed);
public void schedule(Cancellable? cancellable) {
op.execute_async.begin(cancellable, on_op_completed);
}
private void on_op_completed(Object? source, AsyncResult result) {
completed = true;
try {
returned = op.execute.end(result);
returned = op.execute_async.end(result);
} catch (Error err) {
threw = err;
}
@ -141,7 +141,7 @@ public class Geary.NonblockingBatch : Object {
*
* If there are no operations added to the batch, the method quietly exits.
*/
public async void execute_all(Cancellable? cancellable = null) throws Error {
public async void execute_all_async(Cancellable? cancellable = null) throws Error {
if (locked)
throw new IOError.PENDING("NonblockingBatch already executed or executing");
@ -164,7 +164,7 @@ public class Geary.NonblockingBatch : Object {
BatchContext? context = contexts.get(id);
assert(context != null);
context.execute(cancellable);
context.schedule(cancellable);
count++;
}

View file

@ -5,6 +5,20 @@
*/
public abstract class Geary.Sqlite.Table {
internal class ExecuteQueryOperation : NonblockingBatchOperation {
public SQLHeavy.Query query;
public ExecuteQueryOperation(SQLHeavy.Query query) {
this.query = query;
}
public override async Object? execute_async(Cancellable? cancellable) throws Error {
yield query.execute_async(cancellable);
return null;
}
}
internal weak Geary.Sqlite.Database gdb;
internal SQLHeavy.Table table;

View file

@ -80,26 +80,35 @@ public class Geary.Sqlite.MessageTable : Geary.Sqlite.Table {
Transaction locked = yield obtain_lock_async(transaction, "MessageTable.merge_async",
cancellable);
Geary.Email.Field available_fields;
if (!yield fetch_fields_async(locked, row.id, out available_fields, cancellable))
throw new EngineError.NOT_FOUND("No message with ID %lld found in database", row.id);
// This calculates the fields in the row that are not in the database already
Geary.Email.Field new_fields = (row.fields ^ available_fields) & row.fields;
// merge the valid fields in the row
SQLHeavy.Query query = locked.prepare(
"UPDATE MessageTable SET fields = fields | ? WHERE id=?");
query.bind_int(0, row.fields);
query.bind_int(0, new_fields);
query.bind_int64(1, row.id);
yield query.execute_async(cancellable);
locked.set_commit_required();
if (row.fields.is_any_set(Geary.Email.Field.DATE)) {
NonblockingBatch batch = new NonblockingBatch();
if (new_fields.is_any_set(Geary.Email.Field.DATE)) {
query = locked.prepare(
"UPDATE MessageTable SET date_field=?, date_time_t=? WHERE id=?");
query.bind_string(0, row.date);
query.bind_int64(1, row.date_time_t);
query.bind_int64(2, row.id);
yield query.execute_async(cancellable);
batch.add(new ExecuteQueryOperation(query));
}
if (row.fields.is_any_set(Geary.Email.Field.ORIGINATORS)) {
if (new_fields.is_any_set(Geary.Email.Field.ORIGINATORS)) {
query = locked.prepare(
"UPDATE MessageTable SET from_field=?, sender=?, reply_to=? WHERE id=?");
query.bind_string(0, row.from);
@ -107,10 +116,10 @@ public class Geary.Sqlite.MessageTable : Geary.Sqlite.Table {
query.bind_string(2, row.reply_to);
query.bind_int64(3, row.id);
yield query.execute_async(cancellable);
batch.add(new ExecuteQueryOperation(query));
}
if (row.fields.is_any_set(Geary.Email.Field.RECEIVERS)) {
if (new_fields.is_any_set(Geary.Email.Field.RECEIVERS)) {
query = locked.prepare(
"UPDATE MessageTable SET to_field=?, cc=?, bcc=? WHERE id=?");
query.bind_string(0, row.to);
@ -118,10 +127,10 @@ public class Geary.Sqlite.MessageTable : Geary.Sqlite.Table {
query.bind_string(2, row.bcc);
query.bind_int64(3, row.id);
yield query.execute_async(cancellable);
batch.add(new ExecuteQueryOperation(query));
}
if (row.fields.is_any_set(Geary.Email.Field.REFERENCES)) {
if (new_fields.is_any_set(Geary.Email.Field.REFERENCES)) {
query = locked.prepare(
"UPDATE MessageTable SET message_id=?, in_reply_to=?, reference_ids=? WHERE id=?");
query.bind_string(0, row.message_id);
@ -129,45 +138,49 @@ public class Geary.Sqlite.MessageTable : Geary.Sqlite.Table {
query.bind_string(2, row.references);
query.bind_int64(3, row.id);
yield query.execute_async(cancellable);
batch.add(new ExecuteQueryOperation(query));
}
if (row.fields.is_any_set(Geary.Email.Field.SUBJECT)) {
if (new_fields.is_any_set(Geary.Email.Field.SUBJECT)) {
query = locked.prepare(
"UPDATE MessageTable SET subject=? WHERE id=?");
query.bind_string(0, row.subject);
query.bind_int64(1, row.id);
yield query.execute_async(cancellable);
batch.add(new ExecuteQueryOperation(query));
}
if (row.fields.is_any_set(Geary.Email.Field.HEADER)) {
if (new_fields.is_any_set(Geary.Email.Field.HEADER)) {
query = locked.prepare(
"UPDATE MessageTable SET header=? WHERE id=?");
query.bind_string(0, row.header);
query.bind_int64(1, row.id);
yield query.execute_async(cancellable);
batch.add(new ExecuteQueryOperation(query));
}
if (row.fields.is_any_set(Geary.Email.Field.BODY)) {
if (new_fields.is_any_set(Geary.Email.Field.BODY)) {
query = locked.prepare(
"UPDATE MessageTable SET body=? WHERE id=?");
query.bind_string(0, row.body);
query.bind_int64(1, row.id);
yield query.execute_async(cancellable);
batch.add(new ExecuteQueryOperation(query));
}
if (row.fields.is_any_set(Geary.Email.Field.PREVIEW)) {
if (new_fields.is_any_set(Geary.Email.Field.PREVIEW)) {
query = locked.prepare(
"UPDATE MessageTable SET preview=? WHERE id=?");
query.bind_string(0, row.preview);
query.bind_int64(1, row.id);
yield query.execute_async(cancellable);
batch.add(new ExecuteQueryOperation(query));
}
yield batch.execute_all_async(cancellable);
batch.throw_first_exception();
// only commit if internally atomic
if (transaction == null)
yield locked.commit_async(cancellable);