Replay queue not closing on connection hang or drop: Closes #5321

All IMAP commands now have a timeout which, if triggered, causes
the connection to be forcibly closed.  ConversationMonitor will
reestablish the connection and start a re-synchronization.
This commit is contained in:
Jim Nelson 2012-06-06 16:36:52 -07:00
parent d527998d1b
commit 10599a5a42
8 changed files with 386 additions and 42 deletions

View file

@ -157,6 +157,7 @@ engine/util/util-scheduler.vala
engine/util/util-singleton.vala
engine/util/util-stream.vala
engine/util/util-string.vala
engine/util/util-timer-pool.vala
engine/util/util-trillian.vala
)

View file

@ -13,6 +13,7 @@ public errordomain Geary.ImapError {
UNAUTHENTICATED,
NOT_SUPPORTED,
NOT_SELECTED,
INVALID_PATH
INVALID_PATH,
TIMED_OUT
}

View file

@ -23,6 +23,12 @@ public class Geary.Imap.ClientConnection {
public const uint DEFAULT_TIMEOUT_SEC = ClientSession.MIN_KEEPALIVE_SEC + 15;
public const uint RECOMMENDED_TIMEOUT_SEC = ClientSession.RECOMMENDED_KEEPALIVE_SEC + 15;
/**
* The default timeout for an issued command to result in a response code from the server.
* A timed-out command will result in the connection being forcibly closed.
*/
public const uint DEFAULT_COMMAND_TIMEOUT_SEC = 15;
private const int FLUSH_TIMEOUT_MSEC = 100;
private enum State {
@ -69,6 +75,8 @@ public class Geary.Imap.ClientConnection {
// Used solely for debugging
private static int next_cx_id = 0;
public uint command_timeout_sec { get; set; default = DEFAULT_COMMAND_TIMEOUT_SEC; }
private Geary.Endpoint endpoint;
private int cx_id;
private Geary.State.Machine fsm;
@ -82,6 +90,7 @@ public class Geary.Imap.ClientConnection {
private bool idle_when_quiet = false;
private Gee.HashSet<Tag> posted_idle_tags = new Gee.HashSet<Tag>(Hashable.hash_func,
Equalable.equal_func);
private TimerPool<Tag> cmd_timer = new TimerPool<Tag>(Hashable.hash_func, Equalable.equal_func);
public virtual signal void connected() {
Logging.debug(Logging.Flag.NETWORK, "[%s] connected to %s", to_string(),
@ -190,6 +199,8 @@ public class Geary.Imap.ClientConnection {
fsm = new Geary.State.Machine(machine_desc, mappings, on_bad_transition);
fsm.set_logging(false);
cmd_timer.timed_out.connect(on_cmd_timed_out);
}
/**
@ -277,14 +288,18 @@ public class Geary.Imap.ClientConnection {
connected();
des.xon();
yield des.start_async();
}
public async void disconnect_async(Cancellable? cancellable = null) throws Error {
if (cx == null)
return;
des.xoff();
// To guard against reentrancy
SocketConnection close_cx = cx;
cx = null;
// unschedule before yielding to stop the Deserializer
unschedule_flush_timeout();
des.parameters_ready.disconnect(on_parameters_ready);
@ -292,18 +307,21 @@ public class Geary.Imap.ClientConnection {
des.deserialize_failure.disconnect(on_deserialize_failure);
des.eos.disconnect(on_eos);
yield des.stop_async();
// TODO: May need to commit Serializer before disconnecting
ser = null;
des = null;
Error? close_err = null;
try {
yield cx.close_async(Priority.DEFAULT, cancellable);
debug("[%s] Disconnecting...", to_string());
yield close_cx.close_async(Priority.DEFAULT, cancellable);
debug("[%s] Disconnected", to_string());
} catch (Error err) {
debug("[%s] Error disconnecting: %s", to_string(), err.message);
close_err = err;
} finally {
cx = null;
ser = null;
des = null;
fsm.issue(Event.DISCONNECTED);
if (close_err != null)
@ -351,6 +369,23 @@ public class Geary.Imap.ClientConnection {
recv_closed();
}
private void on_cmd_timed_out(Tag tag) {
    debug("[%s] on_cmd_timed_out: %s", to_string(), tag.to_string());
    
    // A single timeout is sufficient; the receive error is reported below and it's up to the
    // caller to nuke the connection, so stop listening for further timeouts.
    cmd_timer.timed_out.disconnect(on_cmd_timed_out);
    
    // Disable graceful disconnect: if the connection is hung, don't want to stall trying to
    // flush the pipe during close.
    TcpConnection? tcp = cx as TcpConnection;
    if (tcp != null)
        tcp.set_graceful_disconnect(false);
    
    receive_failure(new ImapError.TIMED_OUT("No response to command %s after %u seconds",
        tag.to_string(), command_timeout_sec));
}
// TODO: Guard against reentrancy
public async void send_async(Command cmd, Cancellable? cancellable = null) throws Error {
check_for_connection();
@ -370,8 +405,15 @@ public class Geary.Imap.ClientConnection {
// an IMAP requirement but makes tracing commands easier.)
cmd.assign_tag(generate_tag());
// set the timeout on this command; note that a zero-second timeout means no timeout,
// and that there's no timeout on serialization
bool started = cmd_timer.start(cmd.tag, command_timeout_sec);
assert(started);
Error? ser_err = null;
try {
// TODO: Make serialize non-blocking; this would also remove the need for a send_mutex
// (although reentrancy should still be checked for)
yield cmd.serialize(ser);
} catch (Error err) {
debug("[%s] Error serializing command: %s", to_string(), err.message);
@ -422,6 +464,10 @@ public class Geary.Imap.ClientConnection {
// Like send_async(), need to use mutex when flushing as OutputStream must be accessed in
// serialized fashion
//
// NOTE: Because this is happening in the background, it's possible for ser to go to null
// after any yield (if a close occurs while blocking); this is why all the checking is
// required
int token = NonblockingMutex.INVALID_TOKEN;
try {
token = yield send_mutex.claim_async();
@ -496,7 +542,13 @@ public class Geary.Imap.ClientConnection {
}
private void signal_status_response(void *user, Object? object) {
    StatusResponse status_response = (StatusResponse) object;
    
    // Stop the countdown timer on the associated command.  cancel() legitimately returns
    // false when no timer was scheduled for this tag: command_timeout_sec may be zero
    // (TimerPool.start() schedules nothing and returns false for a zero timeout), or the
    // timer may have already fired and a late response arrived before teardown completed.
    // Don't hard-assert on that; merely note it.
    if (!cmd_timer.cancel(status_response.tag)) {
        debug("[%s] No command timer to cancel for %s", to_string(),
            status_response.tag.to_string());
    }
    
    received_status_response(status_response);
}
private void signal_continuation(void *user, Object? object) {

View file

@ -173,6 +173,7 @@ public class Geary.Imap.ClientSession {
private Geary.Endpoint endpoint;
private Geary.AccountInformation account_info;
private Geary.State.Machine fsm;
private ImapError not_connected_err;
private ClientConnection? cx = null;
private string? current_mailbox = null;
private bool current_mailbox_readonly = false;
@ -238,6 +239,8 @@ public class Geary.Imap.ClientSession {
this.endpoint = endpoint;
this.account_info = account_info;
not_connected_err = new ImapError.NOT_CONNECTED("Not connected to %s", endpoint.to_string());
Geary.State.Mapping[] mappings = {
new Geary.State.Mapping(State.DISCONNECTED, Event.CONNECT, on_connect),
new Geary.State.Mapping(State.DISCONNECTED, Event.LOGIN, on_early_command),
@ -361,6 +364,18 @@ public class Geary.Imap.ClientSession {
fsm.set_logging(false);
}
~ClientSession() {
    // A session dropped while DISCONNECTED or BROKEN has been fully shut down; anything
    // else indicates the ref was dropped while the session was still active, which is a
    // programming error worth aborting on.
    var state = fsm.get_state();
    if (state == State.DISCONNECTED || state == State.BROKEN)
        return;
    
    error("[%s] ClientSession ref dropped while still active", to_string());
}
public string? get_current_mailbox() {
return current_mailbox;
}
@ -464,6 +479,15 @@ public class Geary.Imap.ClientSession {
cx.deserialize_failure.disconnect(on_network_receive_failure);
cx = null;
// if there are any outstanding commands waiting for responses, wake them up now
if (tag_cb.size > 0) {
debug("[%s] Cancelling %d pending commands", to_string(), tag_cb.size);
foreach (Tag tag in tag_cb.keys)
Scheduler.on_idle(tag_cb.get(tag).callback);
tag_cb.clear();
}
}
private void on_connect_completed(Object? source, AsyncResult result) {
@ -1118,10 +1142,8 @@ public class Geary.Imap.ClientSession {
private async AsyncCommandResponse issue_command_async(Command cmd, Object? user = null,
Cancellable? cancellable = null) {
if (cx == null) {
return new AsyncCommandResponse(null, user,
new ImapError.NOT_CONNECTED("Not connected to %s", endpoint.to_string()));
}
if (cx == null)
return new AsyncCommandResponse(null, user, not_connected_err);
int claim_stub = NonblockingMutex.INVALID_TOKEN;
if (!account_info.imap_server_pipeline) {
@ -1134,6 +1156,10 @@ public class Geary.Imap.ClientSession {
}
}
// watch for connection dropped after waiting for pipeline mutex
if (cx == null)
return new AsyncCommandResponse(null, user, not_connected_err);
try {
yield cx.send_async(cmd, cancellable);
} catch (Error send_err) {
@ -1155,7 +1181,13 @@ public class Geary.Imap.ClientSession {
}
CommandResponse? cmd_response = tag_response.get(cmd.tag);
assert(cmd_response != null);
if (cmd_response == null) {
// this only happens when disconnected while waiting for response
assert(cx == null);
return new AsyncCommandResponse(null, user, not_connected_err);
}
assert(cmd_response.is_sealed());
assert(cmd_response.status_response.tag.equals(cmd.tag));

View file

@ -7,13 +7,15 @@
/**
* The Deserializer performs asynchronous I/O on a supplied input stream and transforms the raw
* bytes into IMAP Parameters (which can then be converted into ServerResponses or ServerData).
* The Deserializer will only begin reading from the stream when xon() is called. Calling xoff()
* will flow control the stream, halting reading without closing the stream itself. Since all
* results from the Deserializer are reported via signals, those signals should be connected to
* prior to calling xon(), or the caller risks missing early messages. (Note that since
* Deserializer uses async I/O, this isn't technically possible unless the signals are connected
* after the Idle loop has a chance to run; however, this is an implementation detail and shouldn't
* be relied upon.)
* The Deserializer will only begin reading from the stream when start_async() is called. Calling
* stop_async() will halt reading without closing the stream itself. A Deserializer may not be
* reused once stop_async() has been invoked.
*
* Since all results from the Deserializer are reported via signals, those signals should be
* connected to prior to calling start_async(), or the caller risks missing early messages. (Note
* that since Deserializer uses async I/O, this isn't technically possible unless the signals are
* connected after the Idle loop has a chance to run; however, this is an implementation detail and
* shouldn't be relied upon.)
*/
public class Geary.Imap.Deserializer {
@ -63,18 +65,17 @@ public class Geary.Imap.Deserializer {
private DataInputStream dins;
private Geary.State.Machine fsm;
private ListParameter context;
private Cancellable? cancellable = null;
private NonblockingSemaphore closed_semaphore = new NonblockingSemaphore();
private Geary.MidstreamConverter midstream = new Geary.MidstreamConverter("Deserializer");
private RootParameters root = new RootParameters();
private StringBuilder? current_string = null;
private size_t literal_length_remaining = 0;
private Geary.Memory.GrowableBuffer? block_buffer = null;
private unowned uint8[]? current_buffer = null;
private bool flow_controlled = true;
private int ins_priority = Priority.DEFAULT;
private char[] atom_specials_exceptions = { ' ', ' ', '\0' };
public signal void flow_control(bool xon);
public signal void parameters_ready(RootParameters root);
public signal void eos();
@ -122,31 +123,40 @@ public class Geary.Imap.Deserializer {
return midstream.install(converter);
}
public void xon(int priority = GLib.Priority.DEFAULT) {
if (!flow_controlled || get_mode() == Mode.FAILED)
return;
/**
 * Begins reading from the input stream.  A Deserializer may be started only once; after
 * stop_async() it may not be reused.
 *
 * @throws EngineError.ALREADY_OPEN if currently running
 * @throws EngineError.ALREADY_CLOSED if previously stopped or in the failed state
 */
public async void start_async(int priority = GLib.Priority.DEFAULT) throws Error {
    if (cancellable != null) {
        // BUGFIX: the cancelled check must happen while cancellable is known non-null;
        // previously it was only reachable when cancellable == null (a null dereference)
        // and a stopped Deserializer wrongly reported ALREADY_OPEN.
        if (cancellable.is_cancelled())
            throw new EngineError.ALREADY_CLOSED("Deserializer closed");
        
        throw new EngineError.ALREADY_OPEN("Deserializer already open");
    }
    
    if (get_mode() == Mode.FAILED)
        throw new EngineError.ALREADY_CLOSED("Deserializer failed");
    
    cancellable = new Cancellable();
    ins_priority = priority;
    
    // kick off the first read; subsequent reads are chained from the async callbacks
    next_deserialize_step();
}
public void xoff() {
if (flow_controlled || get_mode() == Mode.FAILED)
/**
 * Halts reading without closing the underlying stream.  Safe to call when never started,
 * already stopped, or failed (quietly does nothing in those cases).
 */
public async void stop_async() throws Error {
    // quietly fail when not opened, already closed, or in the failed state
    bool inactive = (cancellable == null) || cancellable.is_cancelled()
        || get_mode() == Mode.FAILED;
    if (inactive)
        return;
    
    // cancel any outstanding I/O ...
    cancellable.cancel();
    
    // ... then wait for it to exit (the read callbacks notify closed_semaphore when they
    // see the cancellation)
    debug("Waiting for deserializer to close...");
    yield closed_semaphore.wait_async();
    debug("Deserializer closed");
}
private void next_deserialize_step() {
switch (get_mode()) {
case Mode.LINE:
dins.read_line_async.begin(ins_priority, null, on_read_line);
dins.read_line_async.begin(ins_priority, cancellable, on_read_line);
break;
case Mode.BLOCK:
@ -158,7 +168,11 @@ public class Geary.Imap.Deserializer {
current_buffer = block_buffer.allocate(
size_t.min(MAX_BLOCK_READ_SIZE, literal_length_remaining));
dins.read_async.begin(current_buffer, ins_priority, null, on_read_block);
dins.read_async.begin(current_buffer, ins_priority, cancellable, on_read_block);
break;
case Mode.FAILED:
// do nothing; Deserializer is effectively closed
break;
default:
@ -178,13 +192,16 @@ public class Geary.Imap.Deserializer {
push_line(line);
} catch (Error err) {
receive_failure(err);
// only Cancellable allowed is internal used to notify when closed
if (err is IOError.CANCELLED)
closed_semaphore.blind_notify();
else
receive_failure(err);
return;
}
if (!flow_controlled)
next_deserialize_step();
next_deserialize_step();
}
private void on_read_block(Object? source, AsyncResult result) {
@ -196,13 +213,16 @@ public class Geary.Imap.Deserializer {
push_data(bytes_read);
} catch (Error err) {
receive_failure(err);
// only Cancellable allowed is internal used to notify when closed
if (err is IOError.CANCELLED)
closed_semaphore.blind_notify();
else
receive_failure(err);
return;
}
if (!flow_controlled)
next_deserialize_step();
next_deserialize_step();
}
// Push a line (without the CRLF!).

View file

@ -88,6 +88,17 @@ public abstract class Geary.NonblockingAbstractSemaphore {
reset();
}
/**
 * Invokes notify() without propagating any Error, which is merely logged if encountered.
 *
 * Useful from contexts (callbacks, destructors) where throwing is not an option.
 */
public void blind_notify() {
    try {
        notify();
    } catch (Error notify_err) {
        message("Error notifying semaphore: %s", notify_err.message);
    }
}
public async void wait_async(Cancellable? cancellable = null) throws Error {
for (;;) {
check_user_cancelled(cancellable);

View file

@ -7,14 +7,27 @@
public interface Geary.Comparable {
    public abstract int compare(Comparable other);
    
    /**
     * A CompareFunc usable with any object that implements Comparable.
     *
     * Both raw pointers are assumed to be non-null Comparable instances.
     */
    public static int compare_func(void *a, void *b) {
        Comparable *left = (Comparable *) a;
        Comparable *right = (Comparable *) b;
        
        return left->compare(right);
    }
    
    /**
     * A CompareFunc for GLib.DateTime.
     */
    public static int date_time_compare(void *a, void *b) {
        return ((DateTime) a).compare((DateTime) b);
    }
}
public interface Geary.Equalable {
public abstract bool equals(Equalable other);
/**
* An EqualFunc for any object that implements Equalable.
*
* Both raw pointers are assumed to be non-null Equalable instances; no type checking is
* performed on them.
*/
public static bool equal_func(void *a, void *b) {
return ((Equalable *) a)->equals((Equalable *) b);
}
@ -25,11 +38,21 @@ public interface Geary.Equalable {
// Compares two bare (unboxed) int64 values by dereferencing the pointers; both are
// assumed non-null and to address valid int64 storage.
public static bool bare_int64_equals(void *a, void *b) {
return *((int64 *) a) == *((int64 *) b);
}
/**
* An EqualFunc for DateTime.
*
* Both pointers are assumed to be non-null GLib.DateTime instances; equality is delegated
* to GLib.DateTime.equal().
*/
public static bool date_time_equal(void *a, void *b) {
return ((DateTime) a).equal((DateTime) b);
}
}
public interface Geary.Hashable {
public abstract uint to_hash();
/**
* A HashFunc for any object that implements Hashable.
*
* The raw pointer is assumed to be a non-null Hashable instance; hashing is delegated to
* to_hash().
*/
public static uint hash_func(void *ptr) {
return ((Hashable *) ptr)->to_hash();
}
@ -48,6 +71,13 @@ public interface Geary.Hashable {
return hash_memory(ptr, sizeof(int64));
}
/**
* A HashFunc for DateTime.
*
* The pointer is assumed to be a non-null GLib.DateTime; hashing is delegated to
* GLib.DateTime.hash().
*/
public static uint date_time_hash(void *a) {
return ((DateTime) a).hash();
}
/**
* A rotating-XOR hash that can be used to hash memory buffers of any size. Use only if
* equality is determined by memory contents.

View file

@ -0,0 +1,197 @@
/* Copyright 2012 Yorba Foundation
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
/**
* A TimerPool accepts items of type G with an associated timeout in seconds (a count of seconds
* that, in the future, indicates a timeout). TimerPool checks for timeouts in an Idle task that
* fires the "timed-out" signal when a late item is detected.
*
* Submit items to a TimerPool with start(). If the item can be removed from the TimerPool (its
* associated work has completed for example), remove it with cancel(). All items can be cancelled
* with cancel_all(), which effectively clears the queue.
*
* It's assumed that the timeout case is not common, and so TimerPool is coded for the efficiency
* of start() and cancel().
*
* While TimerPool makes every attempt at chronological accuracy, due to the nature of the
* MainLoop there's no guarantee that the signal will fire after precisely that number of seconds.
*
* TODO: The timed-out code path is not the most efficient; since timeouts are expected to be
* rare, future optimization should continue to prioritize start() and cancel().
*/
public class Geary.TimerPool<G> : Object {
    private unowned HashFunc? hash_func;
    private unowned EqualFunc? equal_func;
    private uint timeout_id = 0;
    // Absolute deadline of the next scheduled check, or null if none is scheduled.
    private DateTime? next_timeout_check = null;
    // Sorted map of absolute deadline -> set of items due at that deadline.
    private Gee.TreeMap<DateTime, Gee.HashSet<G>> timeouts;
    // Reverse lookup: item -> its scheduled deadline (for efficient cancel()).
    private Gee.HashMap<G, DateTime> timeout_lookup;
    
    public signal void timed_out(G item);
    
    public TimerPool(HashFunc? hash_func, EqualFunc? equal_func) {
        this.hash_func = hash_func;
        this.equal_func = equal_func;
        
        timeouts = new Gee.TreeMap<DateTime, Gee.HashSet<G>>(Comparable.date_time_compare);
        timeout_lookup = new Gee.HashMap<G, DateTime>(hash_func, equal_func, Equalable.date_time_equal);
    }
    
    ~TimerPool() {
        if (timeout_id != 0)
            Source.remove(timeout_id);
    }
    
    /**
     * Schedules a timeout for the item timeout_sec seconds in the future.
     *
     * If an "equal" item was already scheduled via start(), its old timeout is replaced by
     * the new one (the old schedule is cancelled first).
     *
     * If timeout_sec is zero, no timeout is scheduled and start() returns false.
     */
    public bool start(G item, uint timeout_sec) {
        if (timeout_sec == 0)
            return false;
        
        // BUGFIX: replace (rather than duplicate) the schedule for an item already in the
        // pool; previously only the reverse lookup entry was replaced, leaving a stale
        // entry in the timeouts tree that later fired a spurious timed_out signal.
        cancel(item);
        
        DateTime now = new DateTime.now_local();
        DateTime timeout = now.add_seconds(timeout_sec);
        
        // add to the sorted timeouts tree, creating a new HashSet to hold all the items due
        // at this deadline if necessary
        Gee.HashSet<G>? pool = timeouts.get(timeout);
        if (pool == null) {
            pool = new Gee.HashSet<G>(hash_func, equal_func);
            timeouts.set(timeout, pool);
        }
        
        pool.add(item);
        
        // add to the reverse lookup table
        timeout_lookup.set(item, timeout);
        
        // if no timeout check is scheduled, or the next scheduled check is later than this
        // new deadline, (re)schedule the check
        if (next_timeout_check == null || timeout.compare(next_timeout_check) < 0) {
            if (timeout_id != 0)
                Source.remove(timeout_id);
            
            timeout_id = Timeout.add_seconds(timeout_sec, on_check_for_timeouts);
            next_timeout_check = timeout;
        }
        
        return true;
    }
    
    /**
     * Removes the item and its pending timeout from the pool.
     *
     * Returns false if the item is not found.
     */
    public bool cancel(G item) {
        // look up the deadline for this item, removing it from the lookup table in the process
        DateTime timeout;
        if (!timeout_lookup.unset(item, out timeout))
            return false;
        
        // fetch the pool of items due at this deadline
        Gee.HashSet<G>? pool = timeouts.get(timeout);
        if (pool == null)
            return false;
        
        // remove from the pool
        if (!pool.remove(item))
            return false;
        
        // if the pool is now empty, remove the deadline from the timeout tree entirely
        if (pool.size == 0)
            timeouts.unset(timeout);
        
        // if no more timeouts, no reason to keep the background check scheduled
        if (timeouts.size == 0) {
            assert(timeout_lookup.size == 0);
            
            if (timeout_id != 0) {
                Source.remove(timeout_id);
                timeout_id = 0;
                next_timeout_check = null;
            }
        }
        
        return true;
    }
    
    /**
     * Cancels all outstanding items, effectively clearing the queue.
     */
    public void cancel_all() {
        timeouts.clear();
        timeout_lookup.clear();
        
        if (timeout_id != 0) {
            Source.remove(timeout_id);
            timeout_id = 0;
            next_timeout_check = null;
        }
    }
    
    // One-shot Timeout callback: fires timed_out for every item whose deadline has passed,
    // then reschedules itself for the next pending deadline (if any).
    private bool on_check_for_timeouts() {
        DateTime now = new DateTime.now_local();
        
        // Collect the deadlines that have passed rather than signalling as they're
        // discovered ... this allows for reentrancy inside the signal handlers.
        Gee.HashSet<DateTime>? timed_out_times = null;
        next_timeout_check = null;
        foreach (DateTime timeout in timeouts.keys) {
            // timeouts is sorted, so stop at the first deadline still in the future
            if (timeout.compare(now) > 0) {
                next_timeout_check = timeout;
                break;
            }
            
            if (timed_out_times == null) {
                timed_out_times = new Gee.HashSet<DateTime>(Hashable.date_time_hash,
                    Equalable.date_time_equal);
            }
            
            timed_out_times.add(timeout);
        }
        
        // remove everything that's timed out from the queue *and* the reverse lookup table
        // before firing any signals, so handlers see consistent state if they reenter
        if (timed_out_times != null) {
            Gee.HashSet<G> timed_out_items = new Gee.HashSet<G>(hash_func, equal_func);
            foreach (DateTime timeout in timed_out_times) {
                Gee.HashSet<G> pool;
                bool removed = timeouts.unset(timeout, out pool);
                assert(removed);
                
                timed_out_items.add_all(pool);
            }
            
            // BUGFIX: purge timed-out items from the reverse lookup table; previously they
            // were leaked there, which also broke the timeout_lookup.size invariant
            // asserted in cancel()
            foreach (G item in timed_out_items)
                timeout_lookup.unset(item);
            
            // report all the timed-out items
            foreach (G item in timed_out_items)
                timed_out(item);
        }
        
        // one-shot; exit this method but reschedule for the next deadline, if one is present
        if (next_timeout_check != null) {
            TimeSpan diff = next_timeout_check.difference(now);
            
            // TimeSpan is in microseconds ... minimum of 1 second because, if we got here,
            // there's at least one item on the timeout queue and it shouldn't be stranded
            uint diff_sec = (uint) (diff / 1000000);
            if (diff_sec == 0)
                diff_sec = 1;
            
            timeout_id = Timeout.add_seconds(diff_sec, on_check_for_timeouts);
        } else {
            timeout_id = 0;
        }
        
        return false;
    }
}