From b74190e0108e25b9678f4fb90c30516b9569ae68 Mon Sep 17 00:00:00 2001 From: Michael James Gratton Date: Tue, 10 Jul 2018 16:52:12 +1000 Subject: [PATCH] Don't send commands while another has an active command continuation. Issue #26 underscored Geary's buggy behaviour in sending commands while an IDLE command was active - when a new command was queued, DONE would be sent but then the new command would also be sent without waiting for the status response for the IDLE command from the server, in violation of RFC 3501 sections 2.2.1 and 5.5. A quick fix was to ensure that the new command was held while DEIDLING, but because of the way the synchronization process between ClientConnection and the Serialiser works, there is a race that allows any further commands added to be sent while the IDLE DONE had not yet been flushed: DONE gets queued, additional command gets queued, then both get flushed. Fixing this required re-working how ClientConnection and Serializer interacted, which was a major re-write. Serializer no longer buffers any data - it simply writes it out to its underlying stream. ClientConnection now queues commands and writes them out one by one, requiring Command implementations to manage stalling the queue until their expected completions have been received. Literals are hence written by their command's serialisation process, which gets prompted by an event from ClientConnection. The two commands that require non-literal continuations, IDLE and AUTHENTICATE, now manage that themselves. As a result, only one command can be serialised at a time, and no additional commands can be serialised while another is still waiting for a continuation. Also, command-specific details can be implemented by command itself, without needing workarounds in the serialiser, connection, or session. --- .../command/imap-authenticate-command.vala | 74 +- src/engine/imap/command/imap-command.vala | 255 +++- .../imap/command/imap-idle-command.vala | 73 +- .../parameter/imap-literal-parameter.vala | 9 + .../transport/imap-client-connection.vala | 1255 +++++------------ .../imap/transport/imap-client-session.vala | 91 +- .../imap/transport/imap-serializer.vala | 261 +--- 7 files changed, 779 insertions(+), 1239 deletions(-) diff --git a/src/engine/imap/command/imap-authenticate-command.vala b/src/engine/imap/command/imap-authenticate-command.vala index 70b5328b..80e3b962 100644 --- a/src/engine/imap/command/imap-authenticate-command.vala +++ b/src/engine/imap/command/imap-authenticate-command.vala @@ -21,10 +21,16 @@ public class Geary.Imap.AuthenticateCommand : Command { public string method { get; private set; } + private LiteralParameter? response_literal = null; + private bool serialised = false; + private Geary.Nonblocking.Spinlock error_lock; + private GLib.Cancellable error_cancellable = new GLib.Cancellable(); + private AuthenticateCommand(string method, string data) { base(NAME, { method, data }); this.method = method; + this.error_lock = new Geary.Nonblocking.Spinlock(this.error_cancellable); } public AuthenticateCommand.oauth2(string user, string token) { @@ -34,19 +40,65 @@ public class Geary.Imap.AuthenticateCommand : Command { this(OAUTH2_METHOD, encoded_token); } - public ContinuationParameter - continuation_requested(ContinuationResponse response) - throws ImapError { - if (this.method != AuthenticateCommand.OAUTH2_METHOD) { - throw new ImapError.INVALID("Unexpected continuation request"); + /** Waits after serialisation has completed for authentication. */ + public override async void serialize(Serializer ser, + GLib.Cancellable cancellable) + throws GLib.Error { + yield base.serialize(ser, cancellable); + this.serialised = true; + + // Need to manually flush here since the connection will be + // waiting this to complete before do so. + yield ser.flush_stream(cancellable); + + // Wait to either get a response or a continuation request + yield this.error_lock.wait_async(cancellable); + if (this.response_literal != null) { + yield this.response_literal.serialize_data(ser, cancellable); + ser.push_eol(cancellable); + yield ser.flush_stream(cancellable); } - // Continuation will be a Base64 encoded JSON blob and which - // indicates a login failure. We don't really care about that - // (do we?) though since once we acknowledge it with a - // zero-length response the server will respond with an IMAP - // error. - return new ContinuationParameter(new uint8[0]); + yield wait_until_complete(cancellable); + } + + public override void cancel_serialization() { + base.cancel_serialization(); + this.error_cancellable.cancel(); + } + + public override void completed(StatusResponse new_status) + throws ImapError { + this.error_lock.blind_notify(); + base.completed(new_status); + } + + public override void continuation_requested(ContinuationResponse response) + throws ImapError { + if (!this.serialised) { + // Allow any args sent as literals to be processed + // normally + base.continuation_requested(response); + } else { + if (this.method != AuthenticateCommand.OAUTH2_METHOD || + this.response_literal != null) { + cancel_serialization(); + throw new ImapError.INVALID( + "Unexpected AUTHENTICATE continuation request" + ); + } + + // Continuation will be a Base64 encoded JSON blob and which + // indicates a login failure. We don't really care about that + // (do we?) though since once we acknowledge it with a + // zero-length response the server will respond with an IMAP + // error. + this.response_literal = new LiteralParameter( + Geary.Memory.EmptyBuffer.instance + ); + // Notify serialisation to continue + this.error_lock.blind_notify(); + } } public override string to_string() { diff --git a/src/engine/imap/command/imap-command.vala b/src/engine/imap/command/imap-command.vala index a6e48179..d688cc4a 100644 --- a/src/engine/imap/command/imap-command.vala +++ b/src/engine/imap/command/imap-command.vala @@ -45,6 +45,17 @@ public class Geary.Imap.Command : BaseObject { protected ListParameter args { get; private set; default = new RootParameters(); } + + /** The status response for the command, once it has been received. */ + public StatusResponse? status { get; private set; default = null; } + + private Geary.Nonblocking.Semaphore complete_lock = + new Geary.Nonblocking.Semaphore(); + + private Geary.Nonblocking.Spinlock? literal_spinlock = null; + private GLib.Cancellable? literal_cancellable = null; + + /** * Constructs a new command with an unassigned tag. * @@ -62,52 +73,208 @@ public class Geary.Imap.Command : BaseObject { } } } - - /** - * - */ - } - - private void stock_params() { - add(tag); - add(new AtomParameter(name)); - if (args != null) { - foreach (string arg in args) - add(Parameter.get_for_string(arg)); - } - } - - /** - * Assign a {@link Tag} to a {@link Command} with an unassigned placeholder Tag. - * - * Can only be called on a Command that holds an unassigned Tag. Thus, this can only be called - * once at most, and zero times if Command.assigned() was used to generate the Command. - * Fires an assertion if either of these cases is true, or if the supplied Tag is unassigned. - */ - public void assign_tag(Tag tag) { - assert(!this.tag.is_assigned()); - assert(tag.is_assigned()); - - this.tag = tag; - - // Tag is always at index zero. - try { - Parameter param = replace(0, tag); - assert(param is Tag); - } catch (ImapError err) { - error("Unable to assign Tag for command %s: %s", to_string(), err.message); - } - } - + public bool has_name(string name) { return Ascii.stri_equal(this.name, name); } - - public override void serialize(Serializer ser, Tag tag) throws Error { - assert(tag.is_assigned()); - - base.serialize(ser, tag); - ser.push_end_of_message(); + + /** + * Assign a Tag to this command, if currently unassigned. + * + * Can only be called on a Command that holds an unassigned Tag. + * Thus, this can only be called once at most, and zero times if + * Command.assigned() was used to generate the Command. Fires an + * assertion if either of these cases is true, or if the supplied + * Tag is unassigned. + */ + public void assign_tag(Tag new_tag) throws ImapError { + if (this.tag.is_assigned()) { + throw new ImapError.SERVER_ERROR( + "%s: Command tag is already assigned", to_brief_string() + ); + } + if (!new_tag.is_assigned()) { + throw new ImapError.SERVER_ERROR( + "%s: New tag is not assigned", to_brief_string() + ); + } + + this.tag = new_tag; + } + + /** + * Serialises this command for transmission to the server. + * + * This will serialise its tag, name and arguments (if + * any). Arguments are treated as strings and escaped as needed, + * including being encoded as a literal. If any literals are + * required, this method will yield until a command continuation + * has been received, when it will resume the same process. + */ + public virtual async void serialize(Serializer ser, + GLib.Cancellable cancellable) + throws GLib.Error { + this.tag.serialize(ser, cancellable); + ser.push_space(cancellable); + ser.push_unquoted_string(this.name, cancellable); + + if (this.args != null) { + foreach (Parameter arg in this.args.get_all()) { + ser.push_space(cancellable); + arg.serialize(ser, cancellable); + + LiteralParameter literal = arg as LiteralParameter; + if (literal != null) { + // Need to manually flush after serialising the + // literal param, so it actually gets to the + // server + yield ser.flush_stream(cancellable); + + if (this.literal_spinlock == null) { + // Lazily create these since they usually + // won't be needed + this.literal_cancellable = new GLib.Cancellable(); + this.literal_spinlock = new Geary.Nonblocking.Spinlock( + this.literal_cancellable + ); + } + + // Will get notified via continuation_requested + // when server indicated the literal can be sent. + yield this.literal_spinlock.wait_async(cancellable); + yield literal.serialize_data(ser, cancellable); + } + } + } + + ser.push_eol(cancellable); + } + + /** + * Cancels any existing serialisation in progress. + * + * When this method is called, any non I/O related process + * blocking the blocking {@link serialize} must be cancelled. + */ + public virtual void cancel_serialization() { + if (this.literal_cancellable != null) { + this.literal_cancellable.cancel(); + } + } + + /** + * Yields until the command has been completed or cancelled. + * + * Throws an error if cancelled, or if the command's response was + * bad. + */ + public async void wait_until_complete(GLib.Cancellable cancellable) + throws GLib.Error { + yield this.complete_lock.wait_async(cancellable); + check_status(); + } + + /** + * Called when a tagged status response is received for this command. + * + * This will update the command's {@link status} property, then + * throw an error it does not indicate a successful completion. + */ + public virtual void completed(StatusResponse new_status) + throws ImapError { + if (this.status != null) { + cancel_serialization(); + throw new ImapError.SERVER_ERROR( + "%s: Duplicate status response received: %s", + to_brief_string(), + status.to_string() + ); + } + + this.status = new_status; + this.complete_lock.blind_notify(); + cancel_serialization(); + check_status(); + } + + /** + * Called when tagged server data is received for this command. + */ + public virtual void data_received(ServerData data) + throws ImapError { + if (this.status != null) { + cancel_serialization(); + throw new ImapError.SERVER_ERROR( + "%s: Server data received when command already complete: %s", + to_brief_string(), + data.to_string() + ); + } + // Nothing to do otherwise + } + + /** + * Called when a continuation was requested by the server. + * + * This will notify the command's literal spinlock so that if + * {@link serialize} is waiting to send a literal, it will do so + * now. + */ + public virtual void + continuation_requested(ContinuationResponse continuation) + throws ImapError { + if (this.status != null) { + cancel_serialization(); + throw new ImapError.SERVER_ERROR( + "%s: Continuation requested when command already complete", + to_brief_string() + ); + } + + if (this.literal_spinlock == null) { + cancel_serialization(); + throw new ImapError.SERVER_ERROR( + "%s: Continuation requested but no literals available", + to_brief_string() + ); + } + + this.literal_spinlock.blind_notify(); + } + + public virtual string to_string() { + string args = this.args.to_string(); + return (Geary.String.is_empty(args)) + ? "%s %s".printf(this.tag.to_string(), this.name) + : "%s %s %s".printf(this.tag.to_string(), this.name, args); + } + + private void check_status() throws ImapError { + if (this.status == null) { + throw new ImapError.SERVER_ERROR( + "%s: No command response was received", + to_brief_string() + ); + } + + if (!this.status.is_completion) { + throw new ImapError.SERVER_ERROR( + "%s: Command status response is not a completion: %s", + to_brief_string(), + this.status.to_string() + ); + } + + if (this.status.status != Status.OK) { + throw new ImapError.SERVER_ERROR( + "%s: Command failed: %s", + to_brief_string(), + this.status.to_string() + ); + } + } + + private string to_brief_string() { + return "%s %s".printf(this.tag.to_string(), this.name); } } - diff --git a/src/engine/imap/command/imap-idle-command.vala b/src/engine/imap/command/imap-idle-command.vala index e781b249..a2fac042 100644 --- a/src/engine/imap/command/imap-idle-command.vala +++ b/src/engine/imap/command/imap-idle-command.vala @@ -11,10 +11,71 @@ */ public class Geary.Imap.IdleCommand : Command { - public const string NAME = "idle"; - - public IdleCommand() { - base (NAME); - } -} + public const string NAME = "IDLE"; + + private const string DONE = "DONE"; + + /** Determines if the server has acknowledged the IDLE request. */ + public bool idle_started { get; private set; default = false; } + + private bool serialised = false; + private Geary.Nonblocking.Spinlock? exit_lock; + private GLib.Cancellable? exit_cancellable = new GLib.Cancellable(); + + + public IdleCommand() { + base(NAME); + this.exit_lock = new Geary.Nonblocking.Spinlock(this.exit_cancellable); + } + + public void exit_idle() { + this.exit_lock.blind_notify(); + } + + /** Waits after serialisation has completed for {@link exit_idle}. */ + public override async void serialize(Serializer ser, + GLib.Cancellable cancellable) + throws GLib.Error { + // Need to manually flush here since Dovecot doesn't like + // getting IDLE in the same buffer as other commands. + yield ser.flush_stream(cancellable); + + yield base.serialize(ser, cancellable); + this.serialised = true; + + // Need to manually flush again since the connection will be + // waiting this to complete before do so. + yield ser.flush_stream(cancellable); + + // Now wait for exit_idle() to be called, the server to send a + // status response, or everything to be cancelled. + yield this.exit_lock.wait_async(cancellable); + + // If we aren't closed already, try sending DONE to exit IDLE + if (this.status == null) { + ser.push_unquoted_string(DONE); + ser.push_eol(cancellable); + yield ser.flush_stream(cancellable); + } + + yield wait_until_complete(cancellable); + } + + public override void cancel_serialization() { + base.cancel_serialization(); + this.exit_cancellable.cancel(); + } + + public override void continuation_requested(ContinuationResponse response) + throws ImapError { + if (!this.serialised) { + // Allow any args sent as literals to be processed + // normally + base.continuation_requested(response); + } else { + this.idle_started = true; + } + } + +} diff --git a/src/engine/imap/parameter/imap-literal-parameter.vala b/src/engine/imap/parameter/imap-literal-parameter.vala index 6c9164c6..4d394697 100644 --- a/src/engine/imap/parameter/imap-literal-parameter.vala +++ b/src/engine/imap/parameter/imap-literal-parameter.vala @@ -64,4 +64,13 @@ public class Geary.Imap.LiteralParameter : Geary.Imap.Parameter { ser.push_eol(cancellable); } + /** + * Serialises the literal parameter data. + */ + public async void serialize_data(Serializer ser, + GLib.Cancellable cancellable) + throws GLib.Error { + yield ser.push_literal_data(buffer, cancellable); + } + } diff --git a/src/engine/imap/transport/imap-client-connection.vala b/src/engine/imap/transport/imap-client-connection.vala index 1b9eb635..3e4e4429 100644 --- a/src/engine/imap/transport/imap-client-connection.vala +++ b/src/engine/imap/transport/imap-client-connection.vala @@ -36,76 +36,19 @@ public class Geary.Imap.ClientConnection : BaseObject { public const uint RECOMMENDED_TIMEOUT_SEC = ClientSession.RECOMMENDED_KEEPALIVE_SEC + 15; /** - * The default timeout for an issued command to result in a response code from the server. - * - * @see command_timeout_sec + * Default timeout to wait for a server response for a command. */ public const uint DEFAULT_COMMAND_TIMEOUT_SEC = 30; - - private const int FLUSH_TIMEOUT_MSEC = 10; - - // At least one server out there requires this to be in caps - private const string IDLE_DONE = "DONE"; - - private enum State { - UNCONNECTED, - CONNECTED, - IDLING, - IDLE, - DEIDLING, - DEIDLING_SYNCHRONIZING, - SYNCHRONIZING, - DISCONNECTED, - - COUNT - } - - private static string state_to_string(uint state) { - return ((State) state).to_string(); - } - - private enum Event { - CONNECTED, - DISCONNECTED, - - // Use issue_conditional_event() for SEND events, using the result to determine whether - // or not to continue; the transition handlers do no signalling or I/O - SEND, - SEND_IDLE, - - // To initiate a command continuation request - SYNCHRONIZE, - - // RECVD_* will emit appropriate signals inside their transition handlers; do *not* use - // issue_conditional_event() for these events - RECVD_STATUS_RESPONSE, - RECVD_SERVER_DATA, - RECVD_CONTINUATION_RESPONSE, - - COUNT - } - - private static string event_to_string(uint event) { - return ((Event) event).to_string(); - } - - private static Geary.State.MachineDescriptor machine_desc = new Geary.State.MachineDescriptor( - "Geary.Imap.ClientConnection", State.UNCONNECTED, State.COUNT, Event.COUNT, - state_to_string, event_to_string); - + + /** + * Default timeout to wait for another command before going idle. + */ + public const uint DEFAULT_IDLE_TIMEOUT_SEC = 2; + // Used solely for debugging private static int next_cx_id = 0; - - /** - * The timeout in seconds before an uncompleted {@link Command} is considered abandoned. - * - * ClientConnection does not time out the initial greeting from the server (as there's no - * command associated with it). That's the responsibility of the caller. - * - * A timed-out command will result in the connection being forcibly closed. - */ - public uint command_timeout_sec { get; set; default = DEFAULT_COMMAND_TIMEOUT_SEC; } - + + /** * This identifier is used only for debugging, to differentiate connections from one another * in logs and debug output. @@ -131,27 +74,26 @@ public class Geary.Imap.ClientConnection : BaseObject { public bool idle_when_quiet = false; private Geary.Endpoint endpoint; - private Geary.State.Machine fsm; private SocketConnection? cx = null; private IOStream? ios = null; private Serializer? ser = null; private BufferedOutputStream? ser_buffer = null; private Deserializer? des = null; - private Geary.Nonblocking.Mutex send_mutex = new Geary.Nonblocking.Mutex(); - private Geary.Nonblocking.Semaphore synchronized_notifier = new Geary.Nonblocking.Semaphore(); - private Geary.Nonblocking.Event idle_notifier = new Geary.Nonblocking.Event(); + private int tag_counter = 0; private char tag_prefix = 'a'; - private uint flush_timeout_id = 0; - private Gee.HashSet posted_idle_tags = new Gee.HashSet(); - private int outstanding_idle_dones = 0; - private Tag? posted_synchronization_tag = null; - private StatusResponse? synchronization_status_response = null; - private bool waiting_for_idle_to_synchronize = false; - private uint timeout_id = 0; - private uint timeout_cmd_count = 0; - private int outstanding_cmds = 0; - + + private Geary.Nonblocking.Queue pending_queue = + new Geary.Nonblocking.Queue.fifo(); + private Gee.Queue sent_queue = new Gee.LinkedList(); + private Command? current_command = null; + + private TimeoutManager command_timer; + private TimeoutManager idle_timer; + + private GLib.Cancellable? open_cancellable = null; + + public virtual signal void connected() { Logging.debug(Logging.Flag.NETWORK, "[%s] connected to %s", to_string(), endpoint.to_string()); @@ -161,32 +103,15 @@ public class Geary.Imap.ClientConnection : BaseObject { Logging.debug(Logging.Flag.NETWORK, "[%s] disconnected from %s", to_string(), endpoint.to_string()); } - + public virtual signal void sent_command(Command cmd) { Logging.debug(Logging.Flag.NETWORK, "[%s S] %s", to_string(), cmd.to_string()); - - // track outstanding Command count to force switching to IDLE only when nothing outstanding - outstanding_cmds++; } - - public virtual signal void in_idle(bool idling) { - Logging.debug(Logging.Flag.NETWORK, "[%s] in idle: %s", to_string(), idling.to_string()); - - // fire the Event every time the IDLE state changes - idle_notifier.blind_notify(); - } - + public virtual signal void received_status_response(StatusResponse status_response) { Logging.debug(Logging.Flag.NETWORK, "[%s R] %s", to_string(), status_response.to_string()); - - // look for command completion, if no outstanding, schedule a flush timeout to switch to - // IDLE mode - if (status_response.is_completion) { - if (--outstanding_cmds == 0) - reschedule_flush_timeout(); - } } - + public virtual signal void received_server_data(ServerData server_data) { Logging.debug(Logging.Flag.NETWORK, "[%s R] %s", to_string(), server_data.to_string()); } @@ -224,95 +149,20 @@ public class Geary.Imap.ClientConnection : BaseObject { public virtual signal void close_error(Error err) { Logging.debug(Logging.Flag.NETWORK, "[%s] close error: %s", to_string(), err.message); } - - public ClientConnection(Geary.Endpoint endpoint) { + + + public ClientConnection(Geary.Endpoint endpoint, + uint command_timeout_sec = DEFAULT_COMMAND_TIMEOUT_SEC, + uint idle_timeout_sec = DEFAULT_IDLE_TIMEOUT_SEC) { this.endpoint = endpoint; - cx_id = next_cx_id++; - - Geary.State.Mapping[] mappings = { - new Geary.State.Mapping(State.UNCONNECTED, Event.CONNECTED, on_connected), - new Geary.State.Mapping(State.UNCONNECTED, Event.DISCONNECTED, on_disconnected), - - new Geary.State.Mapping(State.CONNECTED, Event.SEND, on_proceed), - new Geary.State.Mapping(State.CONNECTED, Event.SEND_IDLE, on_send_idle), - new Geary.State.Mapping(State.CONNECTED, Event.SYNCHRONIZE, on_synchronize), - new Geary.State.Mapping(State.CONNECTED, Event.RECVD_STATUS_RESPONSE, on_status_response), - new Geary.State.Mapping(State.CONNECTED, Event.RECVD_SERVER_DATA, on_server_data), - new Geary.State.Mapping(State.CONNECTED, Event.RECVD_CONTINUATION_RESPONSE, on_continuation), - new Geary.State.Mapping(State.CONNECTED, Event.DISCONNECTED, on_disconnected), - - new Geary.State.Mapping(State.IDLING, Event.SEND, on_idle_send), - new Geary.State.Mapping(State.IDLING, Event.SEND_IDLE, on_no_proceed), - new Geary.State.Mapping(State.IDLING, Event.SYNCHRONIZE, on_idle_synchronize), - new Geary.State.Mapping(State.IDLING, Event.RECVD_STATUS_RESPONSE, on_idle_status_response), - new Geary.State.Mapping(State.IDLING, Event.RECVD_SERVER_DATA, on_server_data), - new Geary.State.Mapping(State.IDLING, Event.RECVD_CONTINUATION_RESPONSE, on_idling_deidling_continuation), - new Geary.State.Mapping(State.IDLING, Event.DISCONNECTED, on_disconnected), - - new Geary.State.Mapping(State.IDLE, Event.SEND, on_idle_send), - new Geary.State.Mapping(State.IDLE, Event.SEND_IDLE, on_no_proceed), - new Geary.State.Mapping(State.IDLE, Event.SYNCHRONIZE, on_idle_synchronize), - new Geary.State.Mapping(State.IDLE, Event.RECVD_STATUS_RESPONSE, on_idle_status_response), - new Geary.State.Mapping(State.IDLE, Event.RECVD_SERVER_DATA, on_server_data), - new Geary.State.Mapping(State.IDLE, Event.RECVD_CONTINUATION_RESPONSE, on_idle_continuation), - new Geary.State.Mapping(State.IDLE, Event.DISCONNECTED, on_disconnected), - - new Geary.State.Mapping(State.DEIDLING, Event.SEND, on_proceed), - new Geary.State.Mapping(State.DEIDLING, Event.SEND_IDLE, on_send_idle), - new Geary.State.Mapping(State.DEIDLING, Event.SYNCHRONIZE, on_deidling_synchronize), - new Geary.State.Mapping(State.DEIDLING, Event.RECVD_STATUS_RESPONSE, on_idle_status_response), - new Geary.State.Mapping(State.DEIDLING, Event.RECVD_SERVER_DATA, on_server_data), - new Geary.State.Mapping(State.DEIDLING, Event.RECVD_CONTINUATION_RESPONSE, on_idling_deidling_continuation), - new Geary.State.Mapping(State.DEIDLING, Event.DISCONNECTED, on_disconnected), - - new Geary.State.Mapping(State.DEIDLING_SYNCHRONIZING, Event.SEND, on_proceed), - new Geary.State.Mapping(State.DEIDLING_SYNCHRONIZING, Event.SEND_IDLE, on_no_proceed), - new Geary.State.Mapping(State.DEIDLING_SYNCHRONIZING, Event.RECVD_STATUS_RESPONSE, - on_deidling_synchronizing_status_response), - new Geary.State.Mapping(State.DEIDLING_SYNCHRONIZING, Event.RECVD_SERVER_DATA, on_server_data), - new Geary.State.Mapping(State.DEIDLING_SYNCHRONIZING, Event.RECVD_CONTINUATION_RESPONSE, - on_synchronize_continuation), - new Geary.State.Mapping(State.DEIDLING_SYNCHRONIZING, Event.DISCONNECTED, on_disconnected), - - new Geary.State.Mapping(State.SYNCHRONIZING, Event.SEND, on_proceed), - new Geary.State.Mapping(State.SYNCHRONIZING, Event.SEND_IDLE, on_no_proceed), - new Geary.State.Mapping(State.SYNCHRONIZING, Event.RECVD_STATUS_RESPONSE, - on_synchronize_status_response), - new Geary.State.Mapping(State.SYNCHRONIZING, Event.RECVD_SERVER_DATA, on_server_data), - new Geary.State.Mapping(State.SYNCHRONIZING, Event.RECVD_CONTINUATION_RESPONSE, - on_synchronize_continuation), - new Geary.State.Mapping(State.SYNCHRONIZING, Event.DISCONNECTED, on_disconnected), - - // TODO: A DISCONNECTING state would be helpful here, allowing for responses and data - // received from the server after a send error caused a disconnect to be signalled to - // subscribers before moving to the DISCONNECTED state. That would require more work, - // allowing for the caller (ClientSession) to close the receive channel and wait for - // everything to flush out before it shifted to a DISCONNECTED state as well. - new Geary.State.Mapping(State.DISCONNECTED, Event.SEND, on_no_proceed), - new Geary.State.Mapping(State.DISCONNECTED, Event.SEND_IDLE, on_no_proceed), - new Geary.State.Mapping(State.DISCONNECTED, Event.SYNCHRONIZE, on_no_proceed), - new Geary.State.Mapping(State.DISCONNECTED, Event.RECVD_STATUS_RESPONSE, Geary.State.nop), - new Geary.State.Mapping(State.DISCONNECTED, Event.RECVD_SERVER_DATA, Geary.State.nop), - new Geary.State.Mapping(State.DISCONNECTED, Event.RECVD_CONTINUATION_RESPONSE, Geary.State.nop), - new Geary.State.Mapping(State.DISCONNECTED, Event.DISCONNECTED, Geary.State.nop) - }; - - fsm = new Geary.State.Machine(machine_desc, mappings, on_bad_transition); - fsm.set_logging(false); - } - - /** - * Generates a unique tag for the IMAP connection in the form of "<000-999>". - */ - private Tag generate_tag() { - // watch for odometer rollover - if (++tag_counter >= 1000) { - tag_counter = 0; - tag_prefix = (tag_prefix != 'z') ? tag_prefix + 1 : 'a'; - } - - // TODO This could be optimized, but we'll leave it for now. - return new Tag("%c%03d".printf(tag_prefix, tag_counter)); + this.cx_id = next_cx_id++; + + this.command_timer = new TimeoutManager.seconds( + command_timeout_sec, on_command_timeout + ); + this.idle_timer = new TimeoutManager.seconds( + idle_timeout_sec, on_idle_timeout + ); } public SocketAddress? get_remote_address() { @@ -340,53 +190,31 @@ public class Geary.Imap.ClientConnection : BaseObject { return null; } - + /** - * Returns true if the connection is in an IDLE state. The or_idling parameter means to return - * true if the connection is working toward an IDLE state (but additional responses are being - * returned from the server before getting there). + * Determines if the connection has an outstanding IDLE command. */ - public bool is_in_idle(bool or_idling) { - switch (fsm.get_state()) { - case State.IDLE: - return true; - - case State.IDLING: - return or_idling; - - default: - return false; - } + public bool is_in_idle() { + return (this.current_command is IdleCommand); } - - public bool install_send_converter(Converter converter) { - return ser.install_converter(converter); - } - - public bool install_recv_converter(Converter converter) { - return des.install_converter(converter); - } - + /** * Returns silently if a connection is already established. */ public async void connect_async(Cancellable? cancellable = null) throws Error { - if (cx != null) { + if (this.cx != null) { debug("Already connected/connecting to %s", to_string()); - return; } - - cx = yield endpoint.connect_async(cancellable); - ios = cx; - outstanding_cmds = 0; - - // issue CONNECTED event and fire signal because the moment the channels are hooked up, - // data can start flowing - fsm.issue(Event.CONNECTED); - + + this.cx = yield endpoint.connect_async(cancellable); + this.ios = cx; + + this.pending_queue.clear(); + this.sent_queue.clear(); + connected(); - + try { yield open_channels_async(); } catch (Error err) { @@ -397,36 +225,34 @@ public class Geary.Imap.ClientConnection : BaseObject { } catch (Error close_err) { // ignored } - - fsm.issue(Event.DISCONNECTED); - - cx = null; - ios = null; - + + this.cx = null; + this.ios = null; + receive_failure(err); - + throw err; } + + if (this.idle_when_quiet) { + this.idle_timer.start(); + } } - + public async void disconnect_async(Cancellable? cancellable = null) throws Error { if (cx == null) return; - + + this.command_timer.reset(); + this.idle_timer.reset(); + // To guard against reentrancy SocketConnection close_cx = cx; cx = null; - // unschedule before yielding to stop the Deserializer - unschedule_flush_timeout(); - - // cancel all outstanding commmand timeouts - cancel_timeout(); - // close the Serializer and Deserializer yield close_channels_async(cancellable); - outstanding_cmds = 0; - + // close the actual streams and the connection itself Error? close_err = null; try { @@ -440,63 +266,13 @@ public class Geary.Imap.ClientConnection : BaseObject { } finally { ios = null; - fsm.issue(Event.DISCONNECTED); - if (close_err != null) close_error(close_err); - + disconnected(); } } - private async void open_channels_async() throws Error { - assert(ios != null); - assert(ser == null); - assert(des == null); - - // Not buffering the Deserializer because it uses a DataInputStream, which is buffered - ser_buffer = new BufferedOutputStream(ios.output_stream); - ser_buffer.set_close_base_stream(false); - - // Use ClientConnection cx_id for debugging aid with Serializer/Deserializer - string id = "%04d".printf(cx_id); - ser = new Serializer(id, ser_buffer); - des = new Deserializer(id, ios.input_stream); - - des.parameters_ready.connect(on_parameters_ready); - des.bytes_received.connect(on_bytes_received); - des.receive_failure.connect(on_receive_failure); - des.deserialize_failure.connect(on_deserialize_failure); - des.eos.connect(on_eos); - - yield des.start_async(); - } - - /** Disconnect and deallocates the Serializer and Deserializer. */ - private async void close_channels_async(Cancellable? cancellable) throws Error { - // disconnect from Deserializer before yielding to stop it - if (des != null) { - des.parameters_ready.disconnect(on_parameters_ready); - des.bytes_received.disconnect(on_bytes_received); - des.receive_failure.disconnect(on_receive_failure); - des.deserialize_failure.disconnect(on_deserialize_failure); - des.eos.disconnect(on_eos); - - yield des.stop_async(); - } - des = null; - ser = null; - // Close the Serializer's buffered stream after it as been - // deallocated so it can't possibly write to the stream again, - // and so the stream's async thread doesn't attempt to flush - // its buffers from its finaliser at some later unspecified - // point, possibly writing to an invalid underlying stream. - if (ser_buffer != null) { - yield ser_buffer.close_async(GLib.Priority.DEFAULT, cancellable); - ser_buffer = null; - } - } - public async void starttls_async(Cancellable? cancellable = null) throws Error { if (cx == null) throw new ImapError.NOT_SUPPORTED("[%s] Unable to enable TLS: no connection", to_string()); @@ -520,637 +296,316 @@ public class Geary.Imap.ClientConnection : BaseObject { // re-open Serializer/Deserializer with the new streams yield open_channels_async(); } - + + public void send_command(Command new_command) throws ImapError { + check_connection(); + + this.pending_queue.send(new_command); + + // If the current command is an IDLE, tell it to exit so we + // can get on with life. + IdleCommand? idle = this.current_command as IdleCommand; + if (idle != null) { + idle.exit_idle(); + } + } + + public string to_string() { + return "%04X/%s/%s".printf( + cx_id, + endpoint.to_string(), + this.cx != null ? "Connected" : "Disconnected" + ); + } + + private async void open_channels_async() throws Error { + assert(ios != null); + assert(ser == null); + assert(des == null); + + this.open_cancellable = new GLib.Cancellable(); + + // Not buffering the Deserializer because it uses a DataInputStream, which is buffered + ser_buffer = new BufferedOutputStream(ios.output_stream); + ser_buffer.set_close_base_stream(false); + + // Use ClientConnection cx_id for debugging aid with Serializer/Deserializer + string id = "%04d".printf(cx_id); + ser = new Serializer(id, ser_buffer); + des = new Deserializer(id, ios.input_stream); + + des.parameters_ready.connect(on_parameters_ready); + des.bytes_received.connect(on_bytes_received); + des.receive_failure.connect(on_receive_failure); + des.deserialize_failure.connect(on_deserialize_failure); + des.eos.connect(on_eos); + + // Start this running in the "background", it will stop when + // open_cancellable is cancelled + this.send_loop.begin(); + + yield des.start_async(); + } + + /** Disconnect and deallocates the Serializer and Deserializer. */ + private async void close_channels_async(Cancellable? cancellable) throws Error { + this.open_cancellable.cancel(); + + // disconnect from Deserializer before yielding to stop it + if (des != null) { + des.parameters_ready.disconnect(on_parameters_ready); + des.bytes_received.disconnect(on_bytes_received); + des.receive_failure.disconnect(on_receive_failure); + des.deserialize_failure.disconnect(on_deserialize_failure); + des.eos.disconnect(on_eos); + + yield des.stop_async(); + } + des = null; + ser = null; + // Close the Serializer's buffered stream after it as been + // deallocated so it can't possibly write to the stream again, + // and so the stream's async thread doesn't attempt to flush + // its buffers from its finaliser at some later unspecified + // point, possibly writing to an invalid underlying stream. + if (ser_buffer != null) { + yield ser_buffer.close_async(GLib.Priority.DEFAULT, cancellable); + ser_buffer = null; + } + } + + // Generates a unique tag for the IMAP connection in the form of + // "<000-999>". + private Tag generate_tag() { + // watch for odometer rollover + if (++tag_counter >= 1000) { + tag_counter = 0; + tag_prefix = (tag_prefix != 'z') ? tag_prefix + 1 : 'a'; + } + + // TODO This could be optimized, but we'll leave it for now. + return new Tag("%c%03d".printf(tag_prefix, tag_counter)); + } + + /** Long lived method to send commands as they are queued. */ + private async void send_loop() { + while (!this.open_cancellable.is_cancelled()) { + try { + GLib.Cancellable cancellable = this.open_cancellable; + Command pending = yield this.pending_queue.receive( + this.open_cancellable + ); + + // Only send IDLE commands if they are the last in the + // queue, there's no point otherwise. + bool pending_idle = pending is IdleCommand; + if (!pending_idle || this.pending_queue.is_empty) { + yield flush_command(pending, cancellable); + } + + // Check the queue is still empty after sending the + // command, since that might have changed. + if (this.pending_queue.is_empty) { + yield this.ser.flush_stream(cancellable); + if (this.idle_when_quiet && !pending_idle) { + this.idle_timer.start(); + } + } + } catch (GLib.Error err) { + if (!(err is GLib.IOError.CANCELLED)) { + send_failure(err); + } + } + } + } + + // Only ever call this from flush_commands, to ensure serial + // assignment of tags and only one command gets flushed at a + // time. This blocks asynchronously while serialising a command, + // including while waiting for continuation request responses when + // sending literals. + private async void flush_command(Command command, Cancellable cancellable) + throws GLib.Error { + // Assign a new tag; Commands with pre-assigned Tags + // should not be re-sent. (Do this inside the critical + // section to ensure commands go out in Tag order; + // this is not an IMAP requirement but makes tracing + // commands easier.) + command.assign_tag(generate_tag()); + + this.current_command = command; + this.sent_queue.add(command); + GLib.Error? ser_error = null; + try { + yield command.serialize(this.ser, cancellable); + } catch (GLib.Error err) { + ser_error = err; + } + + this.current_command = null; + + if (ser_error != null) { + this.sent_queue.remove(command); + throw ser_error; + } + + // We want the timeout to trigger ASAP if the + // connection goes away, so don't reset it if it is + // already running. + if (!this.command_timer.is_running) { + this.command_timer.start(); + } + + sent_command(command); + } + + private Command? get_sent_command(Tag tag) { + Command? sent = null; + if (tag.is_tagged()) { + foreach (Command queued in this.sent_queue) { + if (tag.equal_to(queued.tag)) { + sent = queued; + break; + } + } + } + return sent; + } + + private void check_connection() throws ImapError { + if (this.cx == null) { + throw new ImapError.NOT_CONNECTED( + "Not connected to %s", to_string() + ); + } + } + private void on_parameters_ready(RootParameters root) { + // Reset the command timer, since we know for the + // moment that the connection is good. + this.command_timer.reset(); + ServerResponse response; try { response = ServerResponse.migrate_from_server(root); + + StatusResponse? status = response as StatusResponse; + if (status != null) { + on_status_response(status); + return; + } + + ServerData? data = response as ServerData; + if (data != null) { + on_server_data(data); + return; + } + + ContinuationResponse? continuation = response as ContinuationResponse; + if (continuation != null) { + on_continuation_response(continuation); + return; + } } catch (ImapError err) { received_bad_response(root, err); - return; } - - StatusResponse? status_response = response as StatusResponse; - if (status_response != null) { - fsm.issue(Event.RECVD_STATUS_RESPONSE, null, status_response); - - return; - } - - ServerData? server_data = response as ServerData; - if (server_data != null) { - fsm.issue(Event.RECVD_SERVER_DATA, null, server_data); - - return; - } - - ContinuationResponse? continuation_response = response as ContinuationResponse; - if (continuation_response != null) { - fsm.issue(Event.RECVD_CONTINUATION_RESPONSE, null, continuation_response); - - return; - } - - error("[%s] Unknown ServerResponse of type %s received: %s:", to_string(), response.get_type().name(), - response.to_string()); + + warning( + "[%s] Unknown ServerResponse of type %s received: %s:", + to_string(), response.get_type().name(), + response.to_string() + ); } - + + private void on_status_response(StatusResponse status) + throws ImapError { + if (status.is_completion) { + Command? sent = get_sent_command(status.tag); + if (sent == null) { + throw new ImapError.SERVER_ERROR( + "Unexpected status response: %s", status.to_string() + ); + } + this.sent_queue.remove(sent); + sent.completed(status); + } + + received_status_response(status); + } + + private void on_server_data(ServerData data) + throws ImapError { + Command? sent = get_sent_command(data.tag); + if (sent != null) { + sent.data_received(data); + } + + received_server_data(data); + } + + private void on_continuation_response(ContinuationResponse continuation) + throws ImapError { + Command? current = this.current_command; + if (current == null) { + throw new ImapError.SERVER_ERROR( + "Unexpected continuation request response: %s", + continuation.to_string() + ); + } + current.continuation_requested(continuation); + + received_continuation_response(continuation); + } + private void on_bytes_received(size_t bytes) { - // as long as receiving someone on the connection, keep the outstanding command timeouts - // alive ... this primarily prevents against the case where a command that generates a long - // download doesn't timeout the commands behind it - increase_timeout(); - + // Reset the command timer, since we know for the + // moment that the connection is good. + this.command_timer.reset(); + received_bytes(bytes); } - + private void on_receive_failure(Error err) { receive_failure(err); } - + private void on_deserialize_failure() { - deserialize_failure(new ImapError.PARSE_ERROR("Unable to deserialize from %s", to_string())); + deserialize_failure( + new ImapError.PARSE_ERROR( + "Unable to deserialize from %s", to_string() + ) + ); } - + private void on_eos() { recv_closed(); } - - public async void send_async(Command cmd, Cancellable? cancellable = null) throws Error { - check_for_connection(); - - // need to run this in critical section because Serializer requires it (don't want to be - // pushing data while a flush_async() is occurring) - int token = yield send_mutex.claim_async(cancellable); - - // This needs to happen inside mutex because flush async also manipulates FSM - if (!issue_conditional_event(Event.SEND)) { - debug("[%s] Send async not allowed", to_string()); - - send_mutex.release(ref token); - - throw new ImapError.NOT_CONNECTED("Send not allowed: connection in %s state", - fsm.get_state_string(fsm.get_state())); - } - - // Always assign a new tag; Commands with pre-assigned Tags should not be re-sent. - // (Do this inside the critical section to ensure commands go out in Tag order; this is not - // an IMAP requirement but makes tracing commands easier.) - cmd.assign_tag(generate_tag()); - - // set the timeout on this command; note that a zero-second timeout means no timeout, - // and that there's no timeout on serialization - cmd_started_timeout(); - - Error? ser_err = null; - try { - // watch for disconnect while waiting for mutex - if (ser != null) { - cmd.serialize(ser, cmd.tag); - } else { - ser_err = new ImapError.NOT_CONNECTED("Send not allowed: connection in %s state", - fsm.get_state_string(fsm.get_state())); - } - } catch (Error err) { - debug("[%s] Error serializing command: %s", to_string(), err.message); - ser_err = err; - } - - send_mutex.release(ref token); - - if (ser_err != null) { - send_failure(ser_err); - - throw ser_err; - } - - // Reset flush timer so it only fires after n msec after last command pushed out to stream - reschedule_flush_timeout(); - - // TODO: technically lying a little bit here; since ClientSession keepalives are rescheduled - // by this signal, will want to tighten this up a bit in the future - sent_command(cmd); - } - /** - * Sends a reply to an unsolicited continuation request. - * - * Do not use this if you need to send literal data as part of a - * command, add it as a {@link LiteralParameter} to the command - * instead. - */ - public async void send_continuation_reply(ContinuationParameter reply, - Cancellable? cancellable = null) - throws Error { - check_for_connection(); - // need to run this in critical section because Serializer requires it (don't want to be - // pushing data while a flush_async() is occurring) - int token = yield send_mutex.claim_async(cancellable); + private void on_command_timeout() { + debug("[%s] Sending command timed out", to_string()); - Error? ser_err = null; - try { - // watch for disconnect while waiting for mutex - if (ser != null) { - reply.serialize_continuation(ser); - } else { - ser_err = new ImapError.NOT_CONNECTED("Send not allowed: connection in %s state", - fsm.get_state_string(fsm.get_state())); - } - } catch (Error err) { - debug("[%s] Error serializing command: %s", to_string(), err.message); - ser_err = err; - } - - this.send_mutex.release(ref token); - - if (ser_err != null) { - send_failure(ser_err); - - throw ser_err; - } - - // Reset flush timer so it only fires after n msec after last command pushed out to stream - reschedule_flush_timeout(); - } - - private void reschedule_flush_timeout() { - unschedule_flush_timeout(); - - if (flush_timeout_id == 0) - flush_timeout_id = Timeout.add_full(Priority.LOW, FLUSH_TIMEOUT_MSEC, on_flush_timeout); - } - - private void unschedule_flush_timeout() { - if (flush_timeout_id != 0) { - Source.remove(flush_timeout_id); - flush_timeout_id = 0; - } - } - - private bool on_flush_timeout() { - do_flush_async.begin(); - - flush_timeout_id = 0; - - return false; - } - - private async void do_flush_async() { - // Like send_async(), need to use mutex when flushing as Serializer must be accessed in - // serialized fashion - // - // NOTE: Because this is happening in the background, it's possible for ser to go to null - // after any yield (if a close occurs while blocking); this is why all the checking is - // required - int token = Nonblocking.Mutex.INVALID_TOKEN; - try { - token = yield send_mutex.claim_async(); - - // Dovecot will hang the connection (not send any replies) if IDLE is sent in the - // same buffer as normal commands, so flush the buffer first, enqueue IDLE, and - // flush that behind the first - bool is_synchronized = false; - while (ser != null) { - // prepare for upcoming synchronization point (continuation response could be - // recv'd before flush_async() completes) and reset prior synchronization response - posted_synchronization_tag = ser.next_synchronized_message(); - synchronization_status_response = null; - - Tag? synchronize_tag; - yield ser.flush_async(is_synchronized, out synchronize_tag); - - // if no tag returned, all done, otherwise synchronization required - if (synchronize_tag == null) - break; - - // no longer synchronized - is_synchronized = false; - - // synchronization is not always possible - if (!issue_conditional_event(Event.SYNCHRONIZE)) { - debug("[%s] Unable to synchronize, exiting do_flush_async", to_string()); - - return; - } - - // the expectation that the send_async() command which enqueued the data buffers - // requiring synchronization closed the IDLE first, but it's possible the response - // has not been received from the server yet, so wait now ... even possible the - // connection is still in the IDLING state, so wait for IDLING -> IDLE -> SYNCHRONIZING - while (is_in_idle(true)) { - debug("[%s] Waiting to exit IDLE for synchronization...", to_string()); - yield idle_notifier.wait_async(); - debug("[%s] Finished waiting to exit IDLE for synchronization", to_string()); - } - - // wait for synchronization point to be reached - debug("[%s] Synchronizing...", to_string()); - yield synchronized_notifier.wait_async(); - - // since can be set before reaching wait_async() (due to waiting for IDLE to - // exit), need to manually reset for next time (can't use auto-reset) - synchronized_notifier.reset(); - - // watch for the synchronization request to be thwarted - if (synchronization_status_response != null) { - debug("[%s]: Failed to synchronize command continuation: %s", to_string(), - synchronization_status_response.to_string()); - - // skip pass current message, this one's done - if (ser != null) - ser.fast_forward_queue(); - } else { - debug("[%s] Synchronized, ready to continue", to_string()); - - // now synchronized, ready to continue - is_synchronized = true; - } - } - - // reset synchronization state - posted_synchronization_tag = null; - synchronization_status_response = null; - - // as connection is "quiet" (haven't seen new command in n msec), go into IDLE state - // if (a) allowed by owner, (b) allowed by state machine, and (c) no commands outstanding - if (ser != null && idle_when_quiet && outstanding_cmds == 0 && issue_conditional_event(Event.SEND_IDLE)) { - IdleCommand idle_cmd = new IdleCommand(); - idle_cmd.assign_tag(generate_tag()); - - // store IDLE tag to watch for response later (many responses could arrive before it) - bool added = posted_idle_tags.add(idle_cmd.tag); - assert(added); - - Logging.debug(Logging.Flag.NETWORK, "[%s] Initiating IDLE: %s", to_string(), - idle_cmd.to_string()); - - idle_cmd.serialize(ser, idle_cmd.tag); - - Tag? synchronize_tag; - yield ser.flush_async(false, out synchronize_tag); - - // flushing IDLE should never require synchronization - assert(synchronize_tag == null); - } - } catch (Error err) { - send_failure(err); - } finally { - if (token != Nonblocking.Mutex.INVALID_TOKEN) { - try { - send_mutex.release(ref token); - } catch (Error err2) { - // ignored - } - } - } - } - - private void check_for_connection() throws Error { - if (cx == null) - throw new ImapError.NOT_CONNECTED("Not connected to %s", to_string()); - } - - private void cmd_started_timeout() { - timeout_cmd_count++; - - if (timeout_id == 0) - timeout_id = Timeout.add_seconds(command_timeout_sec, on_cmd_timeout); - } - - private void cmd_completed_timeout() { - if (timeout_cmd_count > 0) - timeout_cmd_count--; - - if (timeout_cmd_count == 0 && timeout_id != 0) { - Source.remove(timeout_id); - timeout_id = 0; - } - } - - private void increase_timeout() { - if (timeout_id != 0) { - Source.remove(timeout_id); - timeout_id = Timeout.add_seconds(command_timeout_sec, on_cmd_timeout); - } - } - - private void cancel_timeout() { - if (timeout_id != 0) - Source.remove(timeout_id); - - timeout_id = 0; - timeout_cmd_count = 0; - } - - private bool on_cmd_timeout() { - debug("[%s] on_cmd_timeout", to_string()); - - // turn off graceful disconnect ... if the connection is hung, don't want to be stalled - // trying to flush the pipe + // turn off graceful disconnect ... if the connection is hung, + // don't want to be stalled trying to flush the pipe TcpConnection? tcp_cx = cx as TcpConnection; if (tcp_cx != null) tcp_cx.set_graceful_disconnect(false); - - timeout_id = 0; - timeout_cmd_count = 0; - - receive_failure(new ImapError.TIMED_OUT("No response to command(s) after %u seconds", - command_timeout_sec)); - - return false; - } - - public string to_string() { - if (cx != null) { - try { - return "%04X/%s/%s".printf(cx_id, - Inet.address_to_string((InetSocketAddress) cx.get_remote_address()), - fsm.get_state_string(fsm.get_state())); - } catch (Error err) { - // fall through - } - } - - return "%04X/%s/%s".printf(cx_id, endpoint.to_string(), fsm.get_state_string(fsm.get_state())); - } - - // - // transition handlers - // - - private bool issue_conditional_event(Event event) { - bool proceed = false; - fsm.issue(event, &proceed); - - return proceed; - } - - private void signal_server_data(void *user, Object? object) { - received_server_data((ServerData) object); - } - - private void signal_status_response(void *user, Object? object) { - StatusResponse? status_response = object as StatusResponse; - if (status_response != null && status_response.is_completion) { - // stop the countdown timer on the associated command - cmd_completed_timeout(); - } - - received_status_response((StatusResponse) object); - } - - private void signal_continuation(void *user, Object? object) { - received_continuation_response((ContinuationResponse) object); - } - - private void signal_entered_idle() { - in_idle(true); - } - - private void signal_left_idle() { - in_idle(false); - } - - private uint do_proceed(uint state, void *user) { - *((bool *) user) = true; - - return state; - } - - private uint do_no_proceed(uint state, void *user) { - *((bool *) user) = false; - - return state; - } - - private uint on_proceed(uint state, uint event, void *user) { - return do_proceed(state, user); - } - - private uint on_no_proceed(uint state, uint event, void *user) { - return do_no_proceed(state, user); - } - - private uint on_connected(uint state, uint event, void *user) { - // don't stay in connected state if IDLE is to be used; schedule an IDLE command (which - // may be rescheduled if commands immediately start being issued, which they most likely - // will) - if (idle_when_quiet) - reschedule_flush_timeout(); - - return State.CONNECTED; - } - - private uint on_disconnected(uint state, uint event, void *user) { - unschedule_flush_timeout(); - - return State.DISCONNECTED; - } - - private uint on_send_idle(uint state, uint event, void *user) { - return do_proceed(State.IDLING, user); - } - - private uint on_synchronize(uint state, uint event, void *user) { - return do_proceed(State.SYNCHRONIZING, user); - } - - private uint on_idle_synchronize(uint state, uint event, void *user) { - // technically this could be accomplished by introducing an IDLE_SYNCHRONZING (and probably - // an IDLING_SYNCHRONIZING) state to indicate that flush_async() is waiting for the FSM - // to transition from IDLING/IDLE to SYNCHRONIZING so it can send a large buffer, but - // that introduces a lot of complication for a rather quick state used infrequently; this - // simple flag does the trick (see on_idle_status_response for how it's used and cleared) - waiting_for_idle_to_synchronize = true; - - return do_proceed(state, user); - } - - private uint on_deidling_synchronize(uint state, uint event, void *user) { - return do_proceed(State.DEIDLING_SYNCHRONIZING, user); - } - - private uint on_status_response(uint state, uint event, void *user, Object? object) { - fsm.do_post_transition(signal_status_response, user, object); - - return state; - } - - private uint on_server_data(uint state, uint event, void *user, Object? object) { - fsm.do_post_transition(signal_server_data, user, object); - - return state; - } - - private uint on_continuation(uint state, uint event, void *user, Object? object) { - fsm.do_post_transition(signal_continuation, user, object); - - return state; - } - - private uint on_idling_deidling_continuation(uint state, uint event, void *user, Object? object) { - ContinuationResponse continuation = (ContinuationResponse) object; - - Logging.debug(Logging.Flag.NETWORK, "[%s R] %s", to_string(), continuation.to_string()); - - // if deidling and a DONE is outstanding, keep waiting for it to complete -- don't go to - // IDLE, as that will cause another spurious DONE to be issued - if (state == State.DEIDLING && outstanding_idle_dones > 0) - return state; - - // only signal entering IDLE state if that's the case - if (state != State.IDLE) - fsm.do_post_transition(signal_entered_idle); - - return State.IDLE; - } - - private uint on_idle_send(uint state, uint event, void *user) { - Logging.debug(Logging.Flag.NETWORK, "[%s] Closing IDLE", to_string()); - - // TODO: Because there is no DISCONNECTING state, need to watch for the Serializer - // disappearing during a disconnect while in a "normal" state - if (ser == null) { - debug("[%s] Unable to close IDLE: no serializer", to_string()); - - return do_no_proceed(state, user); - } - - try { - Logging.debug(Logging.Flag.NETWORK, "[%s S] %s", to_string(), IDLE_DONE); - ser.push_unquoted_string(IDLE_DONE); - ser.push_eol(); - - // track the number of DONE's outstanding, as their responses are pipelined as well - // (this prevents issuing more than one DONE when the idle continuation response comes - // in *after* issuing the DONE) - outstanding_idle_dones++; - } catch (Error err) { - debug("[%s] Unable to close IDLE: %s", to_string(), err.message); - - return do_no_proceed(state, user); - } - - // only signal leaving IDLE state if that's the case - if (state == State.IDLE) - fsm.do_post_transition(signal_left_idle); - - return do_proceed(State.DEIDLING, user); - } - - private uint on_idle_status_response(uint state, uint event, void *user, Object? object) { - StatusResponse status_response = (StatusResponse) object; - - // if not a posted IDLE tag, then treat as external status response - if (!posted_idle_tags.remove(status_response.tag)) { - fsm.do_post_transition(signal_status_response, user, object); - - return state; - } - - // StatusResponse for one of our IDLE commands; either way, no longer in IDLE mode - if (status_response.status == Status.OK) { - Logging.debug(Logging.Flag.NETWORK, "[%s] Leaving IDLE (%d outstanding): %s", to_string(), - posted_idle_tags.size, status_response.to_string()); - } else { - Logging.debug(Logging.Flag.NETWORK, "[%s] Unable to enter IDLE (%d outstanding): %s", to_string(), - posted_idle_tags.size, status_response.to_string()); - } - - // DONE has round-tripped (but watch for underflows, especially if server "forces" an IDLE - // to complete) - outstanding_idle_dones = Numeric.int_floor(outstanding_idle_dones - 1, 0); - - // Only return to CONNECTED if no other IDLE commands are outstanding (and only signal - // if leaving IDLE state for another) - uint next = (posted_idle_tags.size == 0) ? State.CONNECTED : state; - - // don't signal about the StatusResponse, it's in response to a Command generated - // internally (IDLE) and will confuse watchers who receive StatusResponse for a Command - // they didn't issue - - // However, need to signal about leaving idle - if (state == State.IDLE && next != State.IDLE) - fsm.do_post_transition(signal_left_idle); - - // If leaving IDLE for CONNECTED but user has asked to stay in IDLE whenever quiet, reschedule - // flush (which will automatically send IDLE command) - if (next == State.CONNECTED && idle_when_quiet) - reschedule_flush_timeout(); - - // if flush_async() is waiting for a synchronization point and deidling back to CONNECTED, - // go to SYNCHRONIZING so it can push its buffer to the server - if (waiting_for_idle_to_synchronize && next == State.CONNECTED) { - next = State.SYNCHRONIZING; - waiting_for_idle_to_synchronize = false; - } - - return next; - } - - private uint on_idle_continuation(uint state, uint event, void *user, Object? object) { - if (posted_idle_tags.size == 0) { - debug("[%s] Bad continuation received during IDLE: %s", to_string(), - ((ContinuationResponse) object).to_string()); - } - - return state; - } - - private uint on_deidling_synchronizing_status_response(uint state, uint event, void *user, - Object? object) { - // piggyback on on_idle_status_response, but instead of jumping to CONNECTED, jump to - // SYNCHRONIZING (because IDLE has completed) - return (on_idle_status_response(state, event, user, object) == State.CONNECTED) - ? State.SYNCHRONIZING : state; - } - - private uint on_synchronize_status_response(uint state, uint event, void *user, Object? object) { - StatusResponse status_response = (StatusResponse) object; - - // waiting for status response to synchronization message, treat others normally - if (posted_synchronization_tag == null || !posted_synchronization_tag.equal_to(status_response.tag)) { - fsm.do_post_transition(signal_status_response, user, object); - - return state; - } - - // receive status response while waiting for synchronization of a command; this means the - // server has rejected it - debug("[%s] Command continuation rejected: %s", to_string(), status_response.to_string()); - - // save result and notify sleeping flush_async() - synchronization_status_response = status_response; - synchronized_notifier.blind_notify(); - - return State.CONNECTED; - } - - private uint on_synchronize_continuation(uint state, uint event, void *user, Object? object) { - ContinuationResponse continuation = (ContinuationResponse) object; - - if (posted_synchronization_tag == null) { - debug("[%s] Bad command continuation received: %s", to_string(), - continuation.to_string()); - } else { - debug("[%s] Command continuation received for %s: %s", to_string(), - posted_synchronization_tag.to_string(), continuation.to_string()); - } - - // wake up the sleeping flush_async() call so it will continue - synchronization_status_response = null; - synchronized_notifier.blind_notify(); - - // There is no SYNCHRONIZED state, which is kind of fleeting; the moment the flush_async() - // call continues, no longer synchronized - return State.CONNECTED; - } - - private uint on_bad_transition(uint state, uint event, void *user) { - warning("[%s] Bad cx state transition %s", to_string(), fsm.get_event_issued_string(state, event)); - - return on_no_proceed(state, event, user); - } -} + receive_failure( + new ImapError.TIMED_OUT( + "No response to command(s) after %u seconds", + this.command_timer.interval + ) + ); + } + + private void on_idle_timeout() { + Logging.debug(Logging.Flag.NETWORK, "[%s] Initiating IDLE", to_string()); + try { + this.send_command(new IdleCommand()); + } catch (ImapError err) { + debug("[%s] Error sending IDLE: %s", to_string(), err.message); + } + } + +} diff --git a/src/engine/imap/transport/imap-client-session.vala b/src/engine/imap/transport/imap-client-session.vala index 1e44a750..8f9e9775 100644 --- a/src/engine/imap/transport/imap-client-session.vala +++ b/src/engine/imap/transport/imap-client-session.vala @@ -277,10 +277,6 @@ public class Geary.Imap.ClientSession : BaseObject { private uint unselected_keepalive_secs = 0; private uint selected_with_idle_keepalive_secs = 0; - private Gee.HashMap seen_completion_responses = new Gee.HashMap< - Tag, StatusResponse>(); - private Gee.HashMap waiting_for_completion = new Gee.HashMap< - Tag, CommandCallback>(); private Command? state_change_cmd = null; private Nonblocking.Semaphore? connect_waiter = null; private Error? connect_err = null; @@ -783,17 +779,8 @@ public class Geary.Imap.ClientSession : BaseObject { cx = null; } - - // if there are any outstanding commands waiting for responses, wake them up now - if (waiting_for_completion.size > 0) { - debug("[%s] Cancelling %d pending commands", to_string(), waiting_for_completion.size); - foreach (CommandCallback cmd_cb in waiting_for_completion.values) - Scheduler.on_idle(cmd_cb.callback); - - waiting_for_completion.clear(); - } } - + private uint on_connected(uint state, uint event) { debug("[%s] Connected", to_string()); @@ -966,21 +953,6 @@ public class Geary.Imap.ClientSession : BaseObject { // either way, new capabilities should be available caps = capabilities; - - // Attempt compression (usually only available after authentication) - if (caps.has_setting(Capabilities.COMPRESS, Capabilities.DEFLATE_SETTING)) { - StatusResponse resp = yield send_command_async( - new CompressCommand(CompressCommand.ALGORITHM_DEFLATE)); - if (resp.status == Status.OK) { - install_send_converter(new ZlibCompressor(ZlibCompressorFormat.RAW)); - install_recv_converter(new ZlibDecompressor(ZlibCompressorFormat.RAW)); - debug("[%s] Compression started", to_string()); - } else { - debug("[%s] Unable to start compression: %s", to_string(), resp.to_string()); - } - } else { - debug("[%s] No compression available", to_string()); - } Gee.List server_data = new Gee.ArrayList(); ulong data_id = this.server_data_received.connect((data) => { server_data.add(data); }); @@ -1256,18 +1228,6 @@ public class Geary.Imap.ClientSession : BaseObject { debug("[%s] Keepalive error: %s", to_string(), err.message); } } - - // - // Converters - // - - public bool install_send_converter(Converter converter) { - return (cx != null) ? cx.install_send_converter(converter) : false; - } - - public bool install_recv_converter(Converter converter) { - return (cx != null) ? cx.install_recv_converter(converter) : false; - } // // send commands @@ -1750,38 +1710,17 @@ public class Geary.Imap.ClientSession : BaseObject { // // command submission // - + private async StatusResponse command_transaction_async(Command cmd, Cancellable? cancellable) throws Error { - if (cx == null) + if (this.cx == null) throw new ImapError.NOT_CONNECTED("Not connected to %s", imap_endpoint.to_string()); - - yield cx.send_async(cmd, cancellable); - - // send_async() should've tagged the Command, otherwise the completion_pending will fail - assert(cmd.tag.is_tagged()); - - // If the command didn't complete (i.e. a CompletionStatusResponse didn't return from the - // server) in the context of send_async(), wait for it now - if (!seen_completion_responses.has_key(cmd.tag)) { - waiting_for_completion.set(cmd.tag, new CommandCallback(command_transaction_async.callback)); - yield; - } - - // it should be seen now; if not, it's because of disconnection cancelling all the outstanding - // requests - StatusResponse? completion_response; - if (!seen_completion_responses.unset(cmd.tag, out completion_response)) { - assert(cx == null); - - throw new ImapError.NOT_CONNECTED("Not connected to %s", imap_endpoint.to_string()); - } - - assert(completion_response != null); - - return completion_response; + + this.cx.send_command(cmd); + yield cmd.wait_until_complete(cancellable); + return cmd.status; } - + // // network connection event handlers // @@ -1835,24 +1774,14 @@ public class Geary.Imap.ClientSession : BaseObject { err.message); } } - + // update state machine before notifying subscribers, who may turn around and query ClientSession if (status_response.is_completion) { fsm.issue(Event.RECV_COMPLETION, null, status_response, null); - - // Note that this signal could be called in the context of cx.send_async() that sent - // this command to the server ... this mechanism (seen_completion_response and - // waiting_for_completion) assures that in either case issue_command_async() returns - // when the command is completed - seen_completion_responses.set(status_response.tag, status_response); - - CommandCallback? cmd_cb; - if (waiting_for_completion.unset(status_response.tag, out cmd_cb)) - Scheduler.on_idle(cmd_cb.callback); } else { fsm.issue(Event.RECV_STATUS, null, status_response, null); } - + status_response_received(status_response); } diff --git a/src/engine/imap/transport/imap-serializer.vala b/src/engine/imap/transport/imap-serializer.vala index 78696d69..804fa0ce 100644 --- a/src/engine/imap/transport/imap-serializer.vala +++ b/src/engine/imap/transport/imap-serializer.vala @@ -5,232 +5,99 @@ */ /** - * Serializer asynchronously writes serialized IMAP commands to the supplied output stream via a - * queue of buffers. + * Writes serialized IMAP commands to a supplied output stream. * - * Since most IMAP commands are small in size (one line of data, often under 64 bytes), the - * Serializer writes them to a queue of temporary buffers (interspersed with user-supplied buffers - * that are intended to be literal data). The data is only written when {@link flush_async} is - * invoked. - * - * This means that if the caller wants some buffer beyond the steps described above, they should - * pass in a BufferedOutputStream (or one of its subclasses). flush_async() will flush the user's - * OutputStream after writing to it. - * - * Command continuation requires some synchronization between the Serializer and the - * {@link Deserializer}. It also requires some queue management. See {@link fast_forward_queue} - * and {@link next_synchronized_message}. + * Command continuation requires some synchronization between the + * Serializer and the {@link Deserializer}. It also requires some + * queue management. See {@link push_quoted_string} and {@link + * next_synchronized_message}. * * @see Deserializer */ public class Geary.Imap.Serializer : BaseObject { - private class SerializedData { - public Memory.Buffer buffer; - public Tag? literal_data_tag; - - public SerializedData(Memory.Buffer buffer, Tag? literal_data_tag) { - this.buffer = buffer; - this.literal_data_tag = literal_data_tag; - } - } - + private string identifier; - private OutputStream outs; - private ConverterOutputStream couts; - private MemoryOutputStream mouts; - private DataOutputStream douts; - private Geary.Stream.MidstreamConverter midstream = new Geary.Stream.MidstreamConverter("Serializer"); - private Gee.Queue datastream = new Gee.LinkedList(); - - public Serializer(string identifier, OutputStream outs) { + private DataOutputStream output; + + public Serializer(string identifier, OutputStream output) { this.identifier = identifier; - this.outs = outs; - - // prepare the ConverterOutputStream (which wraps the caller's OutputStream and allows for - // midstream conversion) - couts = new ConverterOutputStream(outs, midstream); - couts.set_close_base_stream(false); - - // prepare the DataOutputStream (which generates buffers for the queue) - mouts = new MemoryOutputStream(null, realloc, free); - douts = new DataOutputStream(mouts); - douts.set_close_base_stream(false); + this.output = new DataOutputStream(output); + this.output.set_close_base_stream(false); } - - public bool install_converter(Converter converter) { - return midstream.install(converter); - } - - public void push_ascii(char ch) throws Error { - douts.put_byte(ch, null); - } - + /** - * Pushes the string to the IMAP server with quoting applied whether required or not. Returns - * true if quoting was required. + * Pushes the string to the IMAP server with quoting. + * + * This is applied whether required or not. Returns true if + * quoting was required. */ - public bool push_quoted_string(string str) throws Error { + public bool push_quoted_string(string str, + GLib.Cancellable? cancellable = null) + throws GLib.Error { string quoted; DataFormat.Quoting requirement = DataFormat.convert_to_quoted(str, out quoted); - - douts.put_string(quoted); - + + this.output.put_string(quoted, cancellable); + return (requirement == DataFormat.Quoting.REQUIRED); } - + /** - * This will push the string to IMAP as-is. Use only if you absolutely know what you're doing. + * This will push the string to IMAP as-is. + * + * Use only if you absolutely know what you're doing. */ - public void push_unquoted_string(string str) throws Error { - douts.put_string(str); + public void push_unquoted_string(string str, + GLib.Cancellable? cancellable = null) + throws GLib.Error { + this.output.put_string(str, cancellable); } - - public void push_space() throws Error { - douts.put_byte(' ', null); + + public void push_ascii(char ch, GLib.Cancellable? cancellable = null) + throws GLib.Error { + this.output.put_byte(ch, cancellable); } - - public void push_nil() throws Error { - douts.put_string(NilParameter.VALUE, null); + + public void push_space(GLib.Cancellable? cancellable = null) + throws GLib.Error { + this.output.put_byte(' ', cancellable); } - - public void push_eol() throws Error { - douts.put_string("\r\n", null); + + public void push_nil(GLib.Cancellable? cancellable = null) + throws GLib.Error { + this.output.put_string(NilParameter.VALUE, cancellable); } - - private void enqueue_current_stream() throws IOError { - size_t length = mouts.get_data_size(); - if (length <= 0) - return; - - // close before converting to Memory.ByteBuffer - mouts.close(); - - SerializedData data = new SerializedData( - new Memory.ByteBuffer.from_memory_output_stream(mouts), null); - datastream.add(data); - - mouts = new MemoryOutputStream(null, realloc, free); - douts = new DataOutputStream(mouts); - douts.set_close_base_stream(false); + + public void push_eol(GLib.Cancellable? cancellable = null) + throws GLib.Error { + this.output.put_string("\r\n", cancellable); } - - /* - * Pushes an {link Memory.Buffer} to the serialized stream that must be synchronized - * with the server before transmission. - * - * Literal data may require synchronization with the server and so should only be used when - * necessary. See {link DataFormat.is_quoting_required} to test data. - * - * The supplied buffer must not be mutated once submitted to the {@link Serializer}. - * - * See [[http://tools.ietf.org/html/rfc3501#section-4.3]] and - * [[http://tools.ietf.org/html/rfc3501#section-7.5]] - */ - public void push_synchronized_literal_data(Tag tag, Memory.Buffer buffer) throws Error { - enqueue_current_stream(); - datastream.add(new SerializedData(buffer, tag)); - } - + /** - * Indicates that a complete message has been pushed to the {@link Serializer}. - * - * It's important to delineate messages for the Serializer, as it aids in queue management - * and command continuation (synchronization). + * Pushes literal data to the output stream. */ - public void push_end_of_message() throws Error { - enqueue_current_stream(); - datastream.add(null); + public async void push_literal_data(Memory.Buffer buffer, + GLib.Cancellable? cancellable = null) + throws GLib.Error { + yield this.output.splice_async( + buffer.get_input_stream(), + OutputStreamSpliceFlags.NONE, + Priority.DEFAULT, + cancellable + ); } - + /** - * Returns the {@link Tag} for the message with the next synchronization message Tag. - * - * This can be used to prepare for receiving a command continuation failure before sending - * the request via {@link flush_async}, as the response could return before that call completes. + * Flushes the output stream, ensuring a command has been sent. */ - public Tag? next_synchronized_message() { - foreach (SerializedData? data in datastream) { - if (data != null && data.literal_data_tag != null) - return data.literal_data_tag; - } - - return null; + public async void flush_stream(GLib.Cancellable? cancellable = null) + throws GLib.Error { + yield this.output.flush_async(Priority.DEFAULT, cancellable); } - - /** - * Discards all buffers associated with the current message and moves the queue forward to the - * next one. - * - * This is useful when a command continuation is refused by the server and the command must be - * aborted. - * - * Any data currently in the buffer is *not* enqueued, as by definition it has not been marked - * with {@link push_end_of_message}. - */ - public void fast_forward_queue() { - while (!datastream.is_empty) { - if (datastream.poll() == null) - break; - } - } - - /** - * Push all serialized data and buffers onto the wire. - * - * Caller should pass is_synchronized=true if the connection has been synchronized for a command - * continuation. - * - * If synchronize_tag returns non-null, then the flush has not completed. The connection must - * wait for the server to send a continuation response before continuing. When ready, call - * flush_async() again with is_synchronized set to true. The tag is supplied to watch for - * an error condition from the server (which may reject the synchronization request). - */ - public async void flush_async(bool is_synchronized, out Tag? synchronize_tag, - Cancellable? cancellable = null) throws Error { - synchronize_tag = null; - - // commit the last buffer to the queue (although this is best done with push_end_message) - enqueue_current_stream(); - - // walk the SerializedData queue, pushing each out to the wire unless a synchronization - // point is encountered - while (!datastream.is_empty) { - // see if next data buffer is synchronized - SerializedData? data = datastream.peek(); - if (data != null && data.literal_data_tag != null && !is_synchronized) { - // report the Tag that is associated with the continuation - synchronize_tag = data.literal_data_tag; - - // break out to ensure pipe is flushed - break; - } - - // if not, remove and process - data = datastream.poll(); - if (data == null) { - // end of message, move on - continue; - } - - Logging.debug(Logging.Flag.SERIALIZER, "[%s] %s", to_string(), data.buffer.to_string()); - - // splice buffer's InputStream directly into OutputStream - yield couts.splice_async(data.buffer.get_input_stream(), OutputStreamSpliceFlags.NONE, - Priority.DEFAULT, cancellable); - - // if synchronized before, not any more - is_synchronized = false; - } - - // make sure everything is flushed out now ... some trouble with BufferedOutputStreams - // here, so flush ConverterOutputStream and its base stream - yield couts.flush_async(Priority.DEFAULT, cancellable); - yield couts.base_stream.flush_async(Priority.DEFAULT, cancellable); - } - + public string to_string() { return "ser:%s".printf(identifier); } -} +}