diff --git a/src/engine/imap/command/imap-authenticate-command.vala b/src/engine/imap/command/imap-authenticate-command.vala index 70b5328b..80e3b962 100644 --- a/src/engine/imap/command/imap-authenticate-command.vala +++ b/src/engine/imap/command/imap-authenticate-command.vala @@ -21,10 +21,16 @@ public class Geary.Imap.AuthenticateCommand : Command { public string method { get; private set; } + private LiteralParameter? response_literal = null; + private bool serialised = false; + private Geary.Nonblocking.Spinlock error_lock; + private GLib.Cancellable error_cancellable = new GLib.Cancellable(); + private AuthenticateCommand(string method, string data) { base(NAME, { method, data }); this.method = method; + this.error_lock = new Geary.Nonblocking.Spinlock(this.error_cancellable); } public AuthenticateCommand.oauth2(string user, string token) { @@ -34,19 +40,65 @@ public class Geary.Imap.AuthenticateCommand : Command { this(OAUTH2_METHOD, encoded_token); } - public ContinuationParameter - continuation_requested(ContinuationResponse response) - throws ImapError { - if (this.method != AuthenticateCommand.OAUTH2_METHOD) { - throw new ImapError.INVALID("Unexpected continuation request"); + /** Waits after serialisation has completed for authentication. */ + public override async void serialize(Serializer ser, + GLib.Cancellable cancellable) + throws GLib.Error { + yield base.serialize(ser, cancellable); + this.serialised = true; + + // Need to manually flush here since the connection will be + // waiting this to complete before do so. + yield ser.flush_stream(cancellable); + + // Wait to either get a response or a continuation request + yield this.error_lock.wait_async(cancellable); + if (this.response_literal != null) { + yield this.response_literal.serialize_data(ser, cancellable); + ser.push_eol(cancellable); + yield ser.flush_stream(cancellable); } - // Continuation will be a Base64 encoded JSON blob and which - // indicates a login failure. We don't really care about that - // (do we?) though since once we acknowledge it with a - // zero-length response the server will respond with an IMAP - // error. - return new ContinuationParameter(new uint8[0]); + yield wait_until_complete(cancellable); + } + + public override void cancel_serialization() { + base.cancel_serialization(); + this.error_cancellable.cancel(); + } + + public override void completed(StatusResponse new_status) + throws ImapError { + this.error_lock.blind_notify(); + base.completed(new_status); + } + + public override void continuation_requested(ContinuationResponse response) + throws ImapError { + if (!this.serialised) { + // Allow any args sent as literals to be processed + // normally + base.continuation_requested(response); + } else { + if (this.method != AuthenticateCommand.OAUTH2_METHOD || + this.response_literal != null) { + cancel_serialization(); + throw new ImapError.INVALID( + "Unexpected AUTHENTICATE continuation request" + ); + } + + // Continuation will be a Base64 encoded JSON blob and which + // indicates a login failure. We don't really care about that + // (do we?) though since once we acknowledge it with a + // zero-length response the server will respond with an IMAP + // error. + this.response_literal = new LiteralParameter( + Geary.Memory.EmptyBuffer.instance + ); + // Notify serialisation to continue + this.error_lock.blind_notify(); + } } public override string to_string() { diff --git a/src/engine/imap/command/imap-command.vala b/src/engine/imap/command/imap-command.vala index a6e48179..d688cc4a 100644 --- a/src/engine/imap/command/imap-command.vala +++ b/src/engine/imap/command/imap-command.vala @@ -45,6 +45,17 @@ public class Geary.Imap.Command : BaseObject { protected ListParameter args { get; private set; default = new RootParameters(); } + + /** The status response for the command, once it has been received. */ + public StatusResponse? status { get; private set; default = null; } + + private Geary.Nonblocking.Semaphore complete_lock = + new Geary.Nonblocking.Semaphore(); + + private Geary.Nonblocking.Spinlock? literal_spinlock = null; + private GLib.Cancellable? literal_cancellable = null; + + /** * Constructs a new command with an unassigned tag. * @@ -62,52 +73,208 @@ public class Geary.Imap.Command : BaseObject { } } } - - /** - * - */ - } - - private void stock_params() { - add(tag); - add(new AtomParameter(name)); - if (args != null) { - foreach (string arg in args) - add(Parameter.get_for_string(arg)); - } - } - - /** - * Assign a {@link Tag} to a {@link Command} with an unassigned placeholder Tag. - * - * Can only be called on a Command that holds an unassigned Tag. Thus, this can only be called - * once at most, and zero times if Command.assigned() was used to generate the Command. - * Fires an assertion if either of these cases is true, or if the supplied Tag is unassigned. - */ - public void assign_tag(Tag tag) { - assert(!this.tag.is_assigned()); - assert(tag.is_assigned()); - - this.tag = tag; - - // Tag is always at index zero. - try { - Parameter param = replace(0, tag); - assert(param is Tag); - } catch (ImapError err) { - error("Unable to assign Tag for command %s: %s", to_string(), err.message); - } - } - + public bool has_name(string name) { return Ascii.stri_equal(this.name, name); } - - public override void serialize(Serializer ser, Tag tag) throws Error { - assert(tag.is_assigned()); - - base.serialize(ser, tag); - ser.push_end_of_message(); + + /** + * Assign a Tag to this command, if currently unassigned. + * + * Can only be called on a Command that holds an unassigned Tag. + * Thus, this can only be called once at most, and zero times if + * Command.assigned() was used to generate the Command. Fires an + * assertion if either of these cases is true, or if the supplied + * Tag is unassigned. + */ + public void assign_tag(Tag new_tag) throws ImapError { + if (this.tag.is_assigned()) { + throw new ImapError.SERVER_ERROR( + "%s: Command tag is already assigned", to_brief_string() + ); + } + if (!new_tag.is_assigned()) { + throw new ImapError.SERVER_ERROR( + "%s: New tag is not assigned", to_brief_string() + ); + } + + this.tag = new_tag; + } + + /** + * Serialises this command for transmission to the server. + * + * This will serialise its tag, name and arguments (if + * any). Arguments are treated as strings and escaped as needed, + * including being encoded as a literal. If any literals are + * required, this method will yield until a command continuation + * has been received, when it will resume the same process. + */ + public virtual async void serialize(Serializer ser, + GLib.Cancellable cancellable) + throws GLib.Error { + this.tag.serialize(ser, cancellable); + ser.push_space(cancellable); + ser.push_unquoted_string(this.name, cancellable); + + if (this.args != null) { + foreach (Parameter arg in this.args.get_all()) { + ser.push_space(cancellable); + arg.serialize(ser, cancellable); + + LiteralParameter literal = arg as LiteralParameter; + if (literal != null) { + // Need to manually flush after serialising the + // literal param, so it actually gets to the + // server + yield ser.flush_stream(cancellable); + + if (this.literal_spinlock == null) { + // Lazily create these since they usually + // won't be needed + this.literal_cancellable = new GLib.Cancellable(); + this.literal_spinlock = new Geary.Nonblocking.Spinlock( + this.literal_cancellable + ); + } + + // Will get notified via continuation_requested + // when server indicated the literal can be sent. + yield this.literal_spinlock.wait_async(cancellable); + yield literal.serialize_data(ser, cancellable); + } + } + } + + ser.push_eol(cancellable); + } + + /** + * Cancels any existing serialisation in progress. + * + * When this method is called, any non I/O related process + * blocking the blocking {@link serialize} must be cancelled. + */ + public virtual void cancel_serialization() { + if (this.literal_cancellable != null) { + this.literal_cancellable.cancel(); + } + } + + /** + * Yields until the command has been completed or cancelled. + * + * Throws an error if cancelled, or if the command's response was + * bad. + */ + public async void wait_until_complete(GLib.Cancellable cancellable) + throws GLib.Error { + yield this.complete_lock.wait_async(cancellable); + check_status(); + } + + /** + * Called when a tagged status response is received for this command. + * + * This will update the command's {@link status} property, then + * throw an error it does not indicate a successful completion. + */ + public virtual void completed(StatusResponse new_status) + throws ImapError { + if (this.status != null) { + cancel_serialization(); + throw new ImapError.SERVER_ERROR( + "%s: Duplicate status response received: %s", + to_brief_string(), + status.to_string() + ); + } + + this.status = new_status; + this.complete_lock.blind_notify(); + cancel_serialization(); + check_status(); + } + + /** + * Called when tagged server data is received for this command. + */ + public virtual void data_received(ServerData data) + throws ImapError { + if (this.status != null) { + cancel_serialization(); + throw new ImapError.SERVER_ERROR( + "%s: Server data received when command already complete: %s", + to_brief_string(), + data.to_string() + ); + } + // Nothing to do otherwise + } + + /** + * Called when a continuation was requested by the server. + * + * This will notify the command's literal spinlock so that if + * {@link serialize} is waiting to send a literal, it will do so + * now. + */ + public virtual void + continuation_requested(ContinuationResponse continuation) + throws ImapError { + if (this.status != null) { + cancel_serialization(); + throw new ImapError.SERVER_ERROR( + "%s: Continuation requested when command already complete", + to_brief_string() + ); + } + + if (this.literal_spinlock == null) { + cancel_serialization(); + throw new ImapError.SERVER_ERROR( + "%s: Continuation requested but no literals available", + to_brief_string() + ); + } + + this.literal_spinlock.blind_notify(); + } + + public virtual string to_string() { + string args = this.args.to_string(); + return (Geary.String.is_empty(args)) + ? "%s %s".printf(this.tag.to_string(), this.name) + : "%s %s %s".printf(this.tag.to_string(), this.name, args); + } + + private void check_status() throws ImapError { + if (this.status == null) { + throw new ImapError.SERVER_ERROR( + "%s: No command response was received", + to_brief_string() + ); + } + + if (!this.status.is_completion) { + throw new ImapError.SERVER_ERROR( + "%s: Command status response is not a completion: %s", + to_brief_string(), + this.status.to_string() + ); + } + + if (this.status.status != Status.OK) { + throw new ImapError.SERVER_ERROR( + "%s: Command failed: %s", + to_brief_string(), + this.status.to_string() + ); + } + } + + private string to_brief_string() { + return "%s %s".printf(this.tag.to_string(), this.name); } } - diff --git a/src/engine/imap/command/imap-idle-command.vala b/src/engine/imap/command/imap-idle-command.vala index e781b249..a2fac042 100644 --- a/src/engine/imap/command/imap-idle-command.vala +++ b/src/engine/imap/command/imap-idle-command.vala @@ -11,10 +11,71 @@ */ public class Geary.Imap.IdleCommand : Command { - public const string NAME = "idle"; - - public IdleCommand() { - base (NAME); - } -} + public const string NAME = "IDLE"; + + private const string DONE = "DONE"; + + /** Determines if the server has acknowledged the IDLE request. */ + public bool idle_started { get; private set; default = false; } + + private bool serialised = false; + private Geary.Nonblocking.Spinlock? exit_lock; + private GLib.Cancellable? exit_cancellable = new GLib.Cancellable(); + + + public IdleCommand() { + base(NAME); + this.exit_lock = new Geary.Nonblocking.Spinlock(this.exit_cancellable); + } + + public void exit_idle() { + this.exit_lock.blind_notify(); + } + + /** Waits after serialisation has completed for {@link exit_idle}. */ + public override async void serialize(Serializer ser, + GLib.Cancellable cancellable) + throws GLib.Error { + // Need to manually flush here since Dovecot doesn't like + // getting IDLE in the same buffer as other commands. + yield ser.flush_stream(cancellable); + + yield base.serialize(ser, cancellable); + this.serialised = true; + + // Need to manually flush again since the connection will be + // waiting this to complete before do so. + yield ser.flush_stream(cancellable); + + // Now wait for exit_idle() to be called, the server to send a + // status response, or everything to be cancelled. + yield this.exit_lock.wait_async(cancellable); + + // If we aren't closed already, try sending DONE to exit IDLE + if (this.status == null) { + ser.push_unquoted_string(DONE); + ser.push_eol(cancellable); + yield ser.flush_stream(cancellable); + } + + yield wait_until_complete(cancellable); + } + + public override void cancel_serialization() { + base.cancel_serialization(); + this.exit_cancellable.cancel(); + } + + public override void continuation_requested(ContinuationResponse response) + throws ImapError { + if (!this.serialised) { + // Allow any args sent as literals to be processed + // normally + base.continuation_requested(response); + } else { + this.idle_started = true; + } + } + +} diff --git a/src/engine/imap/parameter/imap-literal-parameter.vala b/src/engine/imap/parameter/imap-literal-parameter.vala index 6c9164c6..4d394697 100644 --- a/src/engine/imap/parameter/imap-literal-parameter.vala +++ b/src/engine/imap/parameter/imap-literal-parameter.vala @@ -64,4 +64,13 @@ public class Geary.Imap.LiteralParameter : Geary.Imap.Parameter { ser.push_eol(cancellable); } + /** + * Serialises the literal parameter data. + */ + public async void serialize_data(Serializer ser, + GLib.Cancellable cancellable) + throws GLib.Error { + yield ser.push_literal_data(buffer, cancellable); + } + } diff --git a/src/engine/imap/transport/imap-client-connection.vala b/src/engine/imap/transport/imap-client-connection.vala index 1b9eb635..3e4e4429 100644 --- a/src/engine/imap/transport/imap-client-connection.vala +++ b/src/engine/imap/transport/imap-client-connection.vala @@ -36,76 +36,19 @@ public class Geary.Imap.ClientConnection : BaseObject { public const uint RECOMMENDED_TIMEOUT_SEC = ClientSession.RECOMMENDED_KEEPALIVE_SEC + 15; /** - * The default timeout for an issued command to result in a response code from the server. - * - * @see command_timeout_sec + * Default timeout to wait for a server response for a command. */ public const uint DEFAULT_COMMAND_TIMEOUT_SEC = 30; - - private const int FLUSH_TIMEOUT_MSEC = 10; - - // At least one server out there requires this to be in caps - private const string IDLE_DONE = "DONE"; - - private enum State { - UNCONNECTED, - CONNECTED, - IDLING, - IDLE, - DEIDLING, - DEIDLING_SYNCHRONIZING, - SYNCHRONIZING, - DISCONNECTED, - - COUNT - } - - private static string state_to_string(uint state) { - return ((State) state).to_string(); - } - - private enum Event { - CONNECTED, - DISCONNECTED, - - // Use issue_conditional_event() for SEND events, using the result to determine whether - // or not to continue; the transition handlers do no signalling or I/O - SEND, - SEND_IDLE, - - // To initiate a command continuation request - SYNCHRONIZE, - - // RECVD_* will emit appropriate signals inside their transition handlers; do *not* use - // issue_conditional_event() for these events - RECVD_STATUS_RESPONSE, - RECVD_SERVER_DATA, - RECVD_CONTINUATION_RESPONSE, - - COUNT - } - - private static string event_to_string(uint event) { - return ((Event) event).to_string(); - } - - private static Geary.State.MachineDescriptor machine_desc = new Geary.State.MachineDescriptor( - "Geary.Imap.ClientConnection", State.UNCONNECTED, State.COUNT, Event.COUNT, - state_to_string, event_to_string); - + + /** + * Default timeout to wait for another command before going idle. + */ + public const uint DEFAULT_IDLE_TIMEOUT_SEC = 2; + // Used solely for debugging private static int next_cx_id = 0; - - /** - * The timeout in seconds before an uncompleted {@link Command} is considered abandoned. - * - * ClientConnection does not time out the initial greeting from the server (as there's no - * command associated with it). That's the responsibility of the caller. - * - * A timed-out command will result in the connection being forcibly closed. - */ - public uint command_timeout_sec { get; set; default = DEFAULT_COMMAND_TIMEOUT_SEC; } - + + /** * This identifier is used only for debugging, to differentiate connections from one another * in logs and debug output. @@ -131,27 +74,26 @@ public class Geary.Imap.ClientConnection : BaseObject { public bool idle_when_quiet = false; private Geary.Endpoint endpoint; - private Geary.State.Machine fsm; private SocketConnection? cx = null; private IOStream? ios = null; private Serializer? ser = null; private BufferedOutputStream? ser_buffer = null; private Deserializer? des = null; - private Geary.Nonblocking.Mutex send_mutex = new Geary.Nonblocking.Mutex(); - private Geary.Nonblocking.Semaphore synchronized_notifier = new Geary.Nonblocking.Semaphore(); - private Geary.Nonblocking.Event idle_notifier = new Geary.Nonblocking.Event(); + private int tag_counter = 0; private char tag_prefix = 'a'; - private uint flush_timeout_id = 0; - private Gee.HashSet posted_idle_tags = new Gee.HashSet(); - private int outstanding_idle_dones = 0; - private Tag? posted_synchronization_tag = null; - private StatusResponse? synchronization_status_response = null; - private bool waiting_for_idle_to_synchronize = false; - private uint timeout_id = 0; - private uint timeout_cmd_count = 0; - private int outstanding_cmds = 0; - + + private Geary.Nonblocking.Queue pending_queue = + new Geary.Nonblocking.Queue.fifo(); + private Gee.Queue sent_queue = new Gee.LinkedList(); + private Command? current_command = null; + + private TimeoutManager command_timer; + private TimeoutManager idle_timer; + + private GLib.Cancellable? open_cancellable = null; + + public virtual signal void connected() { Logging.debug(Logging.Flag.NETWORK, "[%s] connected to %s", to_string(), endpoint.to_string()); @@ -161,32 +103,15 @@ public class Geary.Imap.ClientConnection : BaseObject { Logging.debug(Logging.Flag.NETWORK, "[%s] disconnected from %s", to_string(), endpoint.to_string()); } - + public virtual signal void sent_command(Command cmd) { Logging.debug(Logging.Flag.NETWORK, "[%s S] %s", to_string(), cmd.to_string()); - - // track outstanding Command count to force switching to IDLE only when nothing outstanding - outstanding_cmds++; } - - public virtual signal void in_idle(bool idling) { - Logging.debug(Logging.Flag.NETWORK, "[%s] in idle: %s", to_string(), idling.to_string()); - - // fire the Event every time the IDLE state changes - idle_notifier.blind_notify(); - } - + public virtual signal void received_status_response(StatusResponse status_response) { Logging.debug(Logging.Flag.NETWORK, "[%s R] %s", to_string(), status_response.to_string()); - - // look for command completion, if no outstanding, schedule a flush timeout to switch to - // IDLE mode - if (status_response.is_completion) { - if (--outstanding_cmds == 0) - reschedule_flush_timeout(); - } } - + public virtual signal void received_server_data(ServerData server_data) { Logging.debug(Logging.Flag.NETWORK, "[%s R] %s", to_string(), server_data.to_string()); } @@ -224,95 +149,20 @@ public class Geary.Imap.ClientConnection : BaseObject { public virtual signal void close_error(Error err) { Logging.debug(Logging.Flag.NETWORK, "[%s] close error: %s", to_string(), err.message); } - - public ClientConnection(Geary.Endpoint endpoint) { + + + public ClientConnection(Geary.Endpoint endpoint, + uint command_timeout_sec = DEFAULT_COMMAND_TIMEOUT_SEC, + uint idle_timeout_sec = DEFAULT_IDLE_TIMEOUT_SEC) { this.endpoint = endpoint; - cx_id = next_cx_id++; - - Geary.State.Mapping[] mappings = { - new Geary.State.Mapping(State.UNCONNECTED, Event.CONNECTED, on_connected), - new Geary.State.Mapping(State.UNCONNECTED, Event.DISCONNECTED, on_disconnected), - - new Geary.State.Mapping(State.CONNECTED, Event.SEND, on_proceed), - new Geary.State.Mapping(State.CONNECTED, Event.SEND_IDLE, on_send_idle), - new Geary.State.Mapping(State.CONNECTED, Event.SYNCHRONIZE, on_synchronize), - new Geary.State.Mapping(State.CONNECTED, Event.RECVD_STATUS_RESPONSE, on_status_response), - new Geary.State.Mapping(State.CONNECTED, Event.RECVD_SERVER_DATA, on_server_data), - new Geary.State.Mapping(State.CONNECTED, Event.RECVD_CONTINUATION_RESPONSE, on_continuation), - new Geary.State.Mapping(State.CONNECTED, Event.DISCONNECTED, on_disconnected), - - new Geary.State.Mapping(State.IDLING, Event.SEND, on_idle_send), - new Geary.State.Mapping(State.IDLING, Event.SEND_IDLE, on_no_proceed), - new Geary.State.Mapping(State.IDLING, Event.SYNCHRONIZE, on_idle_synchronize), - new Geary.State.Mapping(State.IDLING, Event.RECVD_STATUS_RESPONSE, on_idle_status_response), - new Geary.State.Mapping(State.IDLING, Event.RECVD_SERVER_DATA, on_server_data), - new Geary.State.Mapping(State.IDLING, Event.RECVD_CONTINUATION_RESPONSE, on_idling_deidling_continuation), - new Geary.State.Mapping(State.IDLING, Event.DISCONNECTED, on_disconnected), - - new Geary.State.Mapping(State.IDLE, Event.SEND, on_idle_send), - new Geary.State.Mapping(State.IDLE, Event.SEND_IDLE, on_no_proceed), - new Geary.State.Mapping(State.IDLE, Event.SYNCHRONIZE, on_idle_synchronize), - new Geary.State.Mapping(State.IDLE, Event.RECVD_STATUS_RESPONSE, on_idle_status_response), - new Geary.State.Mapping(State.IDLE, Event.RECVD_SERVER_DATA, on_server_data), - new Geary.State.Mapping(State.IDLE, Event.RECVD_CONTINUATION_RESPONSE, on_idle_continuation), - new Geary.State.Mapping(State.IDLE, Event.DISCONNECTED, on_disconnected), - - new Geary.State.Mapping(State.DEIDLING, Event.SEND, on_proceed), - new Geary.State.Mapping(State.DEIDLING, Event.SEND_IDLE, on_send_idle), - new Geary.State.Mapping(State.DEIDLING, Event.SYNCHRONIZE, on_deidling_synchronize), - new Geary.State.Mapping(State.DEIDLING, Event.RECVD_STATUS_RESPONSE, on_idle_status_response), - new Geary.State.Mapping(State.DEIDLING, Event.RECVD_SERVER_DATA, on_server_data), - new Geary.State.Mapping(State.DEIDLING, Event.RECVD_CONTINUATION_RESPONSE, on_idling_deidling_continuation), - new Geary.State.Mapping(State.DEIDLING, Event.DISCONNECTED, on_disconnected), - - new Geary.State.Mapping(State.DEIDLING_SYNCHRONIZING, Event.SEND, on_proceed), - new Geary.State.Mapping(State.DEIDLING_SYNCHRONIZING, Event.SEND_IDLE, on_no_proceed), - new Geary.State.Mapping(State.DEIDLING_SYNCHRONIZING, Event.RECVD_STATUS_RESPONSE, - on_deidling_synchronizing_status_response), - new Geary.State.Mapping(State.DEIDLING_SYNCHRONIZING, Event.RECVD_SERVER_DATA, on_server_data), - new Geary.State.Mapping(State.DEIDLING_SYNCHRONIZING, Event.RECVD_CONTINUATION_RESPONSE, - on_synchronize_continuation), - new Geary.State.Mapping(State.DEIDLING_SYNCHRONIZING, Event.DISCONNECTED, on_disconnected), - - new Geary.State.Mapping(State.SYNCHRONIZING, Event.SEND, on_proceed), - new Geary.State.Mapping(State.SYNCHRONIZING, Event.SEND_IDLE, on_no_proceed), - new Geary.State.Mapping(State.SYNCHRONIZING, Event.RECVD_STATUS_RESPONSE, - on_synchronize_status_response), - new Geary.State.Mapping(State.SYNCHRONIZING, Event.RECVD_SERVER_DATA, on_server_data), - new Geary.State.Mapping(State.SYNCHRONIZING, Event.RECVD_CONTINUATION_RESPONSE, - on_synchronize_continuation), - new Geary.State.Mapping(State.SYNCHRONIZING, Event.DISCONNECTED, on_disconnected), - - // TODO: A DISCONNECTING state would be helpful here, allowing for responses and data - // received from the server after a send error caused a disconnect to be signalled to - // subscribers before moving to the DISCONNECTED state. That would require more work, - // allowing for the caller (ClientSession) to close the receive channel and wait for - // everything to flush out before it shifted to a DISCONNECTED state as well. - new Geary.State.Mapping(State.DISCONNECTED, Event.SEND, on_no_proceed), - new Geary.State.Mapping(State.DISCONNECTED, Event.SEND_IDLE, on_no_proceed), - new Geary.State.Mapping(State.DISCONNECTED, Event.SYNCHRONIZE, on_no_proceed), - new Geary.State.Mapping(State.DISCONNECTED, Event.RECVD_STATUS_RESPONSE, Geary.State.nop), - new Geary.State.Mapping(State.DISCONNECTED, Event.RECVD_SERVER_DATA, Geary.State.nop), - new Geary.State.Mapping(State.DISCONNECTED, Event.RECVD_CONTINUATION_RESPONSE, Geary.State.nop), - new Geary.State.Mapping(State.DISCONNECTED, Event.DISCONNECTED, Geary.State.nop) - }; - - fsm = new Geary.State.Machine(machine_desc, mappings, on_bad_transition); - fsm.set_logging(false); - } - - /** - * Generates a unique tag for the IMAP connection in the form of "<000-999>". - */ - private Tag generate_tag() { - // watch for odometer rollover - if (++tag_counter >= 1000) { - tag_counter = 0; - tag_prefix = (tag_prefix != 'z') ? tag_prefix + 1 : 'a'; - } - - // TODO This could be optimized, but we'll leave it for now. - return new Tag("%c%03d".printf(tag_prefix, tag_counter)); + this.cx_id = next_cx_id++; + + this.command_timer = new TimeoutManager.seconds( + command_timeout_sec, on_command_timeout + ); + this.idle_timer = new TimeoutManager.seconds( + idle_timeout_sec, on_idle_timeout + ); } public SocketAddress? get_remote_address() { @@ -340,53 +190,31 @@ public class Geary.Imap.ClientConnection : BaseObject { return null; } - + /** - * Returns true if the connection is in an IDLE state. The or_idling parameter means to return - * true if the connection is working toward an IDLE state (but additional responses are being - * returned from the server before getting there). + * Determines if the connection has an outstanding IDLE command. */ - public bool is_in_idle(bool or_idling) { - switch (fsm.get_state()) { - case State.IDLE: - return true; - - case State.IDLING: - return or_idling; - - default: - return false; - } + public bool is_in_idle() { + return (this.current_command is IdleCommand); } - - public bool install_send_converter(Converter converter) { - return ser.install_converter(converter); - } - - public bool install_recv_converter(Converter converter) { - return des.install_converter(converter); - } - + /** * Returns silently if a connection is already established. */ public async void connect_async(Cancellable? cancellable = null) throws Error { - if (cx != null) { + if (this.cx != null) { debug("Already connected/connecting to %s", to_string()); - return; } - - cx = yield endpoint.connect_async(cancellable); - ios = cx; - outstanding_cmds = 0; - - // issue CONNECTED event and fire signal because the moment the channels are hooked up, - // data can start flowing - fsm.issue(Event.CONNECTED); - + + this.cx = yield endpoint.connect_async(cancellable); + this.ios = cx; + + this.pending_queue.clear(); + this.sent_queue.clear(); + connected(); - + try { yield open_channels_async(); } catch (Error err) { @@ -397,36 +225,34 @@ public class Geary.Imap.ClientConnection : BaseObject { } catch (Error close_err) { // ignored } - - fsm.issue(Event.DISCONNECTED); - - cx = null; - ios = null; - + + this.cx = null; + this.ios = null; + receive_failure(err); - + throw err; } + + if (this.idle_when_quiet) { + this.idle_timer.start(); + } } - + public async void disconnect_async(Cancellable? cancellable = null) throws Error { if (cx == null) return; - + + this.command_timer.reset(); + this.idle_timer.reset(); + // To guard against reentrancy SocketConnection close_cx = cx; cx = null; - // unschedule before yielding to stop the Deserializer - unschedule_flush_timeout(); - - // cancel all outstanding commmand timeouts - cancel_timeout(); - // close the Serializer and Deserializer yield close_channels_async(cancellable); - outstanding_cmds = 0; - + // close the actual streams and the connection itself Error? close_err = null; try { @@ -440,63 +266,13 @@ public class Geary.Imap.ClientConnection : BaseObject { } finally { ios = null; - fsm.issue(Event.DISCONNECTED); - if (close_err != null) close_error(close_err); - + disconnected(); } } - private async void open_channels_async() throws Error { - assert(ios != null); - assert(ser == null); - assert(des == null); - - // Not buffering the Deserializer because it uses a DataInputStream, which is buffered - ser_buffer = new BufferedOutputStream(ios.output_stream); - ser_buffer.set_close_base_stream(false); - - // Use ClientConnection cx_id for debugging aid with Serializer/Deserializer - string id = "%04d".printf(cx_id); - ser = new Serializer(id, ser_buffer); - des = new Deserializer(id, ios.input_stream); - - des.parameters_ready.connect(on_parameters_ready); - des.bytes_received.connect(on_bytes_received); - des.receive_failure.connect(on_receive_failure); - des.deserialize_failure.connect(on_deserialize_failure); - des.eos.connect(on_eos); - - yield des.start_async(); - } - - /** Disconnect and deallocates the Serializer and Deserializer. */ - private async void close_channels_async(Cancellable? cancellable) throws Error { - // disconnect from Deserializer before yielding to stop it - if (des != null) { - des.parameters_ready.disconnect(on_parameters_ready); - des.bytes_received.disconnect(on_bytes_received); - des.receive_failure.disconnect(on_receive_failure); - des.deserialize_failure.disconnect(on_deserialize_failure); - des.eos.disconnect(on_eos); - - yield des.stop_async(); - } - des = null; - ser = null; - // Close the Serializer's buffered stream after it as been - // deallocated so it can't possibly write to the stream again, - // and so the stream's async thread doesn't attempt to flush - // its buffers from its finaliser at some later unspecified - // point, possibly writing to an invalid underlying stream. - if (ser_buffer != null) { - yield ser_buffer.close_async(GLib.Priority.DEFAULT, cancellable); - ser_buffer = null; - } - } - public async void starttls_async(Cancellable? cancellable = null) throws Error { if (cx == null) throw new ImapError.NOT_SUPPORTED("[%s] Unable to enable TLS: no connection", to_string()); @@ -520,637 +296,316 @@ public class Geary.Imap.ClientConnection : BaseObject { // re-open Serializer/Deserializer with the new streams yield open_channels_async(); } - + + public void send_command(Command new_command) throws ImapError { + check_connection(); + + this.pending_queue.send(new_command); + + // If the current command is an IDLE, tell it to exit so we + // can get on with life. + IdleCommand? idle = this.current_command as IdleCommand; + if (idle != null) { + idle.exit_idle(); + } + } + + public string to_string() { + return "%04X/%s/%s".printf( + cx_id, + endpoint.to_string(), + this.cx != null ? "Connected" : "Disconnected" + ); + } + + private async void open_channels_async() throws Error { + assert(ios != null); + assert(ser == null); + assert(des == null); + + this.open_cancellable = new GLib.Cancellable(); + + // Not buffering the Deserializer because it uses a DataInputStream, which is buffered + ser_buffer = new BufferedOutputStream(ios.output_stream); + ser_buffer.set_close_base_stream(false); + + // Use ClientConnection cx_id for debugging aid with Serializer/Deserializer + string id = "%04d".printf(cx_id); + ser = new Serializer(id, ser_buffer); + des = new Deserializer(id, ios.input_stream); + + des.parameters_ready.connect(on_parameters_ready); + des.bytes_received.connect(on_bytes_received); + des.receive_failure.connect(on_receive_failure); + des.deserialize_failure.connect(on_deserialize_failure); + des.eos.connect(on_eos); + + // Start this running in the "background", it will stop when + // open_cancellable is cancelled + this.send_loop.begin(); + + yield des.start_async(); + } + + /** Disconnect and deallocates the Serializer and Deserializer. */ + private async void close_channels_async(Cancellable? cancellable) throws Error { + this.open_cancellable.cancel(); + + // disconnect from Deserializer before yielding to stop it + if (des != null) { + des.parameters_ready.disconnect(on_parameters_ready); + des.bytes_received.disconnect(on_bytes_received); + des.receive_failure.disconnect(on_receive_failure); + des.deserialize_failure.disconnect(on_deserialize_failure); + des.eos.disconnect(on_eos); + + yield des.stop_async(); + } + des = null; + ser = null; + // Close the Serializer's buffered stream after it as been + // deallocated so it can't possibly write to the stream again, + // and so the stream's async thread doesn't attempt to flush + // its buffers from its finaliser at some later unspecified + // point, possibly writing to an invalid underlying stream. + if (ser_buffer != null) { + yield ser_buffer.close_async(GLib.Priority.DEFAULT, cancellable); + ser_buffer = null; + } + } + + // Generates a unique tag for the IMAP connection in the form of + // "<000-999>". + private Tag generate_tag() { + // watch for odometer rollover + if (++tag_counter >= 1000) { + tag_counter = 0; + tag_prefix = (tag_prefix != 'z') ? tag_prefix + 1 : 'a'; + } + + // TODO This could be optimized, but we'll leave it for now. + return new Tag("%c%03d".printf(tag_prefix, tag_counter)); + } + + /** Long lived method to send commands as they are queued. */ + private async void send_loop() { + while (!this.open_cancellable.is_cancelled()) { + try { + GLib.Cancellable cancellable = this.open_cancellable; + Command pending = yield this.pending_queue.receive( + this.open_cancellable + ); + + // Only send IDLE commands if they are the last in the + // queue, there's no point otherwise. + bool pending_idle = pending is IdleCommand; + if (!pending_idle || this.pending_queue.is_empty) { + yield flush_command(pending, cancellable); + } + + // Check the queue is still empty after sending the + // command, since that might have changed. + if (this.pending_queue.is_empty) { + yield this.ser.flush_stream(cancellable); + if (this.idle_when_quiet && !pending_idle) { + this.idle_timer.start(); + } + } + } catch (GLib.Error err) { + if (!(err is GLib.IOError.CANCELLED)) { + send_failure(err); + } + } + } + } + + // Only ever call this from flush_commands, to ensure serial + // assignment of tags and only one command gets flushed at a + // time. This blocks asynchronously while serialising a command, + // including while waiting for continuation request responses when + // sending literals. + private async void flush_command(Command command, Cancellable cancellable) + throws GLib.Error { + // Assign a new tag; Commands with pre-assigned Tags + // should not be re-sent. (Do this inside the critical + // section to ensure commands go out in Tag order; + // this is not an IMAP requirement but makes tracing + // commands easier.) + command.assign_tag(generate_tag()); + + this.current_command = command; + this.sent_queue.add(command); + GLib.Error? ser_error = null; + try { + yield command.serialize(this.ser, cancellable); + } catch (GLib.Error err) { + ser_error = err; + } + + this.current_command = null; + + if (ser_error != null) { + this.sent_queue.remove(command); + throw ser_error; + } + + // We want the timeout to trigger ASAP if the + // connection goes away, so don't reset it if it is + // already running. + if (!this.command_timer.is_running) { + this.command_timer.start(); + } + + sent_command(command); + } + + private Command? get_sent_command(Tag tag) { + Command? sent = null; + if (tag.is_tagged()) { + foreach (Command queued in this.sent_queue) { + if (tag.equal_to(queued.tag)) { + sent = queued; + break; + } + } + } + return sent; + } + + private void check_connection() throws ImapError { + if (this.cx == null) { + throw new ImapError.NOT_CONNECTED( + "Not connected to %s", to_string() + ); + } + } + private void on_parameters_ready(RootParameters root) { + // Reset the command timer, since we know for the + // moment that the connection is good. + this.command_timer.reset(); + ServerResponse response; try { response = ServerResponse.migrate_from_server(root); + + StatusResponse? status = response as StatusResponse; + if (status != null) { + on_status_response(status); + return; + } + + ServerData? data = response as ServerData; + if (data != null) { + on_server_data(data); + return; + } + + ContinuationResponse? continuation = response as ContinuationResponse; + if (continuation != null) { + on_continuation_response(continuation); + return; + } } catch (ImapError err) { received_bad_response(root, err); - return; } - - StatusResponse? status_response = response as StatusResponse; - if (status_response != null) { - fsm.issue(Event.RECVD_STATUS_RESPONSE, null, status_response); - - return; - } - - ServerData? server_data = response as ServerData; - if (server_data != null) { - fsm.issue(Event.RECVD_SERVER_DATA, null, server_data); - - return; - } - - ContinuationResponse? continuation_response = response as ContinuationResponse; - if (continuation_response != null) { - fsm.issue(Event.RECVD_CONTINUATION_RESPONSE, null, continuation_response); - - return; - } - - error("[%s] Unknown ServerResponse of type %s received: %s:", to_string(), response.get_type().name(), - response.to_string()); + + warning( + "[%s] Unknown ServerResponse of type %s received: %s:", + to_string(), response.get_type().name(), + response.to_string() + ); } - + + private void on_status_response(StatusResponse status) + throws ImapError { + if (status.is_completion) { + Command? sent = get_sent_command(status.tag); + if (sent == null) { + throw new ImapError.SERVER_ERROR( + "Unexpected status response: %s", status.to_string() + ); + } + this.sent_queue.remove(sent); + sent.completed(status); + } + + received_status_response(status); + } + + private void on_server_data(ServerData data) + throws ImapError { + Command? sent = get_sent_command(data.tag); + if (sent != null) { + sent.data_received(data); + } + + received_server_data(data); + } + + private void on_continuation_response(ContinuationResponse continuation) + throws ImapError { + Command? current = this.current_command; + if (current == null) { + throw new ImapError.SERVER_ERROR( + "Unexpected continuation request response: %s", + continuation.to_string() + ); + } + current.continuation_requested(continuation); + + received_continuation_response(continuation); + } + private void on_bytes_received(size_t bytes) { - // as long as receiving someone on the connection, keep the outstanding command timeouts - // alive ... this primarily prevents against the case where a command that generates a long - // download doesn't timeout the commands behind it - increase_timeout(); - + // Reset the command timer, since we know for the + // moment that the connection is good. + this.command_timer.reset(); + received_bytes(bytes); } - + private void on_receive_failure(Error err) { receive_failure(err); } - + private void on_deserialize_failure() { - deserialize_failure(new ImapError.PARSE_ERROR("Unable to deserialize from %s", to_string())); + deserialize_failure( + new ImapError.PARSE_ERROR( + "Unable to deserialize from %s", to_string() + ) + ); } - + private void on_eos() { recv_closed(); } - - public async void send_async(Command cmd, Cancellable? cancellable = null) throws Error { - check_for_connection(); - - // need to run this in critical section because Serializer requires it (don't want to be - // pushing data while a flush_async() is occurring) - int token = yield send_mutex.claim_async(cancellable); - - // This needs to happen inside mutex because flush async also manipulates FSM - if (!issue_conditional_event(Event.SEND)) { - debug("[%s] Send async not allowed", to_string()); - - send_mutex.release(ref token); - - throw new ImapError.NOT_CONNECTED("Send not allowed: connection in %s state", - fsm.get_state_string(fsm.get_state())); - } - - // Always assign a new tag; Commands with pre-assigned Tags should not be re-sent. - // (Do this inside the critical section to ensure commands go out in Tag order; this is not - // an IMAP requirement but makes tracing commands easier.) - cmd.assign_tag(generate_tag()); - - // set the timeout on this command; note that a zero-second timeout means no timeout, - // and that there's no timeout on serialization - cmd_started_timeout(); - - Error? ser_err = null; - try { - // watch for disconnect while waiting for mutex - if (ser != null) { - cmd.serialize(ser, cmd.tag); - } else { - ser_err = new ImapError.NOT_CONNECTED("Send not allowed: connection in %s state", - fsm.get_state_string(fsm.get_state())); - } - } catch (Error err) { - debug("[%s] Error serializing command: %s", to_string(), err.message); - ser_err = err; - } - - send_mutex.release(ref token); - - if (ser_err != null) { - send_failure(ser_err); - - throw ser_err; - } - - // Reset flush timer so it only fires after n msec after last command pushed out to stream - reschedule_flush_timeout(); - - // TODO: technically lying a little bit here; since ClientSession keepalives are rescheduled - // by this signal, will want to tighten this up a bit in the future - sent_command(cmd); - } - /** - * Sends a reply to an unsolicited continuation request. - * - * Do not use this if you need to send literal data as part of a - * command, add it as a {@link LiteralParameter} to the command - * instead. - */ - public async void send_continuation_reply(ContinuationParameter reply, - Cancellable? cancellable = null) - throws Error { - check_for_connection(); - // need to run this in critical section because Serializer requires it (don't want to be - // pushing data while a flush_async() is occurring) - int token = yield send_mutex.claim_async(cancellable); + private void on_command_timeout() { + debug("[%s] Sending command timed out", to_string()); - Error? ser_err = null; - try { - // watch for disconnect while waiting for mutex - if (ser != null) { - reply.serialize_continuation(ser); - } else { - ser_err = new ImapError.NOT_CONNECTED("Send not allowed: connection in %s state", - fsm.get_state_string(fsm.get_state())); - } - } catch (Error err) { - debug("[%s] Error serializing command: %s", to_string(), err.message); - ser_err = err; - } - - this.send_mutex.release(ref token); - - if (ser_err != null) { - send_failure(ser_err); - - throw ser_err; - } - - // Reset flush timer so it only fires after n msec after last command pushed out to stream - reschedule_flush_timeout(); - } - - private void reschedule_flush_timeout() { - unschedule_flush_timeout(); - - if (flush_timeout_id == 0) - flush_timeout_id = Timeout.add_full(Priority.LOW, FLUSH_TIMEOUT_MSEC, on_flush_timeout); - } - - private void unschedule_flush_timeout() { - if (flush_timeout_id != 0) { - Source.remove(flush_timeout_id); - flush_timeout_id = 0; - } - } - - private bool on_flush_timeout() { - do_flush_async.begin(); - - flush_timeout_id = 0; - - return false; - } - - private async void do_flush_async() { - // Like send_async(), need to use mutex when flushing as Serializer must be accessed in - // serialized fashion - // - // NOTE: Because this is happening in the background, it's possible for ser to go to null - // after any yield (if a close occurs while blocking); this is why all the checking is - // required - int token = Nonblocking.Mutex.INVALID_TOKEN; - try { - token = yield send_mutex.claim_async(); - - // Dovecot will hang the connection (not send any replies) if IDLE is sent in the - // same buffer as normal commands, so flush the buffer first, enqueue IDLE, and - // flush that behind the first - bool is_synchronized = false; - while (ser != null) { - // prepare for upcoming synchronization point (continuation response could be - // recv'd before flush_async() completes) and reset prior synchronization response - posted_synchronization_tag = ser.next_synchronized_message(); - synchronization_status_response = null; - - Tag? synchronize_tag; - yield ser.flush_async(is_synchronized, out synchronize_tag); - - // if no tag returned, all done, otherwise synchronization required - if (synchronize_tag == null) - break; - - // no longer synchronized - is_synchronized = false; - - // synchronization is not always possible - if (!issue_conditional_event(Event.SYNCHRONIZE)) { - debug("[%s] Unable to synchronize, exiting do_flush_async", to_string()); - - return; - } - - // the expectation that the send_async() command which enqueued the data buffers - // requiring synchronization closed the IDLE first, but it's possible the response - // has not been received from the server yet, so wait now ... even possible the - // connection is still in the IDLING state, so wait for IDLING -> IDLE -> SYNCHRONIZING - while (is_in_idle(true)) { - debug("[%s] Waiting to exit IDLE for synchronization...", to_string()); - yield idle_notifier.wait_async(); - debug("[%s] Finished waiting to exit IDLE for synchronization", to_string()); - } - - // wait for synchronization point to be reached - debug("[%s] Synchronizing...", to_string()); - yield synchronized_notifier.wait_async(); - - // since can be set before reaching wait_async() (due to waiting for IDLE to - // exit), need to manually reset for next time (can't use auto-reset) - synchronized_notifier.reset(); - - // watch for the synchronization request to be thwarted - if (synchronization_status_response != null) { - debug("[%s]: Failed to synchronize command continuation: %s", to_string(), - synchronization_status_response.to_string()); - - // skip pass current message, this one's done - if (ser != null) - ser.fast_forward_queue(); - } else { - debug("[%s] Synchronized, ready to continue", to_string()); - - // now synchronized, ready to continue - is_synchronized = true; - } - } - - // reset synchronization state - posted_synchronization_tag = null; - synchronization_status_response = null; - - // as connection is "quiet" (haven't seen new command in n msec), go into IDLE state - // if (a) allowed by owner, (b) allowed by state machine, and (c) no commands outstanding - if (ser != null && idle_when_quiet && outstanding_cmds == 0 && issue_conditional_event(Event.SEND_IDLE)) { - IdleCommand idle_cmd = new IdleCommand(); - idle_cmd.assign_tag(generate_tag()); - - // store IDLE tag to watch for response later (many responses could arrive before it) - bool added = posted_idle_tags.add(idle_cmd.tag); - assert(added); - - Logging.debug(Logging.Flag.NETWORK, "[%s] Initiating IDLE: %s", to_string(), - idle_cmd.to_string()); - - idle_cmd.serialize(ser, idle_cmd.tag); - - Tag? synchronize_tag; - yield ser.flush_async(false, out synchronize_tag); - - // flushing IDLE should never require synchronization - assert(synchronize_tag == null); - } - } catch (Error err) { - send_failure(err); - } finally { - if (token != Nonblocking.Mutex.INVALID_TOKEN) { - try { - send_mutex.release(ref token); - } catch (Error err2) { - // ignored - } - } - } - } - - private void check_for_connection() throws Error { - if (cx == null) - throw new ImapError.NOT_CONNECTED("Not connected to %s", to_string()); - } - - private void cmd_started_timeout() { - timeout_cmd_count++; - - if (timeout_id == 0) - timeout_id = Timeout.add_seconds(command_timeout_sec, on_cmd_timeout); - } - - private void cmd_completed_timeout() { - if (timeout_cmd_count > 0) - timeout_cmd_count--; - - if (timeout_cmd_count == 0 && timeout_id != 0) { - Source.remove(timeout_id); - timeout_id = 0; - } - } - - private void increase_timeout() { - if (timeout_id != 0) { - Source.remove(timeout_id); - timeout_id = Timeout.add_seconds(command_timeout_sec, on_cmd_timeout); - } - } - - private void cancel_timeout() { - if (timeout_id != 0) - Source.remove(timeout_id); - - timeout_id = 0; - timeout_cmd_count = 0; - } - - private bool on_cmd_timeout() { - debug("[%s] on_cmd_timeout", to_string()); - - // turn off graceful disconnect ... if the connection is hung, don't want to be stalled - // trying to flush the pipe + // turn off graceful disconnect ... if the connection is hung, + // don't want to be stalled trying to flush the pipe TcpConnection? tcp_cx = cx as TcpConnection; if (tcp_cx != null) tcp_cx.set_graceful_disconnect(false); - - timeout_id = 0; - timeout_cmd_count = 0; - - receive_failure(new ImapError.TIMED_OUT("No response to command(s) after %u seconds", - command_timeout_sec)); - - return false; - } - - public string to_string() { - if (cx != null) { - try { - return "%04X/%s/%s".printf(cx_id, - Inet.address_to_string((InetSocketAddress) cx.get_remote_address()), - fsm.get_state_string(fsm.get_state())); - } catch (Error err) { - // fall through - } - } - - return "%04X/%s/%s".printf(cx_id, endpoint.to_string(), fsm.get_state_string(fsm.get_state())); - } - - // - // transition handlers - // - - private bool issue_conditional_event(Event event) { - bool proceed = false; - fsm.issue(event, &proceed); - - return proceed; - } - - private void signal_server_data(void *user, Object? object) { - received_server_data((ServerData) object); - } - - private void signal_status_response(void *user, Object? object) { - StatusResponse? status_response = object as StatusResponse; - if (status_response != null && status_response.is_completion) { - // stop the countdown timer on the associated command - cmd_completed_timeout(); - } - - received_status_response((StatusResponse) object); - } - - private void signal_continuation(void *user, Object? object) { - received_continuation_response((ContinuationResponse) object); - } - - private void signal_entered_idle() { - in_idle(true); - } - - private void signal_left_idle() { - in_idle(false); - } - - private uint do_proceed(uint state, void *user) { - *((bool *) user) = true; - - return state; - } - - private uint do_no_proceed(uint state, void *user) { - *((bool *) user) = false; - - return state; - } - - private uint on_proceed(uint state, uint event, void *user) { - return do_proceed(state, user); - } - - private uint on_no_proceed(uint state, uint event, void *user) { - return do_no_proceed(state, user); - } - - private uint on_connected(uint state, uint event, void *user) { - // don't stay in connected state if IDLE is to be used; schedule an IDLE command (which - // may be rescheduled if commands immediately start being issued, which they most likely - // will) - if (idle_when_quiet) - reschedule_flush_timeout(); - - return State.CONNECTED; - } - - private uint on_disconnected(uint state, uint event, void *user) { - unschedule_flush_timeout(); - - return State.DISCONNECTED; - } - - private uint on_send_idle(uint state, uint event, void *user) { - return do_proceed(State.IDLING, user); - } - - private uint on_synchronize(uint state, uint event, void *user) { - return do_proceed(State.SYNCHRONIZING, user); - } - - private uint on_idle_synchronize(uint state, uint event, void *user) { - // technically this could be accomplished by introducing an IDLE_SYNCHRONZING (and probably - // an IDLING_SYNCHRONIZING) state to indicate that flush_async() is waiting for the FSM - // to transition from IDLING/IDLE to SYNCHRONIZING so it can send a large buffer, but - // that introduces a lot of complication for a rather quick state used infrequently; this - // simple flag does the trick (see on_idle_status_response for how it's used and cleared) - waiting_for_idle_to_synchronize = true; - - return do_proceed(state, user); - } - - private uint on_deidling_synchronize(uint state, uint event, void *user) { - return do_proceed(State.DEIDLING_SYNCHRONIZING, user); - } - - private uint on_status_response(uint state, uint event, void *user, Object? object) { - fsm.do_post_transition(signal_status_response, user, object); - - return state; - } - - private uint on_server_data(uint state, uint event, void *user, Object? object) { - fsm.do_post_transition(signal_server_data, user, object); - - return state; - } - - private uint on_continuation(uint state, uint event, void *user, Object? object) { - fsm.do_post_transition(signal_continuation, user, object); - - return state; - } - - private uint on_idling_deidling_continuation(uint state, uint event, void *user, Object? object) { - ContinuationResponse continuation = (ContinuationResponse) object; - - Logging.debug(Logging.Flag.NETWORK, "[%s R] %s", to_string(), continuation.to_string()); - - // if deidling and a DONE is outstanding, keep waiting for it to complete -- don't go to - // IDLE, as that will cause another spurious DONE to be issued - if (state == State.DEIDLING && outstanding_idle_dones > 0) - return state; - - // only signal entering IDLE state if that's the case - if (state != State.IDLE) - fsm.do_post_transition(signal_entered_idle); - - return State.IDLE; - } - - private uint on_idle_send(uint state, uint event, void *user) { - Logging.debug(Logging.Flag.NETWORK, "[%s] Closing IDLE", to_string()); - - // TODO: Because there is no DISCONNECTING state, need to watch for the Serializer - // disappearing during a disconnect while in a "normal" state - if (ser == null) { - debug("[%s] Unable to close IDLE: no serializer", to_string()); - - return do_no_proceed(state, user); - } - - try { - Logging.debug(Logging.Flag.NETWORK, "[%s S] %s", to_string(), IDLE_DONE); - ser.push_unquoted_string(IDLE_DONE); - ser.push_eol(); - - // track the number of DONE's outstanding, as their responses are pipelined as well - // (this prevents issuing more than one DONE when the idle continuation response comes - // in *after* issuing the DONE) - outstanding_idle_dones++; - } catch (Error err) { - debug("[%s] Unable to close IDLE: %s", to_string(), err.message); - - return do_no_proceed(state, user); - } - - // only signal leaving IDLE state if that's the case - if (state == State.IDLE) - fsm.do_post_transition(signal_left_idle); - - return do_proceed(State.DEIDLING, user); - } - - private uint on_idle_status_response(uint state, uint event, void *user, Object? object) { - StatusResponse status_response = (StatusResponse) object; - - // if not a posted IDLE tag, then treat as external status response - if (!posted_idle_tags.remove(status_response.tag)) { - fsm.do_post_transition(signal_status_response, user, object); - - return state; - } - - // StatusResponse for one of our IDLE commands; either way, no longer in IDLE mode - if (status_response.status == Status.OK) { - Logging.debug(Logging.Flag.NETWORK, "[%s] Leaving IDLE (%d outstanding): %s", to_string(), - posted_idle_tags.size, status_response.to_string()); - } else { - Logging.debug(Logging.Flag.NETWORK, "[%s] Unable to enter IDLE (%d outstanding): %s", to_string(), - posted_idle_tags.size, status_response.to_string()); - } - - // DONE has round-tripped (but watch for underflows, especially if server "forces" an IDLE - // to complete) - outstanding_idle_dones = Numeric.int_floor(outstanding_idle_dones - 1, 0); - - // Only return to CONNECTED if no other IDLE commands are outstanding (and only signal - // if leaving IDLE state for another) - uint next = (posted_idle_tags.size == 0) ? State.CONNECTED : state; - - // don't signal about the StatusResponse, it's in response to a Command generated - // internally (IDLE) and will confuse watchers who receive StatusResponse for a Command - // they didn't issue - - // However, need to signal about leaving idle - if (state == State.IDLE && next != State.IDLE) - fsm.do_post_transition(signal_left_idle); - - // If leaving IDLE for CONNECTED but user has asked to stay in IDLE whenever quiet, reschedule - // flush (which will automatically send IDLE command) - if (next == State.CONNECTED && idle_when_quiet) - reschedule_flush_timeout(); - - // if flush_async() is waiting for a synchronization point and deidling back to CONNECTED, - // go to SYNCHRONIZING so it can push its buffer to the server - if (waiting_for_idle_to_synchronize && next == State.CONNECTED) { - next = State.SYNCHRONIZING; - waiting_for_idle_to_synchronize = false; - } - - return next; - } - - private uint on_idle_continuation(uint state, uint event, void *user, Object? object) { - if (posted_idle_tags.size == 0) { - debug("[%s] Bad continuation received during IDLE: %s", to_string(), - ((ContinuationResponse) object).to_string()); - } - - return state; - } - - private uint on_deidling_synchronizing_status_response(uint state, uint event, void *user, - Object? object) { - // piggyback on on_idle_status_response, but instead of jumping to CONNECTED, jump to - // SYNCHRONIZING (because IDLE has completed) - return (on_idle_status_response(state, event, user, object) == State.CONNECTED) - ? State.SYNCHRONIZING : state; - } - - private uint on_synchronize_status_response(uint state, uint event, void *user, Object? object) { - StatusResponse status_response = (StatusResponse) object; - - // waiting for status response to synchronization message, treat others normally - if (posted_synchronization_tag == null || !posted_synchronization_tag.equal_to(status_response.tag)) { - fsm.do_post_transition(signal_status_response, user, object); - - return state; - } - - // receive status response while waiting for synchronization of a command; this means the - // server has rejected it - debug("[%s] Command continuation rejected: %s", to_string(), status_response.to_string()); - - // save result and notify sleeping flush_async() - synchronization_status_response = status_response; - synchronized_notifier.blind_notify(); - - return State.CONNECTED; - } - - private uint on_synchronize_continuation(uint state, uint event, void *user, Object? object) { - ContinuationResponse continuation = (ContinuationResponse) object; - - if (posted_synchronization_tag == null) { - debug("[%s] Bad command continuation received: %s", to_string(), - continuation.to_string()); - } else { - debug("[%s] Command continuation received for %s: %s", to_string(), - posted_synchronization_tag.to_string(), continuation.to_string()); - } - - // wake up the sleeping flush_async() call so it will continue - synchronization_status_response = null; - synchronized_notifier.blind_notify(); - - // There is no SYNCHRONIZED state, which is kind of fleeting; the moment the flush_async() - // call continues, no longer synchronized - return State.CONNECTED; - } - - private uint on_bad_transition(uint state, uint event, void *user) { - warning("[%s] Bad cx state transition %s", to_string(), fsm.get_event_issued_string(state, event)); - - return on_no_proceed(state, event, user); - } -} + receive_failure( + new ImapError.TIMED_OUT( + "No response to command(s) after %u seconds", + this.command_timer.interval + ) + ); + } + + private void on_idle_timeout() { + Logging.debug(Logging.Flag.NETWORK, "[%s] Initiating IDLE", to_string()); + try { + this.send_command(new IdleCommand()); + } catch (ImapError err) { + debug("[%s] Error sending IDLE: %s", to_string(), err.message); + } + } + +} diff --git a/src/engine/imap/transport/imap-client-session.vala b/src/engine/imap/transport/imap-client-session.vala index 1e44a750..8f9e9775 100644 --- a/src/engine/imap/transport/imap-client-session.vala +++ b/src/engine/imap/transport/imap-client-session.vala @@ -277,10 +277,6 @@ public class Geary.Imap.ClientSession : BaseObject { private uint unselected_keepalive_secs = 0; private uint selected_with_idle_keepalive_secs = 0; - private Gee.HashMap seen_completion_responses = new Gee.HashMap< - Tag, StatusResponse>(); - private Gee.HashMap waiting_for_completion = new Gee.HashMap< - Tag, CommandCallback>(); private Command? state_change_cmd = null; private Nonblocking.Semaphore? connect_waiter = null; private Error? connect_err = null; @@ -783,17 +779,8 @@ public class Geary.Imap.ClientSession : BaseObject { cx = null; } - - // if there are any outstanding commands waiting for responses, wake them up now - if (waiting_for_completion.size > 0) { - debug("[%s] Cancelling %d pending commands", to_string(), waiting_for_completion.size); - foreach (CommandCallback cmd_cb in waiting_for_completion.values) - Scheduler.on_idle(cmd_cb.callback); - - waiting_for_completion.clear(); - } } - + private uint on_connected(uint state, uint event) { debug("[%s] Connected", to_string()); @@ -966,21 +953,6 @@ public class Geary.Imap.ClientSession : BaseObject { // either way, new capabilities should be available caps = capabilities; - - // Attempt compression (usually only available after authentication) - if (caps.has_setting(Capabilities.COMPRESS, Capabilities.DEFLATE_SETTING)) { - StatusResponse resp = yield send_command_async( - new CompressCommand(CompressCommand.ALGORITHM_DEFLATE)); - if (resp.status == Status.OK) { - install_send_converter(new ZlibCompressor(ZlibCompressorFormat.RAW)); - install_recv_converter(new ZlibDecompressor(ZlibCompressorFormat.RAW)); - debug("[%s] Compression started", to_string()); - } else { - debug("[%s] Unable to start compression: %s", to_string(), resp.to_string()); - } - } else { - debug("[%s] No compression available", to_string()); - } Gee.List server_data = new Gee.ArrayList(); ulong data_id = this.server_data_received.connect((data) => { server_data.add(data); }); @@ -1256,18 +1228,6 @@ public class Geary.Imap.ClientSession : BaseObject { debug("[%s] Keepalive error: %s", to_string(), err.message); } } - - // - // Converters - // - - public bool install_send_converter(Converter converter) { - return (cx != null) ? cx.install_send_converter(converter) : false; - } - - public bool install_recv_converter(Converter converter) { - return (cx != null) ? cx.install_recv_converter(converter) : false; - } // // send commands @@ -1750,38 +1710,17 @@ public class Geary.Imap.ClientSession : BaseObject { // // command submission // - + private async StatusResponse command_transaction_async(Command cmd, Cancellable? cancellable) throws Error { - if (cx == null) + if (this.cx == null) throw new ImapError.NOT_CONNECTED("Not connected to %s", imap_endpoint.to_string()); - - yield cx.send_async(cmd, cancellable); - - // send_async() should've tagged the Command, otherwise the completion_pending will fail - assert(cmd.tag.is_tagged()); - - // If the command didn't complete (i.e. a CompletionStatusResponse didn't return from the - // server) in the context of send_async(), wait for it now - if (!seen_completion_responses.has_key(cmd.tag)) { - waiting_for_completion.set(cmd.tag, new CommandCallback(command_transaction_async.callback)); - yield; - } - - // it should be seen now; if not, it's because of disconnection cancelling all the outstanding - // requests - StatusResponse? completion_response; - if (!seen_completion_responses.unset(cmd.tag, out completion_response)) { - assert(cx == null); - - throw new ImapError.NOT_CONNECTED("Not connected to %s", imap_endpoint.to_string()); - } - - assert(completion_response != null); - - return completion_response; + + this.cx.send_command(cmd); + yield cmd.wait_until_complete(cancellable); + return cmd.status; } - + // // network connection event handlers // @@ -1835,24 +1774,14 @@ public class Geary.Imap.ClientSession : BaseObject { err.message); } } - + // update state machine before notifying subscribers, who may turn around and query ClientSession if (status_response.is_completion) { fsm.issue(Event.RECV_COMPLETION, null, status_response, null); - - // Note that this signal could be called in the context of cx.send_async() that sent - // this command to the server ... this mechanism (seen_completion_response and - // waiting_for_completion) assures that in either case issue_command_async() returns - // when the command is completed - seen_completion_responses.set(status_response.tag, status_response); - - CommandCallback? cmd_cb; - if (waiting_for_completion.unset(status_response.tag, out cmd_cb)) - Scheduler.on_idle(cmd_cb.callback); } else { fsm.issue(Event.RECV_STATUS, null, status_response, null); } - + status_response_received(status_response); } diff --git a/src/engine/imap/transport/imap-serializer.vala b/src/engine/imap/transport/imap-serializer.vala index 78696d69..804fa0ce 100644 --- a/src/engine/imap/transport/imap-serializer.vala +++ b/src/engine/imap/transport/imap-serializer.vala @@ -5,232 +5,99 @@ */ /** - * Serializer asynchronously writes serialized IMAP commands to the supplied output stream via a - * queue of buffers. + * Writes serialized IMAP commands to a supplied output stream. * - * Since most IMAP commands are small in size (one line of data, often under 64 bytes), the - * Serializer writes them to a queue of temporary buffers (interspersed with user-supplied buffers - * that are intended to be literal data). The data is only written when {@link flush_async} is - * invoked. - * - * This means that if the caller wants some buffer beyond the steps described above, they should - * pass in a BufferedOutputStream (or one of its subclasses). flush_async() will flush the user's - * OutputStream after writing to it. - * - * Command continuation requires some synchronization between the Serializer and the - * {@link Deserializer}. It also requires some queue management. See {@link fast_forward_queue} - * and {@link next_synchronized_message}. + * Command continuation requires some synchronization between the + * Serializer and the {@link Deserializer}. It also requires some + * queue management. See {@link push_quoted_string} and {@link + * next_synchronized_message}. * * @see Deserializer */ public class Geary.Imap.Serializer : BaseObject { - private class SerializedData { - public Memory.Buffer buffer; - public Tag? literal_data_tag; - - public SerializedData(Memory.Buffer buffer, Tag? literal_data_tag) { - this.buffer = buffer; - this.literal_data_tag = literal_data_tag; - } - } - + private string identifier; - private OutputStream outs; - private ConverterOutputStream couts; - private MemoryOutputStream mouts; - private DataOutputStream douts; - private Geary.Stream.MidstreamConverter midstream = new Geary.Stream.MidstreamConverter("Serializer"); - private Gee.Queue datastream = new Gee.LinkedList(); - - public Serializer(string identifier, OutputStream outs) { + private DataOutputStream output; + + public Serializer(string identifier, OutputStream output) { this.identifier = identifier; - this.outs = outs; - - // prepare the ConverterOutputStream (which wraps the caller's OutputStream and allows for - // midstream conversion) - couts = new ConverterOutputStream(outs, midstream); - couts.set_close_base_stream(false); - - // prepare the DataOutputStream (which generates buffers for the queue) - mouts = new MemoryOutputStream(null, realloc, free); - douts = new DataOutputStream(mouts); - douts.set_close_base_stream(false); + this.output = new DataOutputStream(output); + this.output.set_close_base_stream(false); } - - public bool install_converter(Converter converter) { - return midstream.install(converter); - } - - public void push_ascii(char ch) throws Error { - douts.put_byte(ch, null); - } - + /** - * Pushes the string to the IMAP server with quoting applied whether required or not. Returns - * true if quoting was required. + * Pushes the string to the IMAP server with quoting. + * + * This is applied whether required or not. Returns true if + * quoting was required. */ - public bool push_quoted_string(string str) throws Error { + public bool push_quoted_string(string str, + GLib.Cancellable? cancellable = null) + throws GLib.Error { string quoted; DataFormat.Quoting requirement = DataFormat.convert_to_quoted(str, out quoted); - - douts.put_string(quoted); - + + this.output.put_string(quoted, cancellable); + return (requirement == DataFormat.Quoting.REQUIRED); } - + /** - * This will push the string to IMAP as-is. Use only if you absolutely know what you're doing. + * This will push the string to IMAP as-is. + * + * Use only if you absolutely know what you're doing. */ - public void push_unquoted_string(string str) throws Error { - douts.put_string(str); + public void push_unquoted_string(string str, + GLib.Cancellable? cancellable = null) + throws GLib.Error { + this.output.put_string(str, cancellable); } - - public void push_space() throws Error { - douts.put_byte(' ', null); + + public void push_ascii(char ch, GLib.Cancellable? cancellable = null) + throws GLib.Error { + this.output.put_byte(ch, cancellable); } - - public void push_nil() throws Error { - douts.put_string(NilParameter.VALUE, null); + + public void push_space(GLib.Cancellable? cancellable = null) + throws GLib.Error { + this.output.put_byte(' ', cancellable); } - - public void push_eol() throws Error { - douts.put_string("\r\n", null); + + public void push_nil(GLib.Cancellable? cancellable = null) + throws GLib.Error { + this.output.put_string(NilParameter.VALUE, cancellable); } - - private void enqueue_current_stream() throws IOError { - size_t length = mouts.get_data_size(); - if (length <= 0) - return; - - // close before converting to Memory.ByteBuffer - mouts.close(); - - SerializedData data = new SerializedData( - new Memory.ByteBuffer.from_memory_output_stream(mouts), null); - datastream.add(data); - - mouts = new MemoryOutputStream(null, realloc, free); - douts = new DataOutputStream(mouts); - douts.set_close_base_stream(false); + + public void push_eol(GLib.Cancellable? cancellable = null) + throws GLib.Error { + this.output.put_string("\r\n", cancellable); } - - /* - * Pushes an {link Memory.Buffer} to the serialized stream that must be synchronized - * with the server before transmission. - * - * Literal data may require synchronization with the server and so should only be used when - * necessary. See {link DataFormat.is_quoting_required} to test data. - * - * The supplied buffer must not be mutated once submitted to the {@link Serializer}. - * - * See [[http://tools.ietf.org/html/rfc3501#section-4.3]] and - * [[http://tools.ietf.org/html/rfc3501#section-7.5]] - */ - public void push_synchronized_literal_data(Tag tag, Memory.Buffer buffer) throws Error { - enqueue_current_stream(); - datastream.add(new SerializedData(buffer, tag)); - } - + /** - * Indicates that a complete message has been pushed to the {@link Serializer}. - * - * It's important to delineate messages for the Serializer, as it aids in queue management - * and command continuation (synchronization). + * Pushes literal data to the output stream. */ - public void push_end_of_message() throws Error { - enqueue_current_stream(); - datastream.add(null); + public async void push_literal_data(Memory.Buffer buffer, + GLib.Cancellable? cancellable = null) + throws GLib.Error { + yield this.output.splice_async( + buffer.get_input_stream(), + OutputStreamSpliceFlags.NONE, + Priority.DEFAULT, + cancellable + ); } - + /** - * Returns the {@link Tag} for the message with the next synchronization message Tag. - * - * This can be used to prepare for receiving a command continuation failure before sending - * the request via {@link flush_async}, as the response could return before that call completes. + * Flushes the output stream, ensuring a command has been sent. */ - public Tag? next_synchronized_message() { - foreach (SerializedData? data in datastream) { - if (data != null && data.literal_data_tag != null) - return data.literal_data_tag; - } - - return null; + public async void flush_stream(GLib.Cancellable? cancellable = null) + throws GLib.Error { + yield this.output.flush_async(Priority.DEFAULT, cancellable); } - - /** - * Discards all buffers associated with the current message and moves the queue forward to the - * next one. - * - * This is useful when a command continuation is refused by the server and the command must be - * aborted. - * - * Any data currently in the buffer is *not* enqueued, as by definition it has not been marked - * with {@link push_end_of_message}. - */ - public void fast_forward_queue() { - while (!datastream.is_empty) { - if (datastream.poll() == null) - break; - } - } - - /** - * Push all serialized data and buffers onto the wire. - * - * Caller should pass is_synchronized=true if the connection has been synchronized for a command - * continuation. - * - * If synchronize_tag returns non-null, then the flush has not completed. The connection must - * wait for the server to send a continuation response before continuing. When ready, call - * flush_async() again with is_synchronized set to true. The tag is supplied to watch for - * an error condition from the server (which may reject the synchronization request). - */ - public async void flush_async(bool is_synchronized, out Tag? synchronize_tag, - Cancellable? cancellable = null) throws Error { - synchronize_tag = null; - - // commit the last buffer to the queue (although this is best done with push_end_message) - enqueue_current_stream(); - - // walk the SerializedData queue, pushing each out to the wire unless a synchronization - // point is encountered - while (!datastream.is_empty) { - // see if next data buffer is synchronized - SerializedData? data = datastream.peek(); - if (data != null && data.literal_data_tag != null && !is_synchronized) { - // report the Tag that is associated with the continuation - synchronize_tag = data.literal_data_tag; - - // break out to ensure pipe is flushed - break; - } - - // if not, remove and process - data = datastream.poll(); - if (data == null) { - // end of message, move on - continue; - } - - Logging.debug(Logging.Flag.SERIALIZER, "[%s] %s", to_string(), data.buffer.to_string()); - - // splice buffer's InputStream directly into OutputStream - yield couts.splice_async(data.buffer.get_input_stream(), OutputStreamSpliceFlags.NONE, - Priority.DEFAULT, cancellable); - - // if synchronized before, not any more - is_synchronized = false; - } - - // make sure everything is flushed out now ... some trouble with BufferedOutputStreams - // here, so flush ConverterOutputStream and its base stream - yield couts.flush_async(Priority.DEFAULT, cancellable); - yield couts.base_stream.flush_async(Priority.DEFAULT, cancellable); - } - + public string to_string() { return "ser:%s".printf(identifier); } -} +}