Archive/delete and mark unread now use ReplayQueue. Closes #4526

This commit is contained in:
Eric Gregory 2012-01-09 10:58:13 -08:00
parent eea2b31a73
commit 049a718c34
20 changed files with 693 additions and 157 deletions

View file

@ -54,7 +54,8 @@ CREATE TABLE MessageLocationTable (
id INTEGER PRIMARY KEY,
message_id INTEGER REFERENCES MessageTable ON DELETE CASCADE,
folder_id INTEGER REFERENCES FolderTable ON DELETE CASCADE,
ordering INTEGER
ordering INTEGER,
remove_marker INTEGER DEFAULT 0
);
CREATE INDEX MessageLocationTableMessageIDIndex ON MessageLocationTable(message_id);

View file

@ -81,6 +81,7 @@ public class GearyController {
private bool second_list_pass_required = false;
private int busy_count = 0;
private Geary.Conversation? current_conversation = null;
private Geary.Conversation? last_deleted_conversation = null;
public GearyController() {
// Setup actions.
@ -573,6 +574,12 @@ public class GearyController {
}
private void on_delete_message() {
// Prevent deletes of the same conversation from repeating.
if (current_conversation == last_deleted_conversation)
return;
last_deleted_conversation = current_conversation;
Gee.Set<Geary.Email>? pool = current_conversation.get_pool();
if (pool == null)
return;

View file

@ -27,11 +27,11 @@ public class Geary.EmailFlags : Geary.Equalable {
return list.read_only_view;
}
public void add(EmailFlag flag) {
public virtual void add(EmailFlag flag) {
list.add(flag);
}
public bool remove(EmailFlag flag) {
public virtual bool remove(EmailFlag flag) {
return list.remove(flag);
}

View file

@ -14,6 +14,6 @@ public errordomain Geary.EngineError {
BAD_RESPONSE,
INCOMPLETE_MESSAGE,
SERVER_UNAVAILABLE,
CLOSED
ALREADY_CLOSED
}

View file

@ -384,9 +384,9 @@ public interface Geary.Folder : Object {
*
* The Folder must be opened prior to attempting this operation.
*/
public abstract async Gee.Map<Geary.EmailIdentifier, Geary.EmailFlags> mark_email_async(
Gee.List<Geary.EmailIdentifier> to_mark, Geary.EmailFlags? flags_to_add,
Geary.EmailFlags? flags_to_remove, Cancellable? cancellable = null) throws Error;
public abstract async void mark_email_async(Gee.List<Geary.EmailIdentifier> to_mark,
Geary.EmailFlags? flags_to_add, Geary.EmailFlags? flags_to_remove,
Cancellable? cancellable = null) throws Error;
/**
* check_span_specifiers() verifies that the span specifiers match the requirements set by

View file

@ -13,5 +13,19 @@ public class Geary.Imap.EmailFlags : Geary.EmailFlags {
if (!flags.contains(MessageFlag.SEEN))
add(UNREAD);
}
public override void add(EmailFlag flag) {
if (flag.equals(UNREAD))
message_flags.remove(MessageFlag.SEEN);
base.add(flag);
}
public override bool remove(EmailFlag flag) {
if (flag.equals(UNREAD))
message_flags.add(MessageFlag.SEEN);
return base.remove(flag);
}
}

View file

@ -196,7 +196,7 @@ private class Geary.Imap.Folder : Geary.AbstractFolder, Geary.RemoteFolder {
yield mailbox.expunge_email_async(cancellable);
}
public override async Gee.Map<Geary.EmailIdentifier, Geary.EmailFlags> mark_email_async(
public override async void mark_email_async(
Gee.List<Geary.EmailIdentifier> to_mark, Geary.EmailFlags? flags_to_add,
Geary.EmailFlags? flags_to_remove, Cancellable? cancellable = null) throws Error {
if (mailbox == null)
@ -207,7 +207,7 @@ private class Geary.Imap.Folder : Geary.AbstractFolder, Geary.RemoteFolder {
MessageFlag.from_email_flags(flags_to_add, flags_to_remove, out msg_flags_add,
out msg_flags_remove);
return yield mailbox.mark_email_async(message_set_from_id_list(to_mark), msg_flags_add,
yield mailbox.mark_email_async(message_set_from_id_list(to_mark), msg_flags_add,
msg_flags_remove, cancellable);
}

View file

@ -44,7 +44,7 @@ public class Geary.Imap.MessageNumber : Geary.Common.IntMessageData, Geary.Imap.
public abstract class Geary.Imap.Flags : Geary.Common.MessageData, Geary.Imap.MessageData, Equalable {
public int size { get { return list.size; } }
private Gee.Set<Flag> list;
protected Gee.Set<Flag> list;
public Flags(Gee.Collection<Flag> flags) {
list = new Gee.HashSet<Flag>(Hashable.hash_func, Equalable.equal_func);
@ -121,6 +121,14 @@ public class Geary.Imap.MessageFlags : Geary.Imap.Flags {
return new MessageFlags(flags);
}
internal void add(MessageFlag flag) {
list.add(flag);
}
internal void remove(MessageFlag flag) {
list.remove(flag);
}
}
public class Geary.Imap.MailboxAttributes : Geary.Imap.Flags {

View file

@ -13,19 +13,19 @@ public abstract class Geary.AbstractFolder : Object, Geary.Folder {
closed(reason);
}
protected virtual void notify_messages_appended(int total) {
internal virtual void notify_messages_appended(int total) {
messages_appended(total);
}
protected virtual void notify_message_removed(Geary.EmailIdentifier id) {
internal virtual void notify_message_removed(Geary.EmailIdentifier id) {
message_removed(id);
}
protected virtual void notify_email_count_changed(int new_count, Folder.CountChangeReason reason) {
internal virtual void notify_email_count_changed(int new_count, Folder.CountChangeReason reason) {
email_count_changed(new_count, reason);
}
protected virtual void notify_email_flags_changed(Gee.Map<Geary.EmailIdentifier,
internal virtual void notify_email_flags_changed(Gee.Map<Geary.EmailIdentifier,
Geary.EmailFlags> flag_map) {
email_flags_changed(flag_map);
}
@ -135,7 +135,7 @@ public abstract class Geary.AbstractFolder : Object, Geary.Folder {
yield remove_email_async(list, cancellable);
}
public abstract async Gee.Map<Geary.EmailIdentifier, Geary.EmailFlags> mark_email_async(
public abstract async void mark_email_async(
Gee.List<Geary.EmailIdentifier> to_mark, Geary.EmailFlags? flags_to_add,
Geary.EmailFlags? flags_to_remove, Cancellable? cancellable = null) throws Error;

View file

@ -7,51 +7,6 @@
private class Geary.EngineFolder : Geary.AbstractFolder {
private const int REMOTE_FETCH_CHUNK_COUNT = 5;
private class ReplayAppend : ReplayOperation {
public EngineFolder owner;
public int new_remote_count;
public ReplayAppend(EngineFolder owner, int new_remote_count) {
base ("Append");
this.owner = owner;
this.new_remote_count = new_remote_count;
}
public override async void replay() {
yield owner.do_replay_appended_messages(new_remote_count);
}
}
private class ReplayRemoval : ReplayOperation {
public EngineFolder owner;
public int position;
public int new_remote_count;
public EmailIdentifier? id;
public ReplayRemoval(EngineFolder owner, int position, int new_remote_count) {
base ("Removal");
this.owner = owner;
this.position = position;
this.new_remote_count = new_remote_count;
this.id = null;
}
public ReplayRemoval.with_id(EngineFolder owner, EmailIdentifier id) {
base ("Removal.with_id");
this.owner = owner;
position = -1;
new_remote_count = -1;
this.id = id;
}
public override async void replay() {
yield owner.do_replay_remove_message(position, new_remote_count, id);
}
}
private class CommitOperation : NonblockingBatchOperation {
public Folder folder;
public Geary.Email email;
@ -68,15 +23,16 @@ private class Geary.EngineFolder : Geary.AbstractFolder {
}
}
protected LocalFolder local_folder;
protected RemoteFolder? remote_folder = null;
internal LocalFolder local_folder { get; protected set; }
internal RemoteFolder? remote_folder { get; protected set; default = null; }
internal int remote_count { get; private set; default = -1; }
private RemoteAccount remote;
private LocalAccount local;
private int remote_count = -1;
private bool opened = false;
private NonblockingSemaphore remote_semaphore = new NonblockingSemaphore();
private ReplayQueue? replay_queue = null;
private ReceiveReplayQueue? recv_replay_queue = null;
private SendReplayQueue? send_replay_queue = null;
public EngineFolder(RemoteAccount remote, LocalAccount local, LocalFolder local_folder) {
this.remote = remote;
@ -165,8 +121,9 @@ private class Geary.EngineFolder : Geary.AbstractFolder {
// all set; bless the remote folder as opened
remote_folder = folder;
// start the replay queue
replay_queue = new ReplayQueue();
// start the replay queues
recv_replay_queue = new ReceiveReplayQueue();
send_replay_queue = new SendReplayQueue();
} else {
debug("Unable to prepare remote folder %s: prepare_opened_file() failed", to_string());
}
@ -225,10 +182,12 @@ private class Geary.EngineFolder : Geary.AbstractFolder {
folder.close_async.begin(cancellable);
// close the replay queue *after* the folder has been closed (in case any final upcalls
// close the replay queues *after* the folder has been closed (in case any final upcalls
// come and can be handled)
yield replay_queue.close_async();
replay_queue = null;
yield recv_replay_queue.close_async();
yield send_replay_queue.close_async();
recv_replay_queue = null;
send_replay_queue = null;
notify_closed(CloseReason.FOLDER_CLOSED);
}
@ -238,7 +197,7 @@ private class Geary.EngineFolder : Geary.AbstractFolder {
private void on_remote_messages_appended(int total) {
debug("on_remote_messages_appended: total=%d", total);
replay_queue.schedule(new ReplayAppend(this, total));
recv_replay_queue.schedule(new ReplayAppend(this, total));
}
// Need to prefetch at least an EmailIdentifier (and duplicate detection fields) to create a
@ -248,7 +207,7 @@ private class Geary.EngineFolder : Geary.AbstractFolder {
// which is exactly what we want.
//
// This MUST only be called from ReplayAppend.
private async void do_replay_appended_messages(int new_remote_count) {
internal async void do_replay_appended_messages(int new_remote_count) {
// this only works when the list is grown
if (remote_count >= new_remote_count) {
debug("Message reported appended by server but remote count %d already known",
@ -286,16 +245,16 @@ private class Geary.EngineFolder : Geary.AbstractFolder {
private void on_remote_message_removed(Geary.EmailIdentifier id) {
debug("on_remote_message_removed: %s", id.to_string());
replay_queue.schedule(new ReplayRemoval.with_id(this, id));
recv_replay_queue.schedule(new ReplayRemoval.with_id(this, id));
}
private void on_remote_message_at_removed(int position, int total) {
debug("on_remote_message_at_removed: position=%d total=%d", position, total);
replay_queue.schedule(new ReplayRemoval(this, position, total));
recv_replay_queue.schedule(new ReplayRemoval(this, position, total));
}
// This MUST only be called from ReplayRemoval.
private async void do_replay_remove_message(int remote_position, int new_remote_count,
internal async void do_replay_remove_message(int remote_position, int new_remote_count,
Geary.EmailIdentifier? id) {
if (remote_position < 1)
assert(id != null);
@ -305,37 +264,21 @@ private class Geary.EngineFolder : Geary.AbstractFolder {
Geary.EmailIdentifier? owned_id = id;
if (owned_id == null) {
try {
// convert remote positional addressing to local positional addressing
int local_count = yield local_folder.get_email_count_async();
int local_position = remote_position_to_local_position(remote_position, local_count);
// possible we don't have the remote email locally
if (local_position >= 1) {
// get EmailIdentifier for removed email
Gee.List<Geary.Email>? local = yield local_folder.list_email_async(local_position, 1,
Geary.Email.Field.NONE, Geary.Folder.ListFlags.NONE, null);
if (local != null && local.size == 1) {
owned_id = local[0].id;
} else {
debug("list_email_async unable to convert position %d into id (count=%d)",
local_position, yield local_folder.get_email_count_async());
}
} else {
debug("Unable to get local position for remote position %d (local_count=%d remote_count=%d)",
remote_position, local_count, remote_count);
}
owned_id = yield local_folder.id_from_remote_position(remote_position, remote_count);
} catch (Error err) {
debug("Unable to determine ID of removed message #%d from %s: %s", remote_position,
to_string(), err.message);
}
}
bool marked = false;
if (owned_id != null) {
debug("Removing from local store Email ID %s", owned_id.to_string());
try {
// Reflect change in the local store and notify subscribers
yield local_folder.remove_single_email_async(owned_id, null);
yield local_folder.remove_marked_email_async(owned_id, out marked, null);
if (!marked)
notify_message_removed(owned_id);
} catch (Error err2) {
debug("Unable to remove message #%d from %s: %s", remote_position, to_string(),
@ -346,6 +289,7 @@ private class Geary.EngineFolder : Geary.AbstractFolder {
// save new remote count and notify of change
remote_count = new_remote_count;
if (!marked)
notify_email_count_changed(remote_count, CountChangeReason.REMOVED);
}
@ -875,9 +819,7 @@ private class Geary.EngineFolder : Geary.AbstractFolder {
if (!opened)
throw new EngineError.OPEN_REQUIRED("Folder %s not opened", to_string());
// Only need to remove from remote folder, since it will be signaled and automatically
// removed from the local folder.
yield remote_folder.remove_email_async(email_ids, cancellable);
send_replay_queue.schedule(new RemoveEmail(this, email_ids, cancellable));
}
// Converts a remote position to a local position, assuming that the remote has been completely
@ -948,19 +890,14 @@ private class Geary.EngineFolder : Geary.AbstractFolder {
debug("prefetched %d for %s", prefetch_count, to_string());
}
public override async Gee.Map<Geary.EmailIdentifier, Geary.EmailFlags> mark_email_async(
Gee.List<Geary.EmailIdentifier> to_mark, Geary.EmailFlags? flags_to_add,
Geary.EmailFlags? flags_to_remove, Cancellable? cancellable = null) throws Error {
public override async void mark_email_async(Gee.List<Geary.EmailIdentifier> to_mark,
Geary.EmailFlags? flags_to_add, Geary.EmailFlags? flags_to_remove,
Cancellable? cancellable = null) throws Error {
if (!yield wait_for_remote_to_open())
throw new EngineError.SERVER_UNAVAILABLE("No connection to %s", remote.to_string());
Gee.Map<Geary.EmailIdentifier, Geary.EmailFlags> map =
yield remote_folder.mark_email_async(to_mark, flags_to_add, flags_to_remove, cancellable);
yield local_folder.set_email_flags_async(map, cancellable);
notify_email_flags_changed(map);
return map;
send_replay_queue.schedule(new MarkEmail(this, to_mark, flags_to_add, flags_to_remove,
cancellable));
}
}

View file

@ -38,11 +38,36 @@ private interface Geary.LocalFolder : Object, Geary.Folder {
public async abstract int get_id_position_async(Geary.EmailIdentifier id, Cancellable? cancellable)
throws Error;
/**
* Removes an email while returning the "marked" status flag. This flag is used internally
* by the SendReplayQueue to record whether we've already notified for the removal.
*/
public async abstract void remove_marked_email_async(Geary.EmailIdentifier id, out bool marked,
Cancellable? cancellable) throws Error;
/**
* Marks or unmarks an e-mail for removal.
*/
public async abstract void mark_removed_async(Geary.EmailIdentifier id, bool remove,
Cancellable? cancellable) throws Error;
/**
* Retrieves email flags for the given list of email identifiers.
*/
public async abstract Gee.Map<Geary.EmailIdentifier, Geary.EmailFlags> get_email_flags_async(
Gee.List<Geary.EmailIdentifier> to_get, Cancellable? cancellable) throws Error;
/**
* Sets an e-mails flags based on the MessageFlags. Note that the EmailFlags MUST be of
* type Geary.Imap.EmailFlags and contain a valid MessageFlags object.
*/
public async abstract void set_email_flags_async(Gee.Map<Geary.EmailIdentifier,
Geary.EmailFlags> map, Cancellable? cancellable) throws Error;
/**
* Converts a remote position and count into an email ID.
*/
public async abstract Geary.EmailIdentifier? id_from_remote_position(int remote_position,
int new_remote_count) throws Error;
}

View file

@ -0,0 +1,50 @@
/* Copyright 2011 Yorba Foundation
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
private class Geary.ReplayAppend : Geary.ReceiveReplayOperation {
public EngineFolder owner;
public int new_remote_count;
public ReplayAppend(EngineFolder owner, int new_remote_count) {
base ("Append");
this.owner = owner;
this.new_remote_count = new_remote_count;
}
public override async void replay() {
yield owner.do_replay_appended_messages(new_remote_count);
}
}
private class Geary.ReplayRemoval : Geary.ReceiveReplayOperation {
public EngineFolder owner;
public int position;
public int new_remote_count;
public EmailIdentifier? id;
public ReplayRemoval(EngineFolder owner, int position, int new_remote_count) {
base ("Removal");
this.owner = owner;
this.position = position;
this.new_remote_count = new_remote_count;
this.id = null;
}
public ReplayRemoval.with_id(EngineFolder owner, EmailIdentifier id) {
base ("Removal.with_id");
this.owner = owner;
position = -1;
new_remote_count = -1;
this.id = id;
}
public override async void replay() {
yield owner.do_replay_remove_message(position, new_remote_count, id);
}
}

View file

@ -4,18 +4,18 @@
* (version 2.1 or later). See the COPYING file in this distribution.
*/
private abstract class Geary.ReplayOperation {
private abstract class Geary.ReceiveReplayOperation {
private string name;
public ReplayOperation(string name) {
public ReceiveReplayOperation(string name) {
this.name = name;
}
public abstract async void replay();
}
private class Geary.ReplayQueue {
private class ReplayClose : ReplayOperation {
private class Geary.ReceiveReplayQueue {
private class ReplayClose : ReceiveReplayOperation {
public NonblockingSemaphore semaphore = new NonblockingSemaphore();
public ReplayClose() {
@ -31,14 +31,15 @@ private class Geary.ReplayQueue {
}
}
private NonblockingMailbox<ReplayOperation> queue = new NonblockingMailbox<ReplayOperation>();
private NonblockingMailbox<ReceiveReplayOperation> queue = new
NonblockingMailbox<ReceiveReplayOperation>();
private bool closed = false;
public ReplayQueue() {
public ReceiveReplayQueue() {
do_process_queue.begin();
}
public void schedule(ReplayOperation op) {
public void schedule(ReceiveReplayOperation op) {
try {
queue.send(op);
} catch (Error err) {
@ -48,7 +49,7 @@ private class Geary.ReplayQueue {
public async void close_async() throws EngineError {
if (closed)
throw new EngineError.CLOSED("Closed");
throw new EngineError.ALREADY_CLOSED("Closed");
closed = true;
@ -58,7 +59,7 @@ private class Geary.ReplayQueue {
try {
yield replay_close.semaphore.wait_async();
} catch (Error err) {
error("Error waiting for replay queue to close: %s", err.message);
error("Error waiting for receive replay queue to close: %s", err.message);
}
}
@ -69,7 +70,7 @@ private class Geary.ReplayQueue {
if (queue.size == 0 && closed)
break;
ReplayOperation op;
ReceiveReplayOperation op;
try {
op = yield queue.recv_async();
} catch (Error err) {

View file

@ -0,0 +1,100 @@
/* Copyright 2011 Yorba Foundation
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
private class Geary.MarkEmail : Geary.SendReplayOperation {
private EngineFolder engine;
private Gee.List<Geary.EmailIdentifier> to_mark;
private Geary.EmailFlags? flags_to_add;
private Geary.EmailFlags? flags_to_remove;
private Gee.Map<Geary.EmailIdentifier, Geary.EmailFlags>? original_flags = null;
private Cancellable? cancellable;
public MarkEmail(EngineFolder engine, Gee.List<Geary.EmailIdentifier> to_mark,
Geary.EmailFlags? flags_to_add, Geary.EmailFlags? flags_to_remove,
Cancellable? cancellable = null) {
base("MarkEmail");
this.engine = engine;
this.to_mark = to_mark;
this.flags_to_add = flags_to_add;
this.flags_to_remove = flags_to_remove;
this.cancellable = cancellable;
}
public override async bool replay_local() throws Error {
// Save original flags, then set new ones.
original_flags = yield engine.local_folder.get_email_flags_async(to_mark, cancellable);
yield engine.local_folder.mark_email_async(to_mark, flags_to_add, flags_to_remove,
cancellable);
// Notify using flags from DB.
engine.notify_email_flags_changed(yield engine.local_folder.get_email_flags_async(to_mark,
cancellable));
return false;
}
public override async bool replay_remote() throws Error {
yield engine.remote_folder.mark_email_async(to_mark, flags_to_add, flags_to_remove,
cancellable);
return true;
}
public override async void backout_local() throws Error {
// Restore original flags.
yield engine.local_folder.set_email_flags_async(original_flags, cancellable);
}
}
private class Geary.RemoveEmail : Geary.SendReplayOperation {
private EngineFolder engine;
private Gee.List<Geary.EmailIdentifier> to_remove;
private Cancellable? cancellable;
private int original_count = 0;
public RemoveEmail(EngineFolder engine, Gee.List<Geary.EmailIdentifier> to_remove,
Cancellable? cancellable = null) {
base("RemoveEmail");
this.engine = engine;
this.to_remove = to_remove;
this.cancellable = cancellable;
}
public override async bool replay_local() throws Error {
foreach (Geary.EmailIdentifier id in to_remove) {
yield engine.local_folder.mark_removed_async(id, true, cancellable);
engine.notify_message_removed(id);
}
original_count = engine.remote_count;
engine.notify_email_count_changed(original_count - to_remove.size,
Geary.Folder.CountChangeReason.REMOVED);
return false;
}
public override async bool replay_remote() throws Error {
// Remove from server. Note that this causes the receive replay queue to kick into
// action, removing the e-mail but *NOT* firing a signal; the "remove marker" indicates
// that the signal has already been fired.
yield engine.remote_folder.remove_email_async(to_remove, cancellable);
return true;
}
public override async void backout_local() throws Error {
foreach (Geary.EmailIdentifier id in to_remove)
yield engine.local_folder.mark_removed_async(id, false, cancellable);
engine.notify_messages_appended(to_remove.size);
engine.notify_email_count_changed(original_count, Geary.Folder.CountChangeReason.REMOVED);
}
}

View file

@ -0,0 +1,191 @@
/* Copyright 2011 Yorba Foundation
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
private abstract class Geary.SendReplayOperation {
public Error? error { get; private set; default = null; }
private NonblockingSemaphore semaphore = new NonblockingSemaphore();
private string name;
public SendReplayOperation(string name) {
this.name = name;
}
/**
* Runs the local operation.
* Returns true if the operation is complete, or false if a call to
* replay_remote() is also needed.
*/
public abstract async bool replay_local() throws Error;
/**
* Runs the remote operation.
* Returns true if the operation is complete, or false if the remote operation
* didn't complete successfully; in that case, backout_local() will be called.
*/
public abstract async bool replay_remote() throws Error;
/**
* Backs out the local operation.
* This is effectively an "undo" for when the remote operation failed.
*/
public virtual async void backout_local() throws Error {}
/**
* Waits until the operation is ready.
* On a read, this should wait until the entire operation completes.
* On a write, this should wait until the local operation completes.
*
* To trigger this, call set_ready() with either a null or an Error. Either will
* trigger completion, if an error is passed in this function will throw that error.
*/
public async void wait_for_ready() throws Error {
yield semaphore.wait_async();
if (error != null)
throw error;
}
// See the comments on wait_for_ready() on how to use this function.
internal void set_ready(Error? error) {
this.error = error;
try {
semaphore.notify();
} catch (Error e) {
debug("Unable to compelte send replay queue operation [%s] error: %s", name, e.message);
}
}
}
private class Geary.SendReplayQueue {
private class ReplayClose: Geary.SendReplayOperation {
public ReplayClose() {
base ("Close");
}
public override async bool replay_local() throws Error {
return false;
}
public override async bool replay_remote() throws Error {
return true;
}
public override async void backout_local() {}
}
private NonblockingMailbox<SendReplayOperation> local_queue = new
NonblockingMailbox<SendReplayOperation>();
private NonblockingMailbox<SendReplayOperation> remote_queue = new
NonblockingMailbox<SendReplayOperation>();
private bool closed = false;
// Signals an operation has failedand the failure was non-recoverable.
public signal void replay_failed(SendReplayOperation op, Error? fatal_error);
public SendReplayQueue() {
do_process_local_queue.begin();
do_process_remote_queue.begin();
}
public void schedule(SendReplayOperation op) {
try {
local_queue.send(op);
} catch (Error err) {
error("Unable to schedule operation on replay queue: %s", err.message);
}
}
public async void close_async() throws EngineError {
if (closed)
throw new EngineError.ALREADY_CLOSED("Closed");
closed = true;
// flush a ReplayClose operation down the pipe so all enqueued operations complete
ReplayClose replay_close = new ReplayClose();
schedule(replay_close);
try {
yield replay_close.wait_for_ready();
} catch (Error err) {
error("Error waiting for replay queue to close: %s", err.message);
}
}
private async void do_process_local_queue() {
for (;;) {
if (local_queue.size == 0 && closed)
break;
SendReplayOperation op;
try {
op = yield local_queue.recv_async();
} catch (Error err) {
error("Unable to receive next replay operation on queue: %s", err.message);
}
bool completed = false;
try {
completed = yield op.replay_local();
} catch (Error e) {
debug("Replay local error: %s", e.message);
op.set_ready(e);
continue;
}
if (!completed) {
try {
remote_queue.send(op);
} catch (Error err) {
error("Unable to schedule operation on remote replay queue: %s", err.message);
}
}
op.set_ready(null);
}
}
private async void do_process_remote_queue() {
for (;;) {
if (remote_queue.size == 0 && closed)
break;
SendReplayOperation op;
try {
op = yield remote_queue.recv_async();
} catch (Error err) {
error("Unable to receive next replay operation on queue: %s", err.message);
}
bool completed = false;
Error? remote_error = null;
try {
completed = yield op.replay_remote();
} catch (Error e) {
debug("Error: could not replay remote");
remote_error = e;
}
if (op.error != null || !completed) {
try {
yield op.backout_local();
} catch (Error e) {
replay_failed(op, e);
op.set_ready(remote_error);
continue;
}
// Signal that a recovery happened.
replay_failed(op, null);
}
op.set_ready(null);
}
}
}

View file

@ -4,7 +4,7 @@
* (version 2.1 or later). See the COPYING file in this distribution.
*/
public abstract class Geary.Sqlite.Row {
public abstract class Geary.Sqlite.Row : Object {
public const int64 INVALID_ID = -1;
protected Table table;

View file

@ -82,7 +82,17 @@ private class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gea
check_open();
// TODO: This can be cached and updated when changes occur
return yield location_table.fetch_count_for_folder_async(null, folder_row.id, cancellable);
return yield location_table.fetch_count_for_folder_async(null, folder_row.id, false,
cancellable);
}
private async int get_email_count_including_removed_async(Cancellable? cancellable = null)
throws Error {
check_open();
// TODO: This can be cached and updated when changes occur
return yield location_table.fetch_count_for_folder_async(null, folder_row.id, true,
cancellable);
}
public async int get_id_position_async(Geary.EmailIdentifier id, Cancellable? cancellable)
@ -239,7 +249,7 @@ private class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gea
}
int count = yield location_table.fetch_count_for_folder_async(transaction, folder_row.id,
cancellable);
false, cancellable);
// only commit if not supplied a transaction
if (supplied_transaction == null)
@ -262,9 +272,29 @@ private class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gea
cancellable);
Gee.List<MessageLocationRow>? list = yield location_table.list_async(transaction,
folder_row.id, low, count, cancellable);
folder_row.id, low, count, false, cancellable);
return yield list_email(transaction, list, required_fields, cancellable);
return yield list_email(transaction, list, required_fields, false, cancellable);
}
private async Gee.List<Geary.Email>? list_email_including_removed_async(int low, int count,
Geary.Email.Field required_fields, Geary.Folder.ListFlags flags, Cancellable? cancellable)
throws Error {
check_open();
normalize_span_specifiers(ref low, ref count, yield get_email_count_including_removed_async(
cancellable));
if (count == 0)
return null;
Transaction transaction = yield db.begin_transaction_async(
"Folder.list_email_including_removed_async", cancellable);
Gee.List<MessageLocationRow>? list = yield location_table.list_async(transaction,
folder_row.id, low, count, true, cancellable);
return yield list_email(transaction, list, required_fields, true, cancellable);
}
public override async Gee.List<Geary.Email>? list_email_sparse_async(int[] by_position,
@ -278,7 +308,7 @@ private class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gea
Gee.List<MessageLocationRow>? list = yield location_table.list_sparse_async(transaction,
folder_row.id, by_position, cancellable);
return yield list_email(transaction, list, required_fields, cancellable);
return yield list_email(transaction, list, required_fields, false, cancellable);
}
public override async Gee.List<Geary.Email>? list_email_by_id_async(Geary.EmailIdentifier initial_id,
@ -314,12 +344,12 @@ private class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gea
Gee.List<MessageLocationRow>? list = yield location_table.list_ordering_async(transaction,
folder_row.id, low, high, cancellable);
return yield list_email(transaction, list, required_fields, cancellable);
return yield list_email(transaction, list, required_fields, false, cancellable);
}
private async Gee.List<Geary.Email>? list_email(Transaction transaction,
Gee.List<MessageLocationRow>? list, Geary.Email.Field required_fields, Cancellable? cancellable)
throws Error {
Gee.List<MessageLocationRow>? list, Geary.Email.Field required_fields,
bool include_removed, Cancellable? cancellable) throws Error {
check_open();
if (list == null || list.size == 0)
@ -352,7 +382,8 @@ private class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gea
}
Geary.Imap.UID uid = new Geary.Imap.UID(location_row.ordering);
int position = yield location_row.get_position_async(transaction, cancellable);
int position = yield location_row.get_position_async(transaction, include_removed,
cancellable);
if (position == -1) {
debug("Unable to locate position of email during list of %s, dropping", to_string());
@ -390,7 +421,7 @@ private class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gea
to_string());
}
int position = yield location_row.get_position_async(transaction, cancellable);
int position = yield location_row.get_position_async(transaction, false, cancellable);
if (position == -1) {
throw new EngineError.NOT_FOUND("Unable to determine position of email %s in %s",
id.to_string(), to_string());
@ -474,21 +505,63 @@ private class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gea
notify_message_removed(id);
}
// This isn't implemented yet since it was simpler to replace the flags for a message wholesale
// rather than adding and removing flags.
// Use set_email_flags_async() instead.
public override async Gee.Map<Geary.EmailIdentifier, Geary.EmailFlags> mark_email_async(
public override async void mark_email_async(
Gee.List<Geary.EmailIdentifier> to_mark, Geary.EmailFlags? flags_to_add,
Geary.EmailFlags? flags_to_remove, Cancellable? cancellable = null) throws Error {
assert_not_reached();
Gee.Map<Geary.EmailIdentifier, Geary.EmailFlags> map = yield get_email_flags_async(
to_mark, cancellable);
foreach (Geary.EmailIdentifier id in map.keys) {
if (flags_to_add != null)
foreach (Geary.EmailFlag flag in flags_to_add.get_all())
((Geary.Imap.EmailFlags) map.get(id)).add(flag);
if (flags_to_remove != null)
foreach (Geary.EmailFlag flag in flags_to_remove.get_all())
((Geary.Imap.EmailFlags) map.get(id)).remove(flag);
}
yield set_email_flags_async(map, cancellable);
}
public async Gee.Map<Geary.EmailIdentifier, Geary.EmailFlags> get_email_flags_async(
Gee.List<Geary.EmailIdentifier> to_get, Cancellable? cancellable) throws Error {
Gee.Map<Geary.EmailIdentifier, Geary.EmailFlags> map = new Gee.HashMap<Geary.EmailIdentifier,
Geary.EmailFlags>();
Transaction transaction = yield db.begin_transaction_async("Folder.get_email_flags_async",
cancellable);
foreach (Geary.EmailIdentifier id in to_get) {
MessageLocationRow? location_row = yield location_table.fetch_by_ordering_async(
transaction, folder_row.id, ((Geary.Imap.EmailIdentifier) id).uid.value,
cancellable);
if (location_row == null) {
throw new EngineError.NOT_FOUND("No message with ID %s in folder %s", id.to_string(),
to_string());
}
ImapMessagePropertiesRow? row = yield imap_message_properties_table.fetch_async(
transaction, location_row.id, cancellable);
if (row == null)
continue;
map.set(id, row.get_imap_email_properties().email_flags);
}
yield transaction.commit_async(cancellable);
return map;
}
public async void set_email_flags_async(Gee.Map<Geary.EmailIdentifier,
Geary.EmailFlags> map, Cancellable? cancellable) throws Error {
check_open();
Transaction transaction = yield db.begin_transaction_async("Folder.mark_email_async",
Transaction transaction = yield db.begin_transaction_async("Folder.set_email_flags_async",
cancellable);
foreach (Geary.EmailIdentifier id in map.keys) {
@ -560,5 +633,67 @@ private class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gea
properties.get_message_flags().serialize(), internaldate, rfc822_size, cancellable);
}
}
public async void remove_marked_email_async(Geary.EmailIdentifier id, out bool marked,
Cancellable? cancellable) throws Error {
check_open();
Transaction transaction = yield db.begin_transaction_async(
"Folder.remove_marked_email_async", cancellable);
// Get marked status.
marked = yield location_table.is_marked_removed_async(transaction, folder_row.id,
id.ordering, cancellable);
// Detaching email's association with a folder.
if (!yield location_table.remove_by_ordering_async(transaction, folder_row.id,
id.ordering, cancellable)) {
throw new EngineError.NOT_FOUND("Message %s in local store of %s not found",
id.to_string(), to_string());
}
yield transaction.commit_async(cancellable);
}
public async void mark_removed_async(Geary.EmailIdentifier id, bool remove,
Cancellable? cancellable) throws Error {
check_open();
Transaction transaction = yield db.begin_transaction_async("Folder.mark_removed_async",
cancellable);
yield location_table.mark_removed_async(transaction, folder_row.id, id.ordering,
remove, cancellable);
yield transaction.commit_async(cancellable);
}
public async Geary.EmailIdentifier? id_from_remote_position(int remote_position,
int remote_count) throws Error {
Geary.EmailIdentifier? id = null;
debug("id from remote position: pos = %d, count = %d", remote_position, remote_count);
// Get local count, convert remote to local position.
int local_count = yield get_email_count_including_removed_async();
int local_position = remote_position - (remote_count - local_count);
// possible we don't have the remote email locally
if (local_position >= 1) {
// get EmailIdentifier
Gee.List<Geary.Email>? local = yield list_email_including_removed_async(local_position, 1,
Geary.Email.Field.NONE, Geary.Folder.ListFlags.NONE, null);
if (local != null && local.size == 1) {
id = local[0].id;
} else {
debug("list_email_async unable to convert position %d into id (count=%d)",
local_position, local_count);
}
} else {
debug("Unable to get local position for remote position %d (local_count=%d remote_count=%d)",
remote_position, local_count, remote_count);
}
return id;
}
}

View file

@ -44,13 +44,13 @@ public class Geary.Sqlite.MessageLocationRow : Geary.Sqlite.Row {
* If the call ever returns a position of -1, that indicates the message does not exist in the
* database.
*/
public async int get_position_async(Transaction? transaction, Cancellable? cancellable)
throws Error {
public async int get_position_async(Transaction? transaction, bool include_removed,
Cancellable? cancellable) throws Error {
if (position >= 1)
return position;
position = yield ((MessageLocationTable) table).fetch_position_async(transaction, id, folder_id,
cancellable);
include_removed, cancellable);
return (position >= 1) ? position : -1;
}

View file

@ -10,7 +10,8 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table {
ID,
MESSAGE_ID,
FOLDER_ID,
ORDERING
ORDERING,
REMOVE_MARKER
}
public MessageLocationTable(Geary.Sqlite.Database db, SQLHeavy.Table table) {
@ -40,7 +41,8 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table {
* low is one-based. If count is -1, all messages starting at low are returned.
*/
public async Gee.List<MessageLocationRow>? list_async(Transaction? transaction,
int64 folder_id, int low, int count, Cancellable? cancellable) throws Error {
int64 folder_id, int low, int count, bool include_marked, Cancellable? cancellable)
throws Error {
assert(low >= 1);
assert(count >= 0 || count == -1);
@ -51,7 +53,8 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table {
if (count >= 0) {
query = locked.prepare(
"SELECT id, message_id, ordering FROM MessageLocationTable WHERE folder_id = ? "
+ "ORDER BY ordering LIMIT ? OFFSET ?");
+ "%s ORDER BY ordering LIMIT ? OFFSET ?".printf(include_marked ? "" :
"AND remove_marker = 0"));
query.bind_int64(0, folder_id);
query.bind_int(1, count);
query.bind_int(2, low - 1);
@ -59,7 +62,8 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table {
// count == -1
query = locked.prepare(
"SELECT id, message_id, ordering FROM MessageLocationTable WHERE folder_id = ? "
+ "ORDER BY ordering OFFSET ?");
+ "%s ORDER BY ordering OFFSET ?".printf(include_marked ? "" :
"AND remove_marker = 0"));
query.bind_int64(0, folder_id);
query.bind_int(1, low - 1);
}
@ -91,7 +95,7 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table {
// reuse the query for each iteration
SQLHeavy.Query query = locked.prepare(
"SELECT id, message_id, ordering FROM MessageLocationTable WHERE folder_id = ? "
+ "ORDER BY ordering LIMIT 1 OFFSET ?");
+ "AND remove_marker = 0 ORDER BY ordering LIMIT 1 OFFSET ?");
Gee.List<MessageLocationRow> list = new Gee.ArrayList<MessageLocationRow>();
foreach (int position in by_position) {
@ -126,21 +130,21 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table {
if (high_ordering != -1 && low_ordering != -1) {
query = locked.prepare(
"SELECT id, message_id, ordering FROM MessageLocationTable WHERE folder_id = ? "
+ "AND ordering >= ? AND ordering <= ? ORDER BY ordering ASC");
+ "AND ordering >= ? AND ordering <= ? AND remove_marker = 0 ORDER BY ordering ASC");
query.bind_int64(0, folder_id);
query.bind_int64(1, low_ordering);
query.bind_int64(2, high_ordering);
} else if (high_ordering == -1) {
query = locked.prepare(
"SELECT id, message_id, ordering FROM MessageLocationTable WHERE folder_id = ? "
+ "AND ordering >= ? ORDER BY ordering ASC");
+ "AND ordering >= ? AND remove_marker = 0 ORDER BY ordering ASC");
query.bind_int64(0, folder_id);
query.bind_int64(1, low_ordering);
} else {
assert(low_ordering == -1);
query = locked.prepare(
"SELECT id, message_id, ordering FROM MessageLocationTable WHERE folder_id = ? "
+ "AND ordering <= ? ORDER BY ordering ASC");
+ "AND ordering <= ? AND remove_marker = 0 ORDER BY ordering ASC");
query.bind_int64(0, folder_id);
query.bind_int64(1, high_ordering);
}
@ -172,7 +176,7 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table {
SQLHeavy.Query query = locked.prepare(
"SELECT id, message_id, ordering FROM MessageLocationTable WHERE folder_id = ? "
+ "ORDER BY ordering LIMIT 1 OFFSET ?");
+ "AND remove_marker = 0 ORDER BY ordering LIMIT 1 OFFSET ?");
query.bind_int64(0, folder_id);
query.bind_int(1, position - 1);
@ -190,7 +194,8 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table {
cancellable);
SQLHeavy.Query query = locked.prepare(
"SELECT id, message_id FROM MessageLocationTable WHERE folder_id = ? AND ordering = ? ");
"SELECT id, message_id FROM MessageLocationTable WHERE folder_id = ? AND ordering = ? "
+ "AND remove_marker = 0");
query.bind_int64(0, folder_id);
query.bind_int64(1, ordering);
@ -202,13 +207,33 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table {
folder_id, ordering, -1);
}
public async MessageLocationRow? fetch_by_message_id_async(Transaction? transaction,
int64 folder_id, int64 message_id, Cancellable? cancellable) throws Error {
Transaction locked = yield obtain_lock_async(transaction,
"MessageLocationTable.fetch_by_message_id_async", cancellable);
SQLHeavy.Query query = locked.prepare(
"SELECT id, ordering FROM MessageLocationTable WHERE folder_id = ? AND message_id = ? "
+ "AND remove_marker = 0");
query.bind_int64(0, folder_id);
query.bind_int64(1, message_id);
SQLHeavy.QueryResult results = yield query.execute_async(cancellable);
if (results.finished)
return null;
return new MessageLocationRow(this, results.fetch_int64(0), message_id,
folder_id, results.fetch_int64(1), -1);
}
public async int fetch_position_async(Transaction? transaction, int64 id,
int64 folder_id, Cancellable? cancellable) throws Error {
int64 folder_id, bool include_marked, Cancellable? cancellable) throws Error {
Transaction locked = yield obtain_lock_async(transaction, "MessageLocationTable.fetch_position_async",
cancellable);
SQLHeavy.Query query = locked.prepare(
"SELECT id FROM MessageLocationTable WHERE folder_id = ? ORDER BY ordering");
"SELECT id FROM MessageLocationTable WHERE folder_id = ? %s ".printf(include_marked ? "" :
"AND remove_marker = 0") + "ORDER BY ordering");
query.bind_int64(0, folder_id);
SQLHeavy.QueryResult results = yield query.execute_async(cancellable);
@ -232,7 +257,8 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table {
"MessageLocationTable.fetch_message_position_async", cancellable);
SQLHeavy.Query query = locked.prepare(
"SELECT message_id FROM MessageLocationTable WHERE folder_id=? ORDER BY ordering");
"SELECT message_id FROM MessageLocationTable WHERE folder_id=? AND remove_marker = 0 "
+ "ORDER BY ordering");
query.bind_int64(0, folder_id);
SQLHeavy.QueryResult results = yield query.execute_async(cancellable);
@ -251,12 +277,13 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table {
}
public async int fetch_count_for_folder_async(Transaction? transaction,
int64 folder_id, Cancellable? cancellable) throws Error {
int64 folder_id, bool include_removed, Cancellable? cancellable) throws Error {
Transaction locked = yield obtain_lock_async(transaction,
"MessageLocationTable.fetch_count_for_folder_async", cancellable);
SQLHeavy.Query query = locked.prepare(
"SELECT COUNT(*) FROM MessageLocationTable WHERE folder_id = ?");
"SELECT COUNT(*) FROM MessageLocationTable WHERE folder_id = ? %s".printf(
include_removed ? "" : "AND remove_marker = 0"));
query.bind_int64(0, folder_id);
SQLHeavy.QueryResult results = yield query.execute_async(cancellable);
@ -275,7 +302,8 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table {
"MessageLocationTable.does_ordering_exist_async", cancellable);
SQLHeavy.Query query = locked.prepare(
"SELECT message_id FROM MessageLocationTable WHERE folder_id = ? AND ordering = ?");
"SELECT message_id FROM MessageLocationTable WHERE folder_id = ? AND ordering = ? "
+ "AND remove_marker = 0");
query.bind_int64(0, folder_id);
query.bind_int64(1, ordering);
@ -294,7 +322,8 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table {
"MessageLocationTable.get_earliest_ordering_async", cancellable);
SQLHeavy.Query query = locked.prepare(
"SELECT MIN(ordering) FROM MessageLocationTable WHERE folder_id = ?");
"SELECT MIN(ordering) FROM MessageLocationTable WHERE folder_id = ? " +
"AND remove_marker = 0");
query.bind_int64(0, folder_id);
SQLHeavy.QueryResult result = yield query.execute_async(cancellable);
@ -326,5 +355,40 @@ public class Geary.Sqlite.MessageLocationTable : Geary.Sqlite.Table {
return true;
}
// Marks the given message as removed if "remove" is true, otherwise marks
// it as non-removed.
public async void mark_removed_async(Transaction? transaction, int64 folder_id, int64 ordering,
bool remove, Cancellable? cancellable) throws Error {
Transaction locked = yield obtain_lock_async(transaction,
"MessageLocationTable.mark_removed_async", cancellable);
SQLHeavy.Query query = locked.prepare(
"UPDATE MessageLocationTable SET remove_marker = ? WHERE folder_id = ? AND ordering = ?");
query.bind_int(0, (int) remove);
query.bind_int64(1, folder_id);
query.bind_int64(2, ordering);
yield query.execute_async(cancellable);
locked.set_commit_required();
yield release_lock_async(transaction, locked, cancellable);
}
public async bool is_marked_removed_async(Transaction? transaction, int64 folder_id,
int64 ordering, Cancellable? cancellable) throws Error {
Transaction locked = yield obtain_lock_async(transaction,
"MessageLocationTable.is_mark_removed_async", cancellable);
SQLHeavy.Query query = locked.prepare(
"SELECT remove_marker FROM MessageLocationTable WHERE folder_id = ? AND ordering = ?");
query.bind_int64(0, folder_id);
query.bind_int64(1, ordering);
SQLHeavy.QueryResult results = yield query.execute_async(cancellable);
return (bool) results.fetch_int(0);
}
}

View file

@ -89,8 +89,11 @@ def build(bld):
'../engine/impl/geary-generic-imap-folder.vala',
'../engine/impl/geary-gmail-account.vala',
'../engine/impl/geary-local-interfaces.vala',
'../engine/impl/geary-receive-replay-operations.vala',
'../engine/impl/geary-receive-replay-queue.vala',
'../engine/impl/geary-remote-interfaces.vala',
'../engine/impl/geary-replay-queue.vala',
'../engine/impl/geary-send-replay-operations.vala',
'../engine/impl/geary-send-replay-queue.vala',
'../engine/nonblocking/nonblocking-abstract-semaphore.vala',
'../engine/nonblocking/nonblocking-batch.vala',