Initial implementation of background prefetching: #4462

This implements a background prefetch of all normalized messages
in the folder.  It does not implement the chunked prefetch scheme
we discussed.  It uses a simple priority scheme as well, only
pulling one message at a time, although it may be worthwhile
later implementing a pull-back system where the prefetch waits
until higher-priority traffic completes before continuing.
This commit is contained in:
Jim Nelson 2012-03-13 16:03:42 -07:00
parent fc6feab340
commit a19a3d0ea7
10 changed files with 278 additions and 52 deletions

View file

@ -8,12 +8,13 @@
* CreateEmailOperation is a common Geary.NonblockingBatchOperation that can be used with
* Geary.NonblockingBatch.
*
* Note that this operation always returns null, as Geary.Folder.create_email_async() has no returned
* value.
* Note that this operation always returns null. The result of Geary.Folder.create_email_async()
* is stored in the created property.
*/
public class Geary.CreateEmailOperation : Geary.NonblockingBatchOperation {
public Geary.Folder folder { get; private set; }
public Geary.Email email { get; private set; }
public bool created { get; private set; default = false; }
public CreateEmailOperation(Geary.Folder folder, Geary.Email email) {
this.folder = folder;
@ -21,7 +22,7 @@ public class Geary.CreateEmailOperation : Geary.NonblockingBatchOperation {
}
public override async Object? execute_async(Cancellable? cancellable) throws Error {
yield folder.create_email_async(email, cancellable);
created = yield folder.create_email_async(email, cancellable);
return null;
}

View file

@ -208,9 +208,11 @@ public interface Geary.Folder : Object {
* deal with this. Callers from outside the Engine don't need to worry about this; it's taken
* care of under the covers.
*
* Returns true if the email was created in the folder, false if it was merged.
*
* The Folder must be opened prior to attempting this operation.
*/
public abstract async void create_email_async(Geary.Email email, Cancellable? cancellable = null)
public abstract async bool create_email_async(Geary.Email email, Cancellable? cancellable = null)
throws Error;
/**

View file

@ -106,7 +106,7 @@ private class Geary.Imap.Folder : Geary.AbstractFolder, Geary.RemoteFolder {
return mailbox.exists;
}
public override async void create_email_async(Geary.Email email, Cancellable? cancellable = null) throws Error {
public override async bool create_email_async(Geary.Email email, Cancellable? cancellable = null) throws Error {
if (mailbox == null)
throw new EngineError.OPEN_REQUIRED("%s not opened", to_string());

View file

@ -42,7 +42,7 @@ public abstract class Geary.AbstractFolder : Object, Geary.Folder {
public abstract async int get_email_count_async(Cancellable? cancellable = null) throws Error;
public abstract async void create_email_async(Geary.Email email, Cancellable? cancellable = null)
public abstract async bool create_email_async(Geary.Email email, Cancellable? cancellable = null)
throws Error;
public abstract async Gee.List<Geary.Email>? list_email_async(int low, int count,

View file

@ -7,22 +7,6 @@
private class Geary.EngineFolder : Geary.AbstractFolder {
private const int REMOTE_FETCH_CHUNK_COUNT = 5;
private class CommitOperation : NonblockingBatchOperation {
public Folder folder;
public Geary.Email email;
public CommitOperation(Folder folder, Geary.Email email) {
this.folder = folder;
this.email = email;
}
public override async Object? execute_async(Cancellable? cancellable) throws Error {
yield folder.create_email_async(email, cancellable);
return null;
}
}
internal LocalFolder local_folder { get; protected set; }
internal RemoteFolder? remote_folder { get; protected set; default = null; }
internal int remote_count { get; private set; default = -1; }
@ -35,6 +19,9 @@ private class Geary.EngineFolder : Geary.AbstractFolder {
private SendReplayQueue? send_replay_queue = null;
private NonblockingMutex normalize_email_positions_mutex = new NonblockingMutex();
public virtual signal void local_added(Gee.Collection<Geary.EmailIdentifier> added) {
}
public EngineFolder(RemoteAccount remote, LocalAccount local, LocalFolder local_folder) {
this.remote = remote;
this.local = local;
@ -46,6 +33,10 @@ private class Geary.EngineFolder : Geary.AbstractFolder {
warning("Folder %s destroyed without closing", to_string());
}
protected virtual void notify_local_added(Gee.Collection<Geary.EmailIdentifier> added) {
local_added(added);
}
public override Geary.FolderPath get_path() {
return local_folder.get_path();
}
@ -59,7 +50,7 @@ private class Geary.EngineFolder : Geary.AbstractFolder {
Geary.Folder.ListFlags.EXCLUDING_ID;
}
public override async void create_email_async(Geary.Email email, Cancellable? cancellable) throws Error {
public override async bool create_email_async(Geary.Email email, Cancellable? cancellable) throws Error {
throw new EngineError.READONLY("Engine currently read-only");
}
@ -256,15 +247,21 @@ private class Geary.EngineFolder : Geary.AbstractFolder {
local_folder.get_duplicate_detection_fields(), Geary.Folder.ListFlags.NONE, null);
assert(list != null && list.size > 0);
Gee.HashSet<Geary.EmailIdentifier> created = new Gee.HashSet<Geary.EmailIdentifier>(
Hashable.hash_func, Equalable.equal_func);
foreach (Geary.Email email in list) {
debug("Creating Email ID %s", email.id.to_string());
yield local_folder.create_email_async(email, null);
if (yield local_folder.create_email_async(email, null))
created.add(email.id);
}
// save new remote count
remote_count = new_remote_count;
notify_messages_appended(new_remote_count);
if (created.size > 0)
notify_local_added(created);
} catch (Error err) {
debug("Unable to normalize local store of newly appended messages to %s: %s",
to_string(), err.message);
@ -525,12 +522,24 @@ private class Geary.EngineFolder : Geary.AbstractFolder {
NonblockingBatch batch = new NonblockingBatch();
foreach (Geary.Email email in remote_list)
batch.add(new CommitOperation(local_folder, email));
batch.add(new CreateEmailOperation(local_folder, email));
yield batch.execute_all_async(cancellable);
batch.throw_first_exception();
// report locally added (non-duplicate, not unknown) emails
Gee.HashSet<Geary.EmailIdentifier> created_ids = new Gee.HashSet<Geary.EmailIdentifier>(
Hashable.hash_func, Equalable.equal_func);
foreach (int id in batch.get_ids()) {
CreateEmailOperation? op = batch.get_operation(id) as CreateEmailOperation;
if (op != null && op.created)
created_ids.add(op.email.id);
}
if (created_ids.size > 0)
notify_local_added(created_ids);
if (cb != null)
cb(remote_list, null);
@ -570,7 +579,13 @@ private class Geary.EngineFolder : Geary.AbstractFolder {
Geary.Email email = yield remote_folder.fetch_email_async(id, fields, cancellable);
// save to local store
yield local_folder.create_email_async(email, cancellable);
if (yield local_folder.create_email_async(email, cancellable)) {
// TODO: A Singleton collection would be useful here.
Gee.ArrayList<Geary.EmailIdentifier> ids = new Gee.ArrayList<Geary.EmailIdentifier>();
ids.add(email.id);
notify_local_added(ids);
}
return email;
}
@ -654,6 +669,18 @@ private class Geary.EngineFolder : Geary.AbstractFolder {
yield batch.execute_all_async(cancellable);
batch.throw_first_exception();
// Collect which EmailIdentifiers were created and report them
Gee.HashSet<Geary.EmailIdentifier> created_ids = new Gee.HashSet<Geary.EmailIdentifier>(
Hashable.hash_func, Equalable.equal_func);
foreach (int id in batch.get_ids()) {
CreateEmailOperation? op = batch.get_operation(id) as CreateEmailOperation;
if (op != null && op.created)
created_ids.add(op.email.id);
}
if (created_ids.size > 0)
notify_local_added(created_ids);
} catch (Error e) {
local_count = 0; // prevent compiler warning
error = e;

View file

@ -7,9 +7,15 @@
private class Geary.GenericImapFolder : Geary.EngineFolder {
public const int DEFAULT_FLAG_WATCH_SEC = 3 * 60;
private const int PREFETCH_DELAY_SEC = 10;
private uint flag_watch_id = 0;
private Cancellable flag_watch_cancellable = new Cancellable();
private bool in_flag_watch = false;
private Cancellable prefetch_cancellable = new Cancellable();
private NonblockingMutex prefetch_mutex = new NonblockingMutex();
private Gee.HashSet<Geary.EmailIdentifier> prefetch_ids = new Gee.HashSet<Geary.EmailIdentifier>(
Hashable.hash_func, Equalable.equal_func);
public GenericImapFolder(RemoteAccount remote, LocalAccount local, LocalFolder local_folder) {
base (remote, local, local_folder);
@ -263,6 +269,9 @@ private class Geary.GenericImapFolder : Geary.EngineFolder {
if (state == Geary.Folder.OpenState.BOTH) {
flag_watch_cancellable = new Cancellable();
enable_flag_watch();
prefetch_cancellable = new Cancellable();
schedule_prefetch_all();
}
}
@ -270,9 +279,17 @@ private class Geary.GenericImapFolder : Geary.EngineFolder {
disable_flag_watch();
flag_watch_cancellable.cancel();
prefetch_cancellable.cancel();
base.notify_closed(reason);
}
protected override void notify_local_added(Gee.Collection<Geary.EmailIdentifier> added) {
schedule_prefetch(added);
base.notify_local_added(added);
}
/**
* Turns on the "flag watch." This periodtically checks if the flags on any messages have changed.
*
@ -363,4 +380,140 @@ private class Geary.GenericImapFolder : Geary.EngineFolder {
if (!flag_watch_cancellable.is_cancelled() && changed_map.size > 0)
notify_email_flags_changed(changed_map);
}
private void schedule_prefetch_all() {
// Async method will schedule prefetch once ids are known
do_prefetch_all.begin();
}
private void schedule_prefetch(Gee.Collection<Geary.EmailIdentifier> ids) {
prefetch_ids.add_all(ids);
Timeout.add_seconds(PREFETCH_DELAY_SEC, on_start_prefetch, Priority.LOW);
}
private bool on_start_prefetch() {
do_prefetch.begin();
return false;
}
private async void do_prefetch_all() {
Gee.List<Geary.Email>? list = null;
try {
// by listing NONE, retrieving only the EmailIdentifier for the range (which here is all)
list = yield local_folder.list_email_async(1, -1, Geary.Email.Field.NONE,
Geary.Folder.ListFlags.FAST, prefetch_cancellable);
} catch (Error err) {
debug("Error while prefetching all emails for %s: %s", to_string(), err.message);
}
if (list == null || list.size == 0)
return;
Gee.HashSet<Geary.EmailIdentifier> ids = new Gee.HashSet<Geary.EmailIdentifier>(
Hashable.hash_func, Equalable.equal_func);
foreach (Geary.Email email in list)
ids.add(email.id);
if (ids.size > 0)
schedule_prefetch(ids);
}
private async void do_prefetch() {
try {
yield do_prefetch_batch();
} catch (Error err) {
debug("Error while prefetching emails for %s: %s", to_string(), err.message);
}
}
private async void do_prefetch_batch() throws Error {
int token = yield prefetch_mutex.claim_async(prefetch_cancellable);
if (prefetch_ids.size == 0)
return;
debug("do_prefetch_batch %s %d", to_string(), prefetch_ids.size);
// snarf up all requested EmailIdentifiers for this round
Gee.HashSet<Geary.EmailIdentifier> ids = prefetch_ids;
prefetch_ids = new Gee.HashSet<Geary.EmailIdentifier>(Hashable.hash_func, Equalable.equal_func);
// Get the stored fields of all the local email
Gee.Map<Geary.EmailIdentifier, Geary.Email.Field>? local_fields =
yield local_folder.get_email_fields_by_id_async(ids, prefetch_cancellable);
if (local_fields == null || local_fields.size == 0) {
debug("No local fields in %s", to_string());
prefetch_mutex.release(ref token);
return;
}
// Sort email by size
Gee.TreeSet<Geary.Email> sorted_email = new Gee.TreeSet<Geary.Email>(email_size_ascending_comparator);
foreach (Geary.EmailIdentifier id in local_fields.keys) {
sorted_email.add(yield local_folder.fetch_email_async(id, Geary.Email.Field.PROPERTIES,
prefetch_cancellable));
}
// Big TODO: The engine needs to be able to synthesize ENVELOPE (and any of the fields
// constituting it) and PREVIEW from HEADER and BODY if available. When it can do that
// won't need to prefetch ENVELOPE or PREVIEW; prefetching HEADER and BODY will be enough.
foreach (Geary.Email email in sorted_email) {
Geary.EmailIdentifier id = email.id;
Geary.Email.Field field = local_fields.get(id);
if (!field.is_all_set(Geary.Email.Field.ENVELOPE)) {
try {
yield fetch_email_async(id, Geary.Email.Field.ENVELOPE, prefetch_cancellable);
} catch (Error env_error) {
debug("Error prefetching envelope for %s: %s", id.to_string(), env_error.message);
}
}
if (!field.is_all_set(Geary.Email.Field.HEADER)) {
try {
yield fetch_email_async(id, Geary.Email.Field.HEADER, prefetch_cancellable);
} catch (Error header_err) {
debug("Error prefetching headers for %s: %s", id.to_string(), header_err.message);
}
}
if (!field.is_all_set(Geary.Email.Field.BODY)) {
try {
yield fetch_email_async(email.id, Geary.Email.Field.BODY, prefetch_cancellable);
} catch (Error body_err) {
debug("Error background fetching body from %s: %s", email.id.to_string(),
body_err.message);
}
}
}
prefetch_mutex.release(ref token);
debug("finished do_prefetch_batch %s %d", to_string(), ids.size);
}
private static int email_size_ascending_comparator(void *a, void *b) {
long asize = 0;
Geary.Imap.EmailProperties? aprop = (Geary.Imap.EmailProperties) ((Geary.Email *) a)->properties;
if (aprop != null && aprop.rfc822_size != null)
asize = aprop.rfc822_size.value;
long bsize = 0;
Geary.Imap.EmailProperties? bprop = (Geary.Imap.EmailProperties) ((Geary.Email *) b)->properties;
if (bprop != null && bprop.rfc822_size != null)
bsize = bprop.rfc822_size.value;
if (asize < bsize)
return -1;
else if (asize > bsize)
return 1;
else
return 0;
}
}

View file

@ -69,5 +69,11 @@ private interface Geary.LocalFolder : Object, Geary.Folder {
*/
public async abstract Geary.EmailIdentifier? id_from_remote_position(int remote_position,
int new_remote_count) throws Error;
/**
* Returns a map of local emails and their stored fields.
*/
public async abstract Gee.Map<Geary.EmailIdentifier, Geary.Email.Field>? get_email_fields_by_id_async(
Gee.Collection<Geary.EmailIdentifier> ids, Cancellable? cancellable) throws Error;
}

View file

@ -112,9 +112,9 @@ private class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gea
cancellable);
}
public override async void create_email_async(Geary.Email email, Cancellable? cancellable = null)
public override async bool create_email_async(Geary.Email email, Cancellable? cancellable = null)
throws Error {
yield atomic_create_email_async(null, email, cancellable);
return yield atomic_create_email_async(null, email, cancellable);
}
// TODO: Need to break out IMAP-specific functionality
@ -201,7 +201,7 @@ private class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gea
return true;
}
private async void atomic_create_email_async(Transaction? supplied_transaction, Geary.Email email,
private async bool atomic_create_email_async(Transaction? supplied_transaction, Geary.Email email,
Cancellable? cancellable) throws Error {
check_open();
@ -217,7 +217,7 @@ private class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gea
if (!associated || message_id == Sqlite.Row.INVALID_ID)
message_id = yield search_for_duplicate_async(transaction, email, cancellable);
// if already associated or a duplicate, associated
// if already associated or a duplicate, merge and/or associate
if (message_id != Sqlite.Row.INVALID_ID) {
if (!associated)
yield associate_with_folder_async(transaction, message_id, email, cancellable);
@ -227,7 +227,7 @@ private class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gea
if (supplied_transaction == null)
yield transaction.commit_if_required_async(cancellable);
return;
return false;
}
// not found, so create and associate with this folder
@ -256,6 +256,8 @@ private class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gea
yield transaction.commit_async(cancellable);
notify_messages_appended(count);
return true;
}
public override async Gee.List<Geary.Email>? list_email_async(int low, int count,
@ -528,8 +530,8 @@ private class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gea
public async Gee.Map<Geary.EmailIdentifier, Geary.EmailFlags> get_email_flags_async(
Gee.List<Geary.EmailIdentifier> to_get, Cancellable? cancellable) throws Error {
Gee.Map<Geary.EmailIdentifier, Geary.EmailFlags> map = new Gee.HashMap<Geary.EmailIdentifier,
Geary.EmailFlags>();
Gee.Map<Geary.EmailIdentifier, Geary.EmailFlags> map = new Gee.HashMap<
Geary.EmailIdentifier, Geary.EmailFlags>(Hashable.hash_func, Equalable.equal_func);
Transaction transaction = yield db.begin_transaction_async("Folder.get_email_flags_async",
cancellable);
@ -695,5 +697,34 @@ private class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gea
return id;
}
public async Gee.Map<Geary.EmailIdentifier, Geary.Email.Field>? get_email_fields_by_id_async(
Gee.Collection<Geary.EmailIdentifier> ids, Cancellable? cancellable) throws Error {
check_open();
if (ids.size == 0)
return null;
Gee.HashMap<Geary.EmailIdentifier, Geary.Email.Field> map = new Gee.HashMap<
Geary.EmailIdentifier, Geary.Email.Field>(Hashable.hash_func, Equalable.equal_func);
Transaction transaction = yield db.begin_transaction_async("get_email_fields_by_id_async",
cancellable);
foreach (Geary.EmailIdentifier id in ids) {
MessageLocationRow? row = yield location_table.fetch_by_ordering_async(transaction,
folder_row.id, ((Geary.Imap.EmailIdentifier) id).uid.value, cancellable);
if (row == null)
continue;
Geary.Email.Field fields;
if (yield message_table.fetch_fields_async(transaction, row.message_id, out fields,
cancellable)) {
map.set(id, fields);
}
}
return (map.size > 0) ? map : null;
}
}

View file

@ -51,24 +51,13 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table {
Transaction locked = yield obtain_lock_async(transaction, "MessageLocationTable.list_async",
cancellable);
SQLHeavy.Query query;
if (count >= 0) {
query = locked.prepare(
"SELECT id, message_id, ordering FROM MessageLocationTable WHERE folder_id = ? "
+ "%s ORDER BY ordering LIMIT ? OFFSET ?".printf(include_marked ? "" :
"AND remove_marker = 0"));
query.bind_int64(0, folder_id);
query.bind_int(1, count);
query.bind_int(2, low - 1);
} else {
// count == -1
query = locked.prepare(
"SELECT id, message_id, ordering FROM MessageLocationTable WHERE folder_id = ? "
+ "%s ORDER BY ordering OFFSET ?".printf(include_marked ? "" :
"AND remove_marker = 0"));
query.bind_int64(0, folder_id);
query.bind_int(1, low - 1);
}
SQLHeavy.Query query = locked.prepare(
"SELECT id, message_id, ordering FROM MessageLocationTable WHERE folder_id = ? "
+ "%s ORDER BY ordering LIMIT ? OFFSET ?".printf(include_marked ? "" :
"AND remove_marker = 0"));
query.bind_int64(0, folder_id);
query.bind_int(1, count);
query.bind_int(2, low - 1);
SQLHeavy.QueryResult results = yield query.execute_async();
check_cancel(cancellable, "list_async");

View file

@ -18,6 +18,13 @@ public interface Geary.Equalable {
public static bool equal_func(void *a, void *b) {
return ((Equalable *) a)->equals((Equalable *) b);
}
/**
* The EqualsFunc counterpart to Hashable.bare_int64_hash().
*/
public static bool bare_int64_equals(void *a, void *b) {
return *((int64 *) a) == *((int64 *) b);
}
}
public interface Geary.Hashable {
@ -27,17 +34,27 @@ public interface Geary.Hashable {
return ((Hashable *) ptr)->to_hash();
}
/**
* To be used by a Hashable's to_hash() method.
*/
public static uint int64_hash(int64 value) {
return hash_memory(&value, sizeof(int64));
}
/**
* To be used as a raw HashFunc where an int64 is being stored directly.
*/
public static uint bare_int64_hash(void *ptr) {
return hash_memory(ptr, sizeof(int64));
}
/**
* A rotating-XOR hash that can be used to hash memory buffers of any size. Use only if
* equality is determined by memory contents.
*/
public static uint hash_memory(void *ptr, size_t bytes) {
uint8 *u8 = (uint8 *) ptr;
uint32 hash = 0;
uint hash = 0;
for (int ctr = 0; ctr < bytes; ctr++)
hash = (hash << 4) ^ (hash >> 28) ^ (*u8++);