No pauses while writing drafts: Closes #7426

The slight pause (sometimes up to 2s on my machine) was due to some
CPU-intensive work occurring during folder normalization.  Some of
the work can be done in a background thread (using the new
Nonblocking.Concurrent class), but some of the problem was simply
that the Deserializer's priority was too high, causing it to block
UI events when it was busy (such as requesting a lot of UIDs from
the server).
This commit is contained in:
Jim Nelson 2013-09-03 15:09:39 -07:00
parent 1638468978
commit 0dd3edf7a2
7 changed files with 253 additions and 103 deletions

View file

@ -227,6 +227,7 @@ engine/memory/memory-unowned-string-buffer.vala
engine/nonblocking/nonblocking-abstract-semaphore.vala
engine/nonblocking/nonblocking-batch.vala
engine/nonblocking/nonblocking-concurrent.vala
engine/nonblocking/nonblocking-counting-semaphore.vala
engine/nonblocking/nonblocking-error.vala
engine/nonblocking/nonblocking-mailbox.vala

View file

@ -512,7 +512,8 @@ private class Geary.ImapDB.Folder : BaseObject, Geary.ReferenceSemantics {
}
// Note that this does INCLUDES messages marked for removal
public async Gee.SortedSet<Imap.UID>? list_uids_by_range_async(Imap.UID first_uid, Imap.UID last_uid,
// TODO: Let the user request a SortedSet, or have them provide the Set to add to
public async Gee.Set<Imap.UID>? list_uids_by_range_async(Imap.UID first_uid, Imap.UID last_uid,
bool include_marked_for_removal, Cancellable? cancellable) throws Error {
// order correctly
Imap.UID start, end;
@ -524,7 +525,7 @@ private class Geary.ImapDB.Folder : BaseObject, Geary.ReferenceSemantics {
end = first_uid;
}
Gee.SortedSet<Imap.UID> uids = new Gee.TreeSet<Imap.UID>();
Gee.Set<Imap.UID> uids = new Gee.HashSet<Imap.UID>();
yield db.exec_transaction_async(Db.TransactionType.RO, (cx) => {
Db.Statement stmt = cx.prepare("""
SELECT ordering, remove_marker

View file

@ -5,7 +5,7 @@
*/
private abstract class Geary.ImapEngine.GenericAccount : Geary.AbstractAccount {
private const int REFRESH_FOLDER_LIST_SEC = 2 * 60;
private const int REFRESH_FOLDER_LIST_SEC = 4 * 60;
private static Geary.FolderPath? outbox_path = null;
private static Geary.FolderPath? search_path = null;
@ -433,28 +433,10 @@ private abstract class Geary.ImapEngine.GenericAccount : Geary.AbstractAccount {
}
}
// always update, openable or not; update UIDs if remote opened, otherwise will keep
// signalling that it's changed (because the only time UIDNEXT/UIDValidity is updated
// is when the remote folder is first opened)
// always update, openable or not; have the folder update the UID info the next time
// it's opened
try {
bool update_uid_info;
switch (generic_folder.get_open_state()) {
case Folder.OpenState.REMOTE:
case Folder.OpenState.BOTH:
update_uid_info = true;
break;
case Folder.OpenState.LOCAL:
case Folder.OpenState.OPENING:
case Folder.OpenState.CLOSED:
update_uid_info = false;
break;
default:
assert_not_reached();
}
yield local.update_folder_status_async(remote_folder, update_uid_info, cancellable);
yield local.update_folder_status_async(remote_folder, false, cancellable);
} catch (Error update_error) {
debug("Unable to update local folder %s with remote properties: %s",
remote_folder.to_string(), update_error.message);

View file

@ -212,40 +212,48 @@ private class Geary.ImapEngine.GenericFolder : Geary.AbstractFolder, Geary.Folde
// get all the UIDs in said range from the local store, sorted; convert to non-null
// for ease of use later
Gee.SortedSet<Imap.UID>? local_uids = yield local_folder.list_uids_by_range_async(
Gee.Set<Imap.UID>? local_uids = yield local_folder.list_uids_by_range_async(
first_uid, last_uid, true, cancellable);
if (local_uids == null)
local_uids = new Gee.TreeSet<Imap.UID>();
local_uids = Gee.Set.empty<Imap.UID>();
check_open("normalize_folders (list local)");
// Do the same on the remote ... make non-null for ease of use later
Gee.SortedSet<Imap.UID>? remote_uids = yield remote_folder.list_uids_async(
Gee.Set<Imap.UID>? remote_uids = yield remote_folder.list_uids_async(
new Imap.MessageSet.uid_range(first_uid, last_uid), cancellable);
if (remote_uids == null)
remote_uids = new Gee.TreeSet<Imap.UID>();
local_uids = Gee.Set.empty<Imap.UID>();
check_open("normalize_folders (list remote)");
// walk local UIDs looking for UIDs no longer on remote, removing those that are available
// make the next pass that much shorter
Gee.HashSet<Imap.UID> removed_uids = new Gee.HashSet<Imap.UID>();
foreach (Imap.UID local_uid in local_uids) {
// if in local but not remote, consider removed from remote
if (!remote_uids.remove(local_uid))
removed_uids.add(local_uid);
}
debug("%s: Loaded local (%d) and remote (%d) UIDs, normalizing...", to_string(),
local_uids.size, remote_uids.size);
// everything remaining in remote has been added since folder last seen ... whether they're
// discovered (inserted) or appended depends on the highest local UID
Gee.HashSet<Imap.UID> removed_uids = new Gee.HashSet<Imap.UID>();
Gee.HashSet<Imap.UID> appended_uids = new Gee.HashSet<Imap.UID>();
Gee.HashSet<Imap.UID> discovered_uids = new Gee.HashSet<Imap.UID>();
foreach (Imap.UID remote_uid in remote_uids) {
if (remote_uid.compare_to(local_latest_id.uid) > 0)
appended_uids.add(remote_uid);
else
discovered_uids.add(remote_uid);
}
// Because the number of UIDs being processed can be immense in large folders, process
// in a background thread
yield Nonblocking.Concurrent.global.schedule_async(() => {
// walk local UIDs looking for UIDs no longer on remote, removing those that are available
// make the next pass that much shorter
foreach (Imap.UID local_uid in local_uids) {
// if in local but not remote, consider removed from remote
if (!remote_uids.remove(local_uid))
removed_uids.add(local_uid);
}
// everything remaining in remote has been added since folder last seen ... whether they're
// discovered (inserted) or appended depends on the highest local UID
foreach (Imap.UID remote_uid in remote_uids) {
if (remote_uid.compare_to(local_latest_id.uid) > 0)
appended_uids.add(remote_uid);
else
discovered_uids.add(remote_uid);
}
}, cancellable);
debug("%s: changes since last seen: removed=%d appended=%d discovered=%d", to_string(),
removed_uids.size, appended_uids.size, discovered_uids.size);
@ -272,21 +280,27 @@ private class Geary.ImapEngine.GenericFolder : Geary.AbstractFolder, Geary.Folde
to_create, cancellable);
assert(created_or_merged != null);
foreach (Email email in created_or_merged.keys) {
ImapDB.EmailIdentifier id = (ImapDB.EmailIdentifier) email.id;
bool created = created_or_merged.get(email);
// report all appended email, but separate out email never seen before (created)
// as locally-appended
if (appended_uids.contains(id.uid)) {
appended_ids.add(id);
// it's possible a large number of messages have come in, so process them in the
// background
yield Nonblocking.Concurrent.global.schedule_async(() => {
foreach (Email email in created_or_merged.keys) {
ImapDB.EmailIdentifier id = (ImapDB.EmailIdentifier) email.id;
bool created = created_or_merged.get(email);
if (created)
locally_appended_ids.add(id);
} else if (discovered_uids.contains(id.uid) && created) {
discovered_ids.add(id);
// report all appended email, but separate out email never seen before (created)
// as locally-appended
if (appended_uids.contains(id.uid)) {
appended_ids.add(id);
if (created)
locally_appended_ids.add(id);
} else if (discovered_uids.contains(id.uid) && created) {
discovered_ids.add(id);
}
}
}
}, cancellable);
debug("%s: Finished creating/merging %d emails", to_string(), created_or_merged.size);
}
check_open("normalize_folders (created/merged appended/discovered emails)");

View file

@ -323,22 +323,28 @@ private class Geary.Imap.Folder : BaseObject {
}
// Utility method for listing a UID range
public async Gee.SortedSet<Imap.UID>? list_uids_async(MessageSet msg_set, Cancellable? cancellable)
// TODO: Offer parameter so a SortedSet could be returned (or, the caller must supply the Set)
public async Gee.Set<Imap.UID>? list_uids_async(MessageSet msg_set, Cancellable? cancellable)
throws Error {
Gee.List<Geary.Email>? list = yield list_email_async(msg_set, Geary.Email.Field.NONE,
FetchCommand cmd = new FetchCommand.data_type(msg_set, FetchDataType.UID);
Gee.HashMap<SequenceNumber, FetchedData>? fetched;
yield exec_commands_async(new Collection.SingleItem<Command>(cmd), out fetched, null,
cancellable);
if (list == null || list.size == 0)
if (fetched == null || fetched.size == 0)
return null;
Gee.SortedSet<Imap.UID> uids = new Gee.TreeSet<Imap.UID>();
foreach (Geary.Email email in list) {
Imap.UID? uid = ((ImapDB.EmailIdentifier) email.id).uid;
assert(uid != null);
uids.add(uid);
}
// Because the number of UIDs can be immense, do hashing in the background
Gee.Set<Imap.UID> uids = new Gee.HashSet<Imap.UID>();
yield Nonblocking.Concurrent.global.schedule_async(() => {
foreach (FetchedData fetched_data in fetched.values) {
Imap.UID? uid = fetched_data.data_map.get(FetchDataType.UID) as Imap.UID;
if (uid != null)
uids.add(uid);
}
}, cancellable);
return uids;
return (uids.size > 0) ? uids : null;
}
// Returns a no-message-id ImapDB.EmailIdentifier with the UID stored in it.
@ -435,37 +441,40 @@ private class Geary.Imap.Folder : BaseObject {
return null;
// Convert fetched data into Geary.Email objects
// because this could be for a lot of email, do in a background thread
Gee.List<Geary.Email> email_list = new Gee.ArrayList<Geary.Email>();
foreach (SequenceNumber seq_num in fetched.keys) {
FetchedData fetched_data = fetched.get(seq_num);
// the UID should either have been fetched (if using positional addressing) or should
// have come back with the response (if using UID addressing)
UID? uid = fetched_data.data_map.get(FetchDataType.UID) as UID;
if (uid == null) {
message("Unable to list message #%s on %s: No UID returned from server",
seq_num.to_string(), to_string());
yield Nonblocking.Concurrent.global.schedule_async(() => {
foreach (SequenceNumber seq_num in fetched.keys) {
FetchedData fetched_data = fetched.get(seq_num);
continue;
}
try {
Geary.Email email = fetched_data_to_email(uid, fetched_data, fields,
partial_header_identifier, body_identifier, preview_identifier,
preview_charset_identifier);
if (!email.fields.fulfills(fields)) {
message("%s: %s missing=%s fetched=%s", to_string(), email.id.to_string(),
fields.clear(email.fields).to_list_string(), fetched_data.to_string());
// the UID should either have been fetched (if using positional addressing) or should
// have come back with the response (if using UID addressing)
UID? uid = fetched_data.data_map.get(FetchDataType.UID) as UID;
if (uid == null) {
message("Unable to list message #%s on %s: No UID returned from server",
seq_num.to_string(), to_string());
continue;
}
email_list.add(email);
} catch (Error err) {
debug("%s: Unable to convert email for %s %s: %s", to_string(), uid.to_string(),
fetched_data.to_string(), err.message);
try {
Geary.Email email = fetched_data_to_email(to_string(), uid, fetched_data, fields,
partial_header_identifier, body_identifier, preview_identifier,
preview_charset_identifier);
if (!email.fields.fulfills(fields)) {
message("%s: %s missing=%s fetched=%s", to_string(), email.id.to_string(),
fields.clear(email.fields).to_list_string(), fetched_data.to_string());
continue;
}
email_list.add(email);
} catch (Error err) {
debug("%s: Unable to convert email for %s %s: %s", to_string(), uid.to_string(),
fetched_data.to_string(), err.message);
}
}
}
}, cancellable);
return (email_list.size > 0) ? email_list : null;
}
@ -660,10 +669,10 @@ private class Geary.Imap.Folder : BaseObject {
}
}
private Geary.Email fetched_data_to_email(UID uid, FetchedData fetched_data, Geary.Email.Field required_fields,
private static Geary.Email fetched_data_to_email(string folder_name, UID uid,
FetchedData fetched_data, Geary.Email.Field required_fields,
FetchBodyDataIdentifier? partial_header_identifier, FetchBodyDataIdentifier? body_identifier,
FetchBodyDataIdentifier? preview_identifier, FetchBodyDataIdentifier? preview_charset_identifier)
throws Error {
FetchBodyDataIdentifier? preview_identifier, FetchBodyDataIdentifier? preview_charset_identifier) throws Error {
// note the use of INVALID_ROWID, as the rowid for this email (if one is present in the
// database) is unknown at this time; this means ImapDB *must* create a new EmailIdentifier
// for this email after create/merge is completed
@ -731,10 +740,10 @@ private class Geary.Imap.Folder : BaseObject {
// if the header was requested, convert its fields now
bool has_partial_header = fetched_data.body_data_map.has_key(partial_header_identifier);
if (partial_header_identifier != null && !has_partial_header) {
message("[%s] No partial header identifier \"%s\" found:", to_string(),
message("[%s] No partial header identifier \"%s\" found:", folder_name,
partial_header_identifier.to_string());
foreach (FetchBodyDataIdentifier id in fetched_data.body_data_map.keys)
message("[%s] has %s", to_string(), id.to_string());
message("[%s] has %s", folder_name, id.to_string());
} else if (partial_header_identifier != null && has_partial_header) {
RFC822.Header headers = new RFC822.Header(
fetched_data.body_data_map.get(partial_header_identifier));
@ -832,10 +841,10 @@ private class Geary.Imap.Folder : BaseObject {
email.set_message_body(new Geary.RFC822.Text(
fetched_data.body_data_map.get(body_identifier)));
} else {
message("[%s] No body identifier \"%s\" found", to_string(),
message("[%s] No body identifier \"%s\" found", folder_name,
body_identifier.to_string());
foreach (FetchBodyDataIdentifier id in fetched_data.body_data_map.keys)
message("[%s] has %s", to_string(), id.to_string());
message("[%s] has %s", folder_name, id.to_string());
}
}
@ -849,10 +858,10 @@ private class Geary.Imap.Folder : BaseObject {
fetched_data.body_data_map.get(preview_identifier),
fetched_data.body_data_map.get(preview_charset_identifier)));
} else {
message("[%s] No preview identifiers \"%s\" and \"%s\" found", to_string(),
message("[%s] No preview identifiers \"%s\" and \"%s\" found", folder_name,
preview_identifier.to_string(), preview_charset_identifier.to_string());
foreach (FetchBodyDataIdentifier id in fetched_data.body_data_map.keys)
message("[%s] has %s", to_string(), id.to_string());
message("[%s] has %s", folder_name, id.to_string());
}
}
@ -893,7 +902,7 @@ private class Geary.Imap.Folder : BaseObject {
return null;
}
private bool required_but_not_set(Geary.Email.Field check, Geary.Email.Field users_fields, Geary.Email email) {
private static bool required_but_not_set(Geary.Email.Field check, Geary.Email.Field users_fields, Geary.Email email) {
return users_fields.require(check) ? !email.fields.is_all_set(check) : false;
}

View file

@ -217,7 +217,7 @@ public class Geary.Imap.Deserializer : BaseObject {
*
* Subscribe to the various signals before starting to ensure that all responses are trapped.
*/
public async void start_async(int priority = GLib.Priority.DEFAULT) throws Error {
public async void start_async(int priority = GLib.Priority.DEFAULT_IDLE) throws Error {
if (cancellable != null)
throw new EngineError.ALREADY_OPEN("Deserializer already open");

View file

@ -0,0 +1,143 @@
/* Copyright 2013 Yorba Foundation
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
/**
* Schedule an asynchronous operation (a {@link ConcurrentCallback} to run in a background thread.
*
* Useful to perform blocking I/O or CPU-bound tasks where the result must be ready before
* other work can proceed.
*/
public class Geary.Nonblocking.Concurrent : BaseObject {
public const int DEFAULT_MAX_THREADS = 4;
/**
* A callback invoked from a {@link Concurrent} background thread.
*
* The Cancellable passed to the callback is the same Cancellable the caller supplied to
* {@link schedule_async}.
*
* Note that this callback may throw an Error. If it does, Concurrent will throw it to the
* foreground caller on behalf of the callback.
*/
public delegate void ConcurrentCallback(Cancellable? cancellable) throws Error;
private class ConcurrentOperation : BaseObject {
private unowned ConcurrentCallback cb;
private Cancellable? cancellable;
private Error? caught_err = null;
private Event event = new Event();
public ConcurrentOperation(ConcurrentCallback cb, Cancellable? cancellable) {
this.cb = cb;
this.cancellable = cancellable;
}
// Called from the foreground thread to wait for the background to complete.
//
// Can't cancel here because we *must* wait for the operation to be executed by the
// thread and complete
public async void wait_async() throws Error {
yield event.wait_async();
if (caught_err != null)
throw caught_err;
// now deal with cancellation
if (cancellable != null && cancellable.is_cancelled())
throw new IOError.CANCELLED("Geary.Nonblocking.Concurrent cancelled");
}
// Called from a background thread
public void execute() {
// only execute if not already cancelled
if (cancellable == null || !cancellable.is_cancelled()) {
try {
cb(cancellable);
} catch (Error err) {
caught_err = err;
}
}
// can't notify event here, Nonblocking.Event is not thread safe
//
// artificially increment the ref count of this object, schedule a completion callback
// on the forground thread, and signal there
ref();
Idle.add(on_notify_completed);
}
// Called in the context of the Event loop in the foreground thread
private bool on_notify_completed() {
// alert waiters
event.blind_notify();
// unref; do not touch "self" from here on, it's possibly deallocated
unref();
return false;
}
}
private static Concurrent? _global = null;
/**
* Returns the global instance of a {@link Concurrent} scheduler.
*
* Note that this call is ''not'' thread-safe and should only be called from the foreground
* thread.
*/
public static Concurrent global {
get {
return (_global != null) ? _global : _global = new Concurrent();
}
}
private ThreadPool<ConcurrentOperation>? thread_pool = null;
private ThreadError? init_err = null;
/**
* Creates a new Concurrent pool for scheduling background work.
*
* A caller may create their own pool, or they may use the default one available with
* {link instance}.
*/
public Concurrent(int max_threads = DEFAULT_MAX_THREADS) {
try {
thread_pool = new ThreadPool<ConcurrentOperation>.with_owned_data(on_work_ready,
max_threads, false);
} catch (ThreadError err) {
init_err = err;
warning("Unable to create Geary.Nonblocking.Concurrent: %s", err.message);
}
}
/**
* Schedule a callback to be invoked in a background thread.
*
* The caller should take care that the callback's state is available until
* {@link schedule_async} completes.
*
* This method is thread-safe.
*/
public async void schedule_async(ConcurrentCallback cb, Cancellable? cancellable = null)
throws Error {
if (init_err != null)
throw init_err;
// hold ConcurrentOperation ref until thread completes
ConcurrentOperation op = new ConcurrentOperation(cb, cancellable);
thread_pool.add(op);
yield op.wait_async();
}
private void on_work_ready(owned ConcurrentOperation op) {
op.execute();
}
}