Reduce database pauses: Refs bug #725929

Although not completely solved, this reduces database pauses
(along with the prior commit improving parallelization of the database) by
changing how ImapDB.Folder accesses the database.  In particular, one code
path required an INNER JOIN that simply took too long (~8s) on large
databases, locking the entire db in the process.  Tests and hacking
showed that, annoyingly, manually performing the INNER JOIN by
intersecting the results of two SQL transactions was more efficient.

In addition, creating/merging emails into the database is done in
chunks with a small pause between each chunk to allow other tasks
the opportunity to get to the database.  Create/merge is one of the
most expensive operations on the database.
This commit is contained in:
Jim Nelson 2014-06-11 17:48:23 -07:00
parent 9efbf64f3e
commit 28629ce6e2
3 changed files with 139 additions and 91 deletions

View file

@ -21,6 +21,7 @@ private class Geary.ImapDB.Folder : BaseObject, Geary.ReferenceSemantics {
private const int LIST_EMAIL_WITH_MESSAGE_CHUNK_COUNT = 10;
private const int LIST_EMAIL_METADATA_COUNT = 100;
private const int LIST_EMAIL_FIELDS_CHUNK_COUNT = 500;
private const int CREATE_MERGE_EMAIL_CHUNK_COUNT = 25;
[Flags]
public enum ListFlags {
@ -180,45 +181,57 @@ private class Geary.ImapDB.Folder : BaseObject, Geary.ReferenceSemantics {
public async Gee.Map<Geary.Email, bool> create_or_merge_email_async(Gee.Collection<Geary.Email> emails,
Cancellable? cancellable) throws Error {
Gee.HashMap<Geary.Email, bool> results = new Gee.HashMap<Geary.Email, bool>();
Gee.ArrayList<Geary.EmailIdentifier> complete_ids = new Gee.ArrayList<Geary.EmailIdentifier>();
Gee.Collection<Contact> updated_contacts = new Gee.ArrayList<Contact>();
int total_unread_change = 0;
yield db.exec_transaction_async(Db.TransactionType.RW, (cx) => {
foreach (Geary.Email email in emails) {
Gee.Collection<Contact>? contacts_this_email = null;
Geary.Email.Field pre_fields;
Geary.Email.Field post_fields;
int unread_change = 0;
bool created = do_create_or_merge_email(cx, email, out pre_fields,
out post_fields, out contacts_this_email, ref unread_change, cancellable);
if (contacts_this_email != null)
updated_contacts.add_all(contacts_this_email);
results.set(email, created);
// in essence, only fire the "email-completed" signal if the local version didn't
// have all the fields but after the create/merge now does
if (post_fields.is_all_set(Geary.Email.Field.ALL) && !pre_fields.is_all_set(Geary.Email.Field.ALL))
complete_ids.add(email.id);
// Update unread count in DB.
do_add_to_unread_count(cx, unread_change, cancellable);
total_unread_change += unread_change;
}
Gee.ArrayList<Geary.Email> list = traverse<Geary.Email>(emails).to_array_list();
int index = 0;
while (index < list.size) {
int stop = Numeric.int_ceiling(index + CREATE_MERGE_EMAIL_CHUNK_COUNT, list.size);
Gee.List<Geary.Email> slice = list.slice(index, stop);
return Db.TransactionOutcome.COMMIT;
}, cancellable);
if (updated_contacts.size > 0)
contact_store.update_contacts(updated_contacts);
// Update the email_unread properties.
properties.set_status_unseen((properties.email_unread + total_unread_change).clamp(0, int.MAX));
if (complete_ids.size > 0)
email_complete(complete_ids);
Gee.ArrayList<Geary.EmailIdentifier> complete_ids = new Gee.ArrayList<Geary.EmailIdentifier>();
Gee.Collection<Contact> updated_contacts = new Gee.ArrayList<Contact>();
int total_unread_change = 0;
yield db.exec_transaction_async(Db.TransactionType.RW, (cx) => {
foreach (Geary.Email email in slice) {
Gee.Collection<Contact>? contacts_this_email = null;
Geary.Email.Field pre_fields;
Geary.Email.Field post_fields;
int unread_change = 0;
bool created = do_create_or_merge_email(cx, email, out pre_fields,
out post_fields, out contacts_this_email, ref unread_change, cancellable);
if (contacts_this_email != null)
updated_contacts.add_all(contacts_this_email);
results.set(email, created);
// in essence, only fire the "email-completed" signal if the local version didn't
// have all the fields but after the create/merge now does
if (post_fields.is_all_set(Geary.Email.Field.ALL) && !pre_fields.is_all_set(Geary.Email.Field.ALL))
complete_ids.add(email.id);
// Update unread count in DB.
do_add_to_unread_count(cx, unread_change, cancellable);
total_unread_change += unread_change;
}
return Db.TransactionOutcome.COMMIT;
}, cancellable);
if (updated_contacts.size > 0)
contact_store.update_contacts(updated_contacts);
// Update the email_unread properties.
properties.set_status_unseen((properties.email_unread + total_unread_change).clamp(0, int.MAX));
if (complete_ids.size > 0)
email_complete(complete_ids);
index = stop;
if (index < list.size)
yield Scheduler.sleep_ms_async(100);
}
return results;
}
@ -269,12 +282,6 @@ private class Geary.ImapDB.Folder : BaseObject, Geary.ReferenceSemantics {
SELECT MessageLocationTable.message_id, ordering, remove_marker
FROM MessageLocationTable
""");
if (only_incomplete) {
sql.append("""
INNER JOIN MessageTable
ON MessageTable.id = MessageLocationTable.message_id
""");
}
sql.append("WHERE folder_id = ? ");
@ -283,26 +290,32 @@ private class Geary.ImapDB.Folder : BaseObject, Geary.ReferenceSemantics {
else
sql.append("AND ordering <= ? ");
if (only_incomplete)
sql.append_printf("AND fields != %d ", Geary.Email.Field.ALL);
if (oldest_to_newest)
sql.append("ORDER BY ordering ASC ");
else
sql.append("ORDER BY ordering DESC ");
sql.append("LIMIT ?");
Db.Statement stmt = cx.prepare(sql.str);
stmt.bind_rowid(0, folder_id);
stmt.bind_int64(1, start_uid.value);
stmt.bind_int(2, count);
locations = do_results_to_locations(stmt.exec(cancellable), flags, cancellable);
if (locations.size > count)
locations = locations.slice(0, count);
return Db.TransactionOutcome.SUCCESS;
}, cancellable);
// remove complete locations (emails with all fields downloaded)
if (only_incomplete && locations.size > 0) {
yield db.exec_transaction_async(Db.TransactionType.RO, (cx) => {
do_remove_complete_locations(cx, locations, cancellable);
return Db.TransactionOutcome.SUCCESS;
}, cancellable);
}
// Next, read in email in chunks
return yield list_email_in_chunks_async(locations, required_fields, flags, cancellable);
}
@ -389,16 +402,7 @@ private class Geary.ImapDB.Folder : BaseObject, Geary.ReferenceSemantics {
FROM MessageLocationTable
""");
if (only_incomplete) {
sql.append("""
INNER JOIN MessageTable
ON MessageTable.id = MessageLocationTable.message_id
""");
}
sql.append("WHERE folder_id = ? AND ordering >= ? AND ordering <= ? ");
if (only_incomplete)
sql.append_printf(" AND fields != %d ", Geary.Email.Field.ALL);
Db.Statement stmt = cx.prepare(sql.str);
stmt.bind_rowid(0, folder_id);
@ -407,6 +411,9 @@ private class Geary.ImapDB.Folder : BaseObject, Geary.ReferenceSemantics {
locations = do_results_to_locations(stmt.exec(cancellable), flags, cancellable);
if (only_incomplete)
do_remove_complete_locations(cx, locations, cancellable);
return Db.TransactionOutcome.SUCCESS;
}, cancellable);
@ -436,13 +443,6 @@ private class Geary.ImapDB.Folder : BaseObject, Geary.ReferenceSemantics {
FROM MessageLocationTable
""");
if (only_incomplete) {
sql.append("""
INNER JOIN MessageTable
ON MessageTable.id = MessageLocationTable.message_id
""");
}
if (locs.size != 1) {
sql.append("WHERE ordering IN (");
bool first = true;
@ -458,9 +458,6 @@ private class Geary.ImapDB.Folder : BaseObject, Geary.ReferenceSemantics {
sql.append_printf("WHERE ordering = '%s' ", locs[0].uid.to_string());
}
if (only_incomplete)
sql.append_printf(" AND fields != %d ", Geary.Email.Field.ALL);
sql.append("AND folder_id = ? ");
Db.Statement stmt = cx.prepare(sql.str);
@ -468,6 +465,9 @@ private class Geary.ImapDB.Folder : BaseObject, Geary.ReferenceSemantics {
locations = do_results_to_locations(stmt.exec(cancellable), flags, cancellable);
if (only_incomplete)
do_remove_complete_locations(cx, locations, cancellable);
return Db.TransactionOutcome.SUCCESS;
}, cancellable);
@ -2042,6 +2042,42 @@ private class Geary.ImapDB.Folder : BaseObject, Geary.ReferenceSemantics {
return locations;
}
// Use a separate step to strip out complete emails because original implementation (using an
// INNER JOIN) was horribly slow under load
//
// Removes from locations (in place) every entry whose message already has all fields
// downloaded; a null or empty list is a no-op.  Works by selecting the ids of all
// incomplete messages and keeping only locations whose message_id is in that set.
private void do_remove_complete_locations(Db.Connection cx, Gee.List<LocationIdentifier>? locations,
    Cancellable? cancellable) throws Error {
    if (locations == null || locations.size == 0)
        return;
    
    // the query is static, so prepare it directly rather than via a StringBuilder
    Db.Statement stmt = cx.prepare("""
SELECT id FROM MessageTable WHERE fields <> ?
""");
    stmt.bind_int(0, Geary.Email.Field.ALL);
    
    Db.Result results = stmt.exec(cancellable);
    
    // these are MessageTable ids (i.e. LocationIdentifier.message_id values), not
    // location rowids, hence the name
    Gee.HashSet<int64?> incomplete_ids = new Gee.HashSet<int64?>(Collection.int64_hash_func,
        Collection.int64_equal_func);
    while (!results.finished) {
        incomplete_ids.add(results.int64_at(0));
        results.next(cancellable);
    }
    
    // no incomplete messages at all means every location is complete and must go
    if (incomplete_ids.size == 0) {
        locations.clear();
        return;
    }
    
    // keep only locations referring to incomplete messages
    Gee.Iterator<LocationIdentifier> iter = locations.iterator();
    while (iter.next()) {
        if (!incomplete_ids.contains(iter.get().message_id))
            iter.remove();
    }
}
private LocationIdentifier? do_get_location_for_id(Db.Connection cx, ImapDB.EmailIdentifier id,
ListFlags flags, Cancellable? cancellable) throws Error {
Db.Statement stmt = cx.prepare("""

View file

@ -15,7 +15,6 @@ private class Geary.ImapEngine.EmailPrefetcher : Object {
public const int PREFETCH_DELAY_SEC = 1;
private const Geary.Email.Field PREFETCH_FIELDS = Geary.Email.Field.ALL;
private const int PREFETCH_IDS_CHUNKS = 500;
private const int PREFETCH_CHUNK_BYTES = 32 * 1024;
public Nonblocking.CountingSemaphore active_sem { get; private set;
@ -88,7 +87,10 @@ private class Geary.ImapEngine.EmailPrefetcher : Object {
}
// emails should include PROPERTIES
private void schedule_prefetch(Gee.Collection<Geary.Email> emails) {
private void schedule_prefetch(Gee.Collection<Geary.Email>? emails) {
if (emails == null || emails.size == 0)
return;
debug("%s: scheduling %d emails for prefetching", folder.to_string(), emails.size);
prefetch_emails.add_all(emails);
@ -108,41 +110,34 @@ private class Geary.ImapEngine.EmailPrefetcher : Object {
}
private async void do_prepare_all_local_async() {
Geary.EmailIdentifier? lowest = null;
for (;;) {
Gee.List<Geary.Email>? list = null;
try {
list = yield folder.local_folder.list_email_by_id_async((ImapDB.EmailIdentifier) lowest,
PREFETCH_IDS_CHUNKS, Geary.Email.Field.PROPERTIES, ImapDB.Folder.ListFlags.ONLY_INCOMPLETE,
cancellable);
} catch (Error err) {
debug("Error while list local emails for %s: %s", folder.to_string(), err.message);
}
if (list == null || list.size == 0)
break;
// find lowest for next iteration
lowest = Geary.EmailIdentifier.sort_emails(list).first().id;
schedule_prefetch(list);
Gee.List<Geary.Email>? list = null;
try {
debug("Listing all emails needing prefetching in %s...", folder.to_string());
list = yield folder.local_folder.list_email_by_id_async(null, int.MAX,
Geary.Email.Field.PROPERTIES, ImapDB.Folder.ListFlags.ONLY_INCOMPLETE, cancellable);
debug("Listed all emails needing prefetching in %s", folder.to_string());
} catch (Error err) {
debug("Error while list local emails for %s: %s", folder.to_string(), err.message);
}
schedule_prefetch(list);
active_sem.blind_notify();
}
private async void do_prepare_new_async(Gee.Collection<Geary.EmailIdentifier> ids) {
Gee.List<Geary.Email>? list = null;
try {
debug("Listing new %d emails needing prefetching in %s...", ids.size, folder.to_string());
list = yield folder.local_folder.list_email_by_sparse_id_async(
(Gee.Collection<ImapDB.EmailIdentifier>) ids,
Geary.Email.Field.PROPERTIES, ImapDB.Folder.ListFlags.ONLY_INCOMPLETE, cancellable);
debug("Listed new emails needing prefetching in %s", folder.to_string());
} catch (Error err) {
debug("Error while list local emails for %s: %s", folder.to_string(), err.message);
}
if (list != null && list.size > 0)
schedule_prefetch(list);
schedule_prefetch(list);
active_sem.blind_notify();
}

View file

@ -125,6 +125,23 @@ public inline static uint int64_hash(int64 value) {
return hash_memory(&value, sizeof(int64));
}
/**
 * To be used as hash_func for Gee collections.
 */
public uint int64_hash_func(int64? n) {
    // hash the raw eight bytes of the boxed int64
    int64 *value_ptr = (int64 *) n;
    return hash_memory((uint8 *) value_ptr, sizeof(int64));
}
/**
 * To be used as equal_func for Gee collections.
 */
public bool int64_equal_func(int64? a, int64? b) {
    // compare the boxed values directly, dereferencing in a single expression
    return *((int64 *) a) == *((int64 *) b);
}
/**
* A rotating-XOR hash that can be used to hash memory buffers of any size.
*/