From a43ab81fdfdaef8b2d421598bde88915312bb35d Mon Sep 17 00:00:00 2001 From: Jim Nelson Date: Wed, 4 May 2011 17:20:17 -0700 Subject: [PATCH] Improvements to Serializer and Deserializer The input mode for Deserializer meant that the caller needed to manage the input stream and push data in an appropriate way; this is error-prone. Now Deserializer manages the input stream and the modes it must be read from. Serializer still works in a similar fashion as before, but now it deals with literal data more efficiently, writing it to the output stream directly (via splice_async()) rather than into the in-memory temporary buffer. Serializable's serialize() method is now async, meaning that all serialization can occur asynchronously, which is how we want it going forward. --- src/console/main.vala | 13 +-- src/engine/imap/ClientConnection.vala | 155 ++++++-------------------- src/engine/imap/ClientSession.vala | 5 +- src/engine/imap/Deserializer.vala | 125 ++++++++++++++++++--- src/engine/imap/Parameter.vala | 16 +-- src/engine/imap/ResponseCode.vala | 6 +- src/engine/imap/Serializable.vala | 2 +- src/engine/imap/Serializer.vala | 69 +++++++++--- src/engine/state/Machine.vala | 6 +- src/tests/syntax.vala | 22 +++- 10 files changed, 236 insertions(+), 183 deletions(-) diff --git a/src/console/main.vala b/src/console/main.vala index d6998121..8a9d9f8f 100644 --- a/src/console/main.vala +++ b/src/console/main.vala @@ -235,17 +235,14 @@ class ImapConsole : Gtk.Window { } private void on_connected(Object? source, AsyncResult result) { + cx.sent_command.connect(on_sent_command); + cx.received_status_response.connect(on_received_status_response); + cx.received_server_data.connect(on_received_server_data); + cx.received_bad_response.connect(on_received_bad_response); + try { cx.connect_async.end(result); status("Connected"); - - cx.sent_command.connect(on_sent_command); - cx.received_status_response.connect(on_received_status_response); - cx.received_server_data.connect(on_received_server_data); - cx.received_bad_response.connect(on_received_bad_response); - - // start transmission and reception - cx.xon(); } catch (Error err) { cx = null; diff --git a/src/engine/imap/ClientConnection.vala b/src/engine/imap/ClientConnection.vala index ae6f94d9..d2cacca7 100644 --- a/src/engine/imap/ClientConnection.vala +++ b/src/engine/imap/ClientConnection.vala @@ -12,12 +12,8 @@ public class Geary.Imap.ClientConnection { private uint16 default_port; private SocketClient socket_client = new SocketClient(); private SocketConnection? cx = null; - private DataInputStream? dins = null; - private int ins_priority = Priority.DEFAULT; - private Cancellable ins_cancellable = new Cancellable(); - private bool flow_controlled = true; - private Deserializer des = new Deserializer(); - private uint8[] block_buffer = new uint8[4096]; + private Serializer? ser = null; + private Deserializer? des = null; private int tag_counter = 0; private char tag_prefix = 'a'; @@ -27,9 +23,6 @@ public class Geary.Imap.ClientConnection { public virtual signal void disconnected() { } - public virtual signal void flow_control(bool xon) { - } - public virtual signal void sent_command(Command cmd) { } @@ -42,7 +35,13 @@ public class Geary.Imap.ClientConnection { public virtual signal void received_bad_response(RootParameters root, ImapError err) { } - public virtual signal void receive_failed(Error err) { + public virtual signal void recv_closed() { + } + + public virtual signal void receive_failure(Error err) { + } + + public virtual signal void deserialize_failure() { } public ClientConnection(string host_specifier, uint16 default_port) { @@ -51,8 +50,6 @@ public class Geary.Imap.ClientConnection { socket_client.set_tls(true); socket_client.set_tls_validation_flags(TlsCertificateFlags.UNKNOWN_CA); - - des.parameters_ready.connect(on_parameters_ready); } ~ClientConnection() { @@ -79,10 +76,16 @@ public class Geary.Imap.ClientConnection { throw new IOError.EXISTS("Already connected to %s", to_string()); cx = yield socket_client.connect_to_host_async(host_specifier, default_port, cancellable); - dins = new DataInputStream(cx.input_stream); - dins.set_newline_type(DataStreamNewlineType.CR_LF); + ser = new Serializer(new BufferedOutputStream(cx.output_stream)); + des = new Deserializer(new BufferedInputStream(cx.input_stream)); + des.parameters_ready.connect(on_parameters_ready); + des.receive_failure.connect(on_receive_failure); + des.deserialize_failure.connect(on_deserialize_failure); + des.eos.connect(on_eos); connected(); + + des.xon(); } public async void disconnect_async(Cancellable? cancellable = null) @@ -93,72 +96,12 @@ public class Geary.Imap.ClientConnection { yield cx.close_async(Priority.DEFAULT, cancellable); cx = null; - dins = null; + ser = null; + des = null; disconnected(); } - public void xon(int priority = Priority.DEFAULT) throws Error { - check_for_connection(); - - if (!flow_controlled) - return; - - flow_controlled = false; - ins_priority = priority; - - next_deserialize_step(); - - flow_control(true); - } - - private void next_deserialize_step() { - switch (des.get_mode()) { - case Deserializer.Mode.LINE: - dins.read_line_async.begin(ins_priority, ins_cancellable, on_read_line); - break; - - case Deserializer.Mode.BLOCK: - long count = long.min(block_buffer.length, des.get_max_data_length()); - dins.read_async.begin(block_buffer[0:count], ins_priority, ins_cancellable, - on_read_block); - break; - - default: - error("Failed"); - } - } - - private void on_read_line(Object? source, AsyncResult result) { - try { - string line = dins.read_line_async.end(result); - des.push_line(line); - } catch (Error err) { - if (!(err is IOError.CANCELLED)) - receive_failed(err); - - return; - } - - if (!flow_controlled) - next_deserialize_step(); - } - - private void on_read_block(Object? source, AsyncResult result) { - try { - ssize_t read = dins.read_async.end(result); - des.push_data(block_buffer[0:read]); - } catch (Error err) { - if (!(err is IOError.CANCELLED)) - receive_failed(err); - - return; - } - - if (!flow_controlled) - next_deserialize_step(); - } - private void on_parameters_ready(RootParameters root) { try { bool is_status_response; @@ -173,17 +116,16 @@ public class Geary.Imap.ClientConnection { } } - public void xoff() throws Error { - check_for_connection(); - - if (flow_controlled) - return; - - // turn off the spigot - // TODO: Don't cancel the read, merely don't post the next window - flow_controlled = true; - ins_cancellable.cancel(); - ins_cancellable = new Cancellable(); + private void on_receive_failure(Error err) { + receive_failure(err); + } + + private void on_deserialize_failure() { + deserialize_failure(); + } + + private void on_eos() { + recv_closed(); } /** @@ -206,47 +148,16 @@ public class Geary.Imap.ClientConnection { Cancellable? cancellable = null) throws Error { check_for_connection(); - Serializer ser = new Serializer(); cmd.serialize(ser); - assert(ser.has_content()); - yield write_all_async(ser, priority, cancellable); + // TODO: At this point, we flush each command as it's written; at some point we'll have + // a queuing strategy that means serialized data is pushed out to the wire only at certain + // times + yield ser.flush_async(priority, cancellable); sent_command(cmd); } - public async void send_multiple_async(Gee.List cmds, int priority = Priority.DEFAULT, - Cancellable? cancellable = null) throws Error { - if (cmds.size == 0) - return; - - check_for_connection(); - - Serializer ser = new Serializer(); - foreach (Command cmd in cmds) - cmd.serialize(ser); - assert(ser.has_content()); - - yield write_all_async(ser, priority, cancellable); - - // Variable named due to this bug: https://bugzilla.gnome.org/show_bug.cgi?id=596861 - foreach (Command cmd2 in cmds) - sent_command(cmd2); - } - - // Can't pass the raw buffer due to this bug: https://bugzilla.gnome.org/show_bug.cgi?id=639054 - private async void write_all_async(Serializer ser, int priority, Cancellable? cancellable) - throws Error { - ssize_t index = 0; - size_t length = ser.get_content_length(); - while (index < length) { - index += yield cx.output_stream.write_async(ser.get_content()[index:length], - priority, cancellable); - if (index < length) - debug("PARTIAL WRITE TO %s: %lu/%lu bytes", to_string(), index, length); - } - } - private void check_for_connection() throws Error { if (cx == null) throw new IOError.CLOSED("Not connected to %s", to_string()); diff --git a/src/engine/imap/ClientSession.vala b/src/engine/imap/ClientSession.vala index a3d4a88d..7db6d791 100644 --- a/src/engine/imap/ClientSession.vala +++ b/src/engine/imap/ClientSession.vala @@ -45,13 +45,10 @@ public class Geary.Imap.ClientSession : Object, Geary.Account { cx.received_status_response.connect(on_received_status_response); cx.received_server_data.connect(on_received_server_data); cx.received_bad_response.connect(on_received_bad_response); - cx.receive_failed.connect(on_receive_failed); + cx.receive_failure.connect(on_receive_failed); yield cx.connect_async(cancellable); - // start receiving traffic from the server - cx.xon(); - // wait for the initial OK response from the server cb_queue.offer(new CommandCallback(connect_async.callback)); awaiting_connect_response = true; diff --git a/src/engine/imap/Deserializer.vala b/src/engine/imap/Deserializer.vala index ce8824a4..2bad91a4 100644 --- a/src/engine/imap/Deserializer.vala +++ b/src/engine/imap/Deserializer.vala @@ -4,8 +4,20 @@ * (version 2.1 or later). See the COPYING file in this distribution. */ +/** + * The Deserializer performs asynchronous I/O on a supplied input stream and transforms the raw + * bytes into IMAP Parameters (which can then be converted into ServerResponses or ServerData). + * The Deserializer will only begin reading from the stream when xon() is called. Calling xoff() + * will flow control the stream, halting reading without closing the stream itself. Since all + * results from the Deserializer are reported via signals, those signals should be connected to + * prior to calling xon(), or the caller risks missing early messages. (Note that since + * Deserializer uses async I/O, this isn't technically possible unless the signals are connected + * after the Idle loop has a chance to run; however, this is an implementation detail and shouldn't + * be relied upon.) + */ + public class Geary.Imap.Deserializer { - public enum Mode { + private enum Mode { LINE, BLOCK, FAILED @@ -67,18 +79,31 @@ public class Geary.Imap.Deserializer { "Geary.Imap.Deserializer", State.TAG, State.COUNT, Event.COUNT, state_to_string, event_to_string); + private DataInputStream dins; private Geary.State.Machine fsm; private ListParameter current; private RootParameters root = new RootParameters(); private StringBuilder? current_string = null; private LiteralParameter? current_literal = null; private long literal_length_remaining = 0; + private uint8[] block_buffer = new uint8[4096]; + private bool flow_controlled = true; + private int ins_priority = Priority.DEFAULT; + + public signal void flow_control(bool xon); public signal void parameters_ready(RootParameters root); - public signal void failed(); + public signal void eos(); - public Deserializer() { + public signal void receive_failure(Error err); + + public signal void deserialize_failure(); + + public Deserializer(InputStream ins) { + dins = new DataInputStream(ins); + dins.set_newline_type(DataStreamNewlineType.CR_LF); + current = root; Geary.State.Mapping[] mappings = { @@ -104,22 +129,100 @@ public class Geary.Imap.Deserializer { fsm = new Geary.State.Machine(machine_desc, mappings, on_bad_transition); } + public void xon(int priority = Priority.DEFAULT) { + if (!flow_controlled || get_mode() == Mode.FAILED) + return; + + flow_controlled = false; + ins_priority = priority; + + next_deserialize_step(); + + flow_control(true); + } + + public void xoff() { + if (flow_controlled || get_mode() == Mode.FAILED) + return; + + flow_controlled = true; + + flow_control(false); + } + + private void next_deserialize_step() { + switch (get_mode()) { + case Mode.LINE: + dins.read_line_async.begin(ins_priority, null, on_read_line); + break; + + case Mode.BLOCK: + assert(literal_length_remaining > 0); + long count = long.min(block_buffer.length, literal_length_remaining); + dins.read_async.begin(block_buffer[0:count], ins_priority, null, on_read_block); + break; + + default: + assert_not_reached(); + } + } + + private void on_read_line(Object? source, AsyncResult result) { + try { + string? line = dins.read_line_async.end(result); + if (line == null) { + eos(); + + return; + } + + push_line(line); + } catch (Error err) { + receive_failure(err); + + return; + } + + if (!flow_controlled) + next_deserialize_step(); + } + + private void on_read_block(Object? source, AsyncResult result) { + try { + ssize_t read = dins.read_async.end(result); + if (read == 0) { + eos(); + + return; + } + + push_data(block_buffer[0:read]); + } catch (Error err) { + receive_failure(err); + + return; + } + + if (!flow_controlled) + next_deserialize_step(); + } + // Push a line (without the CRLF!). - public Mode push_line(string line) { + private Mode push_line(string line) { assert(get_mode() == Mode.LINE); int index = 0; unichar ch; while (line.get_next_char(ref index, out ch)) { if (fsm.issue(Event.CHAR, &ch) == State.FAILED) { - failed(); + deserialize_failure(); return Mode.FAILED; } } if (fsm.issue(Event.EOL) == State.FAILED) { - failed(); + deserialize_failure(); return Mode.FAILED; } @@ -127,17 +230,13 @@ public class Geary.Imap.Deserializer { return get_mode(); } - public long get_max_data_length() { - return literal_length_remaining; - } - // Push a block of literal data - public Mode push_data(uint8[] data) { + private Mode push_data(owned uint8[] data) { assert(get_mode() == Mode.BLOCK); LiteralData literal_data = LiteralData(data); if (fsm.issue(Event.DATA, &literal_data) == State.FAILED) { - failed(); + deserialize_failure(); return Mode.FAILED; } @@ -145,7 +244,7 @@ public class Geary.Imap.Deserializer { return get_mode(); } - public Mode get_mode() { + private Mode get_mode() { switch (fsm.get_state()) { case State.LITERAL_DATA: return Mode.BLOCK; diff --git a/src/engine/imap/Parameter.vala b/src/engine/imap/Parameter.vala index 0b4f88ee..3462e24d 100644 --- a/src/engine/imap/Parameter.vala +++ b/src/engine/imap/Parameter.vala @@ -5,7 +5,7 @@ */ public abstract class Geary.Imap.Parameter : Object, Serializable { - public abstract void serialize(Serializer ser) throws Error; + public abstract async void serialize(Serializer ser) throws Error; // to_string() returns a representation of the Parameter suitable for logging and debugging, // but should not be relied upon for wire or persistent representation. @@ -31,7 +31,7 @@ public class Geary.Imap.StringParameter : Geary.Imap.Parameter { return value; } - public override void serialize(Serializer ser) throws Error { + public override async void serialize(Serializer ser) throws Error { ser.push_string(value); } } @@ -61,10 +61,10 @@ public class Geary.Imap.LiteralParameter : Geary.Imap.Parameter { return "{literal/%ldb}".printf(size); } - public override void serialize(Serializer ser) throws Error { + public override async void serialize(Serializer ser) throws Error { ser.push_string("{%ld}".printf(size)); ser.push_eol(); - ser.push_input_stream_literal_data(mins); + yield ser.push_input_stream_literal_data_async(mins); // seek to start mins.seek(0, SeekType.SET); @@ -148,10 +148,10 @@ public class Geary.Imap.ListParameter : Geary.Imap.Parameter { } } - public override void serialize(Serializer ser) throws Error { - ser.push_string("("); + public override async void serialize(Serializer ser) throws Error { + ser.push_ascii('('); serialize_list(ser); - ser.push_string(")"); + ser.push_ascii(')'); } } @@ -170,7 +170,7 @@ public class Geary.Imap.RootParameters : Geary.Imap.ListParameter { return stringize_list(); } - public override void serialize(Serializer ser) throws Error { + public override async void serialize(Serializer ser) throws Error { serialize_list(ser); ser.push_eol(); } diff --git a/src/engine/imap/ResponseCode.vala b/src/engine/imap/ResponseCode.vala index d25ba7cf..41d695fc 100644 --- a/src/engine/imap/ResponseCode.vala +++ b/src/engine/imap/ResponseCode.vala @@ -13,10 +13,10 @@ public class Geary.Imap.ResponseCode : Geary.Imap.ListParameter { return "[%s]".printf(stringize_list()); } - public override void serialize(Serializer ser) throws Error { - ser.push_string("["); + public override async void serialize(Serializer ser) throws Error { + ser.push_ascii('['); serialize_list(ser); - ser.push_string("]"); + ser.push_ascii(']'); } } diff --git a/src/engine/imap/Serializable.vala b/src/engine/imap/Serializable.vala index 2d5cbb06..f7374408 100644 --- a/src/engine/imap/Serializable.vala +++ b/src/engine/imap/Serializable.vala @@ -5,6 +5,6 @@ */ public interface Geary.Imap.Serializable { - public abstract void serialize(Serializer ser) throws Error; + public abstract async void serialize(Serializer ser) throws Error; } diff --git a/src/engine/imap/Serializer.vala b/src/engine/imap/Serializer.vala index e89055a3..ade00bdd 100644 --- a/src/engine/imap/Serializer.vala +++ b/src/engine/imap/Serializer.vala @@ -4,25 +4,34 @@ * (version 2.1 or later). See the COPYING file in this distribution. */ +/** + * The Serializer asynchronously writes serialized IMAP commands to the supplied output stream. + * Since most IMAP commands are small in size (one line of data, often under 64 bytes), the + * Serializer writes them to a temporary buffer, only writing to the actual stream when literal data + * is written (which can often be large and coming off of disk) or commit_async() is called, which + * should be invoked when convenient, to prevent the buffer from growing too large. + * + * Because of this situation, the serialized commands will not necessarily reach the output stream + * unless commit_async() is called, which pushes the in-memory bytes to it. Since the + * output stream itself may be buffered, flush_async() should be called to verify the bytes have + * reached the wire. + * + * flush_async() implies commit_async(), but the reverse is not true. + */ + public class Geary.Imap.Serializer { + private OutputStream outs; private MemoryOutputStream mouts; private DataOutputStream douts; - public Serializer() { + public Serializer(OutputStream outs) { + this.outs = outs; mouts = new MemoryOutputStream(null, realloc, free); douts = new DataOutputStream(mouts); } - public unowned uint8[] get_content() { - return mouts.get_data(); - } - - public size_t get_content_length() { - return mouts.get_data_size(); - } - - public bool has_content() { - return get_content_length() > 0; + public void push_ascii(char ch) throws Error { + douts.put_byte(ch, null); } public void push_string(string str) throws Error { @@ -37,14 +46,40 @@ public class Geary.Imap.Serializer { douts.put_string("\r\n", null); } - public void push_literal_data(uint8[] data) throws Error { - size_t written; - douts.write_all(data, out written); - assert(written == data.length); + public async void push_input_stream_literal_data_async(InputStream ins, + int priority = Priority.DEFAULT, Cancellable? cancellable = null) throws Error { + // commit the in-memory buffer to the output stream + yield commit_async(priority, cancellable); + + // splice the literal data directly to the output stream + yield outs.splice_async(ins, OutputStreamSpliceFlags.NONE, priority, cancellable); } - public void push_input_stream_literal_data(InputStream ins) throws Error { - douts.splice(ins, OutputStreamSpliceFlags.NONE); + // commit_async() takes the stored (in-memory) serialized data and writes it asynchronously + // to the wrapped OutputStream. Note that this is *not* a flush, as it's possible the + // serialized data will be stored in a buffer in the OutputStream. Use flush_async() to force + // data onto the wire. + public async void commit_async(int priority = Priority.DEFAULT, Cancellable? cancellable = null) + throws Error { + size_t length = mouts.get_data_size(); + if (length == 0) + return; + + ssize_t index = 0; + do { + index += yield outs.write_async(mouts.get_data()[index:length], priority, cancellable); + } while (index < length); + + mouts = new MemoryOutputStream(null, realloc, free); + douts = new DataOutputStream(mouts); + } + + // This pushes all serialized data onto the wire. This calls commit_async() before + // flushing. + public async void flush_async(int priority = Priority.DEFAULT, Cancellable? cancellable = null) + throws Error { + yield commit_async(priority, cancellable); + yield outs.flush_async(priority, cancellable); } } diff --git a/src/engine/state/Machine.vala b/src/engine/state/Machine.vala index 1436b85a..0116083d 100644 --- a/src/engine/state/Machine.vala +++ b/src/engine/state/Machine.vala @@ -51,7 +51,7 @@ public class Geary.State.Machine { this.logging = logging; } - public bool get_logging() { + public bool is_logging() { return logging; } @@ -61,7 +61,7 @@ public class Geary.State.Machine { unowned Mapping? mapping = transitions[state, event]; - Transition transition = (mapping != null) ? mapping.transition : default_transition; + Transition? transition = (mapping != null) ? mapping.transition : default_transition; if (transition == null) { string msg = "%s: No transition defined at %s for %s".printf(to_string(), descriptor.get_state_string(state), descriptor.get_event_string(event)); @@ -86,7 +86,7 @@ public class Geary.State.Machine { assert(locked); locked = false; - if (get_logging()) { + if (is_logging()) { message("%s: State transition from %s to %s due to event %s", to_string(), descriptor.get_state_string(old_state), descriptor.get_state_string(state), descriptor.get_event_string(event)); diff --git a/src/tests/syntax.vala b/src/tests/syntax.vala index b5abcc02..abda8775 100644 --- a/src/tests/syntax.vala +++ b/src/tests/syntax.vala @@ -4,6 +4,8 @@ * (version 2.1 or later). See the COPYING file in this distribution. */ +MainLoop? main_loop = null; + void print(int depth, Gee.List params) { string pad = string.nfill(depth * 4, ' '); @@ -24,6 +26,10 @@ void on_params_ready(Geary.Imap.RootParameters root) { print(0, root.get_all()); } +void on_eos() { + main_loop.quit(); +} + int main(string[] args) { if (args.length < 2) { stderr.printf("usage: syntax \n"); @@ -31,8 +37,7 @@ int main(string[] args) { return 1; } - Geary.Imap.Deserializer des = new Geary.Imap.Deserializer(); - des.parameters_ready.connect(on_params_ready); + main_loop = new MainLoop(); // turn argument into single line for deserializer string line = ""; @@ -41,10 +46,19 @@ int main(string[] args) { if (ctr < (args.length - 1)) line += " "; } + line += "\r\n"; + + MemoryInputStream mins = new MemoryInputStream(); + mins.add_data(line.data, null); + + Geary.Imap.Deserializer des = new Geary.Imap.Deserializer(mins); + des.parameters_ready.connect(on_params_ready); + des.eos.connect(on_eos); stdout.printf("INPUT: >%s<\n", line); - Geary.Imap.Deserializer.Mode mode = des.push_line(line); - stdout.printf("INPUT MODE: %s\n", mode.to_string()); + des.xon(); + + main_loop.run(); return 0; }