Improvements to Serializer and Deserializer

The input mode for Deserializer meant that the caller needed to manage the input stream and push data in an appropriate way; this is error-prone.  Now Deserializer manages the input stream and the modes it must be read from.  Serializer still works in a similar fashion as before, but now it deals with literal data more efficiently, writing it to the output stream directly (via splice_async()) rather than into the in-memory temporary buffer.  Serializable's serialize() method is now async, meaning that all serialization can occur asynchronously, which is how we want it going forward.
This commit is contained in:
Jim Nelson 2011-05-04 17:20:17 -07:00
parent cbe3440fef
commit a43ab81fdf
10 changed files with 236 additions and 183 deletions

View file

@ -235,17 +235,14 @@ class ImapConsole : Gtk.Window {
}
private void on_connected(Object? source, AsyncResult result) {
try {
cx.connect_async.end(result);
status("Connected");
cx.sent_command.connect(on_sent_command);
cx.received_status_response.connect(on_received_status_response);
cx.received_server_data.connect(on_received_server_data);
cx.received_bad_response.connect(on_received_bad_response);
// start transmission and reception
cx.xon();
try {
cx.connect_async.end(result);
status("Connected");
} catch (Error err) {
cx = null;

View file

@ -12,12 +12,8 @@ public class Geary.Imap.ClientConnection {
private uint16 default_port;
private SocketClient socket_client = new SocketClient();
private SocketConnection? cx = null;
private DataInputStream? dins = null;
private int ins_priority = Priority.DEFAULT;
private Cancellable ins_cancellable = new Cancellable();
private bool flow_controlled = true;
private Deserializer des = new Deserializer();
private uint8[] block_buffer = new uint8[4096];
private Serializer? ser = null;
private Deserializer? des = null;
private int tag_counter = 0;
private char tag_prefix = 'a';
@ -27,9 +23,6 @@ public class Geary.Imap.ClientConnection {
public virtual signal void disconnected() {
}
public virtual signal void flow_control(bool xon) {
}
public virtual signal void sent_command(Command cmd) {
}
@ -42,7 +35,13 @@ public class Geary.Imap.ClientConnection {
public virtual signal void received_bad_response(RootParameters root, ImapError err) {
}
public virtual signal void receive_failed(Error err) {
public virtual signal void recv_closed() {
}
public virtual signal void receive_failure(Error err) {
}
public virtual signal void deserialize_failure() {
}
public ClientConnection(string host_specifier, uint16 default_port) {
@ -51,8 +50,6 @@ public class Geary.Imap.ClientConnection {
socket_client.set_tls(true);
socket_client.set_tls_validation_flags(TlsCertificateFlags.UNKNOWN_CA);
des.parameters_ready.connect(on_parameters_ready);
}
~ClientConnection() {
@ -79,10 +76,16 @@ public class Geary.Imap.ClientConnection {
throw new IOError.EXISTS("Already connected to %s", to_string());
cx = yield socket_client.connect_to_host_async(host_specifier, default_port, cancellable);
dins = new DataInputStream(cx.input_stream);
dins.set_newline_type(DataStreamNewlineType.CR_LF);
ser = new Serializer(new BufferedOutputStream(cx.output_stream));
des = new Deserializer(new BufferedInputStream(cx.input_stream));
des.parameters_ready.connect(on_parameters_ready);
des.receive_failure.connect(on_receive_failure);
des.deserialize_failure.connect(on_deserialize_failure);
des.eos.connect(on_eos);
connected();
des.xon();
}
public async void disconnect_async(Cancellable? cancellable = null)
@ -93,72 +96,12 @@ public class Geary.Imap.ClientConnection {
yield cx.close_async(Priority.DEFAULT, cancellable);
cx = null;
dins = null;
ser = null;
des = null;
disconnected();
}
public void xon(int priority = Priority.DEFAULT) throws Error {
check_for_connection();
if (!flow_controlled)
return;
flow_controlled = false;
ins_priority = priority;
next_deserialize_step();
flow_control(true);
}
private void next_deserialize_step() {
switch (des.get_mode()) {
case Deserializer.Mode.LINE:
dins.read_line_async.begin(ins_priority, ins_cancellable, on_read_line);
break;
case Deserializer.Mode.BLOCK:
long count = long.min(block_buffer.length, des.get_max_data_length());
dins.read_async.begin(block_buffer[0:count], ins_priority, ins_cancellable,
on_read_block);
break;
default:
error("Failed");
}
}
private void on_read_line(Object? source, AsyncResult result) {
try {
string line = dins.read_line_async.end(result);
des.push_line(line);
} catch (Error err) {
if (!(err is IOError.CANCELLED))
receive_failed(err);
return;
}
if (!flow_controlled)
next_deserialize_step();
}
private void on_read_block(Object? source, AsyncResult result) {
try {
ssize_t read = dins.read_async.end(result);
des.push_data(block_buffer[0:read]);
} catch (Error err) {
if (!(err is IOError.CANCELLED))
receive_failed(err);
return;
}
if (!flow_controlled)
next_deserialize_step();
}
private void on_parameters_ready(RootParameters root) {
try {
bool is_status_response;
@ -173,17 +116,16 @@ public class Geary.Imap.ClientConnection {
}
}
public void xoff() throws Error {
check_for_connection();
private void on_receive_failure(Error err) {
receive_failure(err);
}
if (flow_controlled)
return;
private void on_deserialize_failure() {
deserialize_failure();
}
// turn off the spigot
// TODO: Don't cancel the read, merely don't post the next window
flow_controlled = true;
ins_cancellable.cancel();
ins_cancellable = new Cancellable();
private void on_eos() {
recv_closed();
}
/**
@ -206,47 +148,16 @@ public class Geary.Imap.ClientConnection {
Cancellable? cancellable = null) throws Error {
check_for_connection();
Serializer ser = new Serializer();
cmd.serialize(ser);
assert(ser.has_content());
yield write_all_async(ser, priority, cancellable);
// TODO: At this point, we flush each command as it's written; at some point we'll have
// a queuing strategy that means serialized data is pushed out to the wire only at certain
// times
yield ser.flush_async(priority, cancellable);
sent_command(cmd);
}
public async void send_multiple_async(Gee.List<Command> cmds, int priority = Priority.DEFAULT,
Cancellable? cancellable = null) throws Error {
if (cmds.size == 0)
return;
check_for_connection();
Serializer ser = new Serializer();
foreach (Command cmd in cmds)
cmd.serialize(ser);
assert(ser.has_content());
yield write_all_async(ser, priority, cancellable);
// Variable named due to this bug: https://bugzilla.gnome.org/show_bug.cgi?id=596861
foreach (Command cmd2 in cmds)
sent_command(cmd2);
}
// Can't pass the raw buffer due to this bug: https://bugzilla.gnome.org/show_bug.cgi?id=639054
private async void write_all_async(Serializer ser, int priority, Cancellable? cancellable)
throws Error {
ssize_t index = 0;
size_t length = ser.get_content_length();
while (index < length) {
index += yield cx.output_stream.write_async(ser.get_content()[index:length],
priority, cancellable);
if (index < length)
debug("PARTIAL WRITE TO %s: %lu/%lu bytes", to_string(), index, length);
}
}
private void check_for_connection() throws Error {
if (cx == null)
throw new IOError.CLOSED("Not connected to %s", to_string());

View file

@ -45,13 +45,10 @@ public class Geary.Imap.ClientSession : Object, Geary.Account {
cx.received_status_response.connect(on_received_status_response);
cx.received_server_data.connect(on_received_server_data);
cx.received_bad_response.connect(on_received_bad_response);
cx.receive_failed.connect(on_receive_failed);
cx.receive_failure.connect(on_receive_failed);
yield cx.connect_async(cancellable);
// start receiving traffic from the server
cx.xon();
// wait for the initial OK response from the server
cb_queue.offer(new CommandCallback(connect_async.callback));
awaiting_connect_response = true;

View file

@ -4,8 +4,20 @@
* (version 2.1 or later). See the COPYING file in this distribution.
*/
/**
* The Deserializer performs asynchronous I/O on a supplied input stream and transforms the raw
* bytes into IMAP Parameters (which can then be converted into ServerResponses or ServerData).
* The Deserializer will only begin reading from the stream when xon() is called. Calling xoff()
* will flow control the stream, halting reading without closing the stream itself. Since all
* results from the Deserializer are reported via signals, those signals should be connected to
* prior to calling xon(), or the caller risks missing early messages. (Note that since
* Deserializer uses async I/O, this isn't technically possible unless the signals are connected
* after the Idle loop has a chance to run; however, this is an implementation detail and shouldn't
* be relied upon.)
*/
public class Geary.Imap.Deserializer {
public enum Mode {
private enum Mode {
LINE,
BLOCK,
FAILED
@ -67,18 +79,31 @@ public class Geary.Imap.Deserializer {
"Geary.Imap.Deserializer", State.TAG, State.COUNT, Event.COUNT,
state_to_string, event_to_string);
private DataInputStream dins;
private Geary.State.Machine fsm;
private ListParameter current;
private RootParameters root = new RootParameters();
private StringBuilder? current_string = null;
private LiteralParameter? current_literal = null;
private long literal_length_remaining = 0;
private uint8[] block_buffer = new uint8[4096];
private bool flow_controlled = true;
private int ins_priority = Priority.DEFAULT;
public signal void flow_control(bool xon);
public signal void parameters_ready(RootParameters root);
public signal void failed();
public signal void eos();
public signal void receive_failure(Error err);
public signal void deserialize_failure();
public Deserializer(InputStream ins) {
dins = new DataInputStream(ins);
dins.set_newline_type(DataStreamNewlineType.CR_LF);
public Deserializer() {
current = root;
Geary.State.Mapping[] mappings = {
@ -104,22 +129,100 @@ public class Geary.Imap.Deserializer {
fsm = new Geary.State.Machine(machine_desc, mappings, on_bad_transition);
}
public void xon(int priority = Priority.DEFAULT) {
if (!flow_controlled || get_mode() == Mode.FAILED)
return;
flow_controlled = false;
ins_priority = priority;
next_deserialize_step();
flow_control(true);
}
public void xoff() {
if (flow_controlled || get_mode() == Mode.FAILED)
return;
flow_controlled = true;
flow_control(false);
}
private void next_deserialize_step() {
switch (get_mode()) {
case Mode.LINE:
dins.read_line_async.begin(ins_priority, null, on_read_line);
break;
case Mode.BLOCK:
assert(literal_length_remaining > 0);
long count = long.min(block_buffer.length, literal_length_remaining);
dins.read_async.begin(block_buffer[0:count], ins_priority, null, on_read_block);
break;
default:
assert_not_reached();
}
}
private void on_read_line(Object? source, AsyncResult result) {
try {
string? line = dins.read_line_async.end(result);
if (line == null) {
eos();
return;
}
push_line(line);
} catch (Error err) {
receive_failure(err);
return;
}
if (!flow_controlled)
next_deserialize_step();
}
private void on_read_block(Object? source, AsyncResult result) {
try {
ssize_t read = dins.read_async.end(result);
if (read == 0) {
eos();
return;
}
push_data(block_buffer[0:read]);
} catch (Error err) {
receive_failure(err);
return;
}
if (!flow_controlled)
next_deserialize_step();
}
// Push a line (without the CRLF!).
public Mode push_line(string line) {
private Mode push_line(string line) {
assert(get_mode() == Mode.LINE);
int index = 0;
unichar ch;
while (line.get_next_char(ref index, out ch)) {
if (fsm.issue(Event.CHAR, &ch) == State.FAILED) {
failed();
deserialize_failure();
return Mode.FAILED;
}
}
if (fsm.issue(Event.EOL) == State.FAILED) {
failed();
deserialize_failure();
return Mode.FAILED;
}
@ -127,17 +230,13 @@ public class Geary.Imap.Deserializer {
return get_mode();
}
public long get_max_data_length() {
return literal_length_remaining;
}
// Push a block of literal data
public Mode push_data(uint8[] data) {
private Mode push_data(owned uint8[] data) {
assert(get_mode() == Mode.BLOCK);
LiteralData literal_data = LiteralData(data);
if (fsm.issue(Event.DATA, &literal_data) == State.FAILED) {
failed();
deserialize_failure();
return Mode.FAILED;
}
@ -145,7 +244,7 @@ public class Geary.Imap.Deserializer {
return get_mode();
}
public Mode get_mode() {
private Mode get_mode() {
switch (fsm.get_state()) {
case State.LITERAL_DATA:
return Mode.BLOCK;

View file

@ -5,7 +5,7 @@
*/
public abstract class Geary.Imap.Parameter : Object, Serializable {
public abstract void serialize(Serializer ser) throws Error;
public abstract async void serialize(Serializer ser) throws Error;
// to_string() returns a representation of the Parameter suitable for logging and debugging,
// but should not be relied upon for wire or persistent representation.
@ -31,7 +31,7 @@ public class Geary.Imap.StringParameter : Geary.Imap.Parameter {
return value;
}
public override void serialize(Serializer ser) throws Error {
public override async void serialize(Serializer ser) throws Error {
ser.push_string(value);
}
}
@ -61,10 +61,10 @@ public class Geary.Imap.LiteralParameter : Geary.Imap.Parameter {
return "{literal/%ldb}".printf(size);
}
public override void serialize(Serializer ser) throws Error {
public override async void serialize(Serializer ser) throws Error {
ser.push_string("{%ld}".printf(size));
ser.push_eol();
ser.push_input_stream_literal_data(mins);
yield ser.push_input_stream_literal_data_async(mins);
// seek to start
mins.seek(0, SeekType.SET);
@ -148,10 +148,10 @@ public class Geary.Imap.ListParameter : Geary.Imap.Parameter {
}
}
public override void serialize(Serializer ser) throws Error {
ser.push_string("(");
public override async void serialize(Serializer ser) throws Error {
ser.push_ascii('(');
serialize_list(ser);
ser.push_string(")");
ser.push_ascii(')');
}
}
@ -170,7 +170,7 @@ public class Geary.Imap.RootParameters : Geary.Imap.ListParameter {
return stringize_list();
}
public override void serialize(Serializer ser) throws Error {
public override async void serialize(Serializer ser) throws Error {
serialize_list(ser);
ser.push_eol();
}

View file

@ -13,10 +13,10 @@ public class Geary.Imap.ResponseCode : Geary.Imap.ListParameter {
return "[%s]".printf(stringize_list());
}
public override void serialize(Serializer ser) throws Error {
ser.push_string("[");
public override async void serialize(Serializer ser) throws Error {
ser.push_ascii('[');
serialize_list(ser);
ser.push_string("]");
ser.push_ascii(']');
}
}

View file

@ -5,6 +5,6 @@
*/
public interface Geary.Imap.Serializable {
public abstract void serialize(Serializer ser) throws Error;
public abstract async void serialize(Serializer ser) throws Error;
}

View file

@ -4,25 +4,34 @@
* (version 2.1 or later). See the COPYING file in this distribution.
*/
/**
* The Serializer asynchronously writes serialized IMAP commands to the supplied output stream.
* Since most IMAP commands are small in size (one line of data, often under 64 bytes), the
* Serializer writes them to a temporary buffer, only writing to the actual stream when literal data
* is written (which can often be large and coming off of disk) or commit_async() is called, which
* should be invoked when convenient, to prevent the buffer from growing too large.
*
* Because of this situation, the serialized commands will not necessarily reach the output stream
* unless commit_async() is called, which pushes the in-memory bytes to it. Since the
* output stream itself may be buffered, flush_async() should be called to verify the bytes have
* reached the wire.
*
* flush_async() implies commit_async(), but the reverse is not true.
*/
public class Geary.Imap.Serializer {
private OutputStream outs;
private MemoryOutputStream mouts;
private DataOutputStream douts;
public Serializer() {
public Serializer(OutputStream outs) {
this.outs = outs;
mouts = new MemoryOutputStream(null, realloc, free);
douts = new DataOutputStream(mouts);
}
public unowned uint8[] get_content() {
return mouts.get_data();
}
public size_t get_content_length() {
return mouts.get_data_size();
}
public bool has_content() {
return get_content_length() > 0;
public void push_ascii(char ch) throws Error {
douts.put_byte(ch, null);
}
public void push_string(string str) throws Error {
@ -37,14 +46,40 @@ public class Geary.Imap.Serializer {
douts.put_string("\r\n", null);
}
public void push_literal_data(uint8[] data) throws Error {
size_t written;
douts.write_all(data, out written);
assert(written == data.length);
public async void push_input_stream_literal_data_async(InputStream ins,
int priority = Priority.DEFAULT, Cancellable? cancellable = null) throws Error {
// commit the in-memory buffer to the output stream
yield commit_async(priority, cancellable);
// splice the literal data directly to the output stream
yield outs.splice_async(ins, OutputStreamSpliceFlags.NONE, priority, cancellable);
}
public void push_input_stream_literal_data(InputStream ins) throws Error {
douts.splice(ins, OutputStreamSpliceFlags.NONE);
// commit_async() takes the stored (in-memory) serialized data and writes it asynchronously
// to the wrapped OutputStream. Note that this is *not* a flush, as it's possible the
// serialized data will be stored in a buffer in the OutputStream. Use flush_async() to force
// data onto the wire.
public async void commit_async(int priority = Priority.DEFAULT, Cancellable? cancellable = null)
throws Error {
size_t length = mouts.get_data_size();
if (length == 0)
return;
ssize_t index = 0;
do {
index += yield outs.write_async(mouts.get_data()[index:length], priority, cancellable);
} while (index < length);
mouts = new MemoryOutputStream(null, realloc, free);
douts = new DataOutputStream(mouts);
}
// This pushes all serialized data onto the wire. This calls commit_async() before
// flushing.
public async void flush_async(int priority = Priority.DEFAULT, Cancellable? cancellable = null)
throws Error {
yield commit_async(priority, cancellable);
yield outs.flush_async(priority, cancellable);
}
}

View file

@ -51,7 +51,7 @@ public class Geary.State.Machine {
this.logging = logging;
}
public bool get_logging() {
public bool is_logging() {
return logging;
}
@ -61,7 +61,7 @@ public class Geary.State.Machine {
unowned Mapping? mapping = transitions[state, event];
Transition transition = (mapping != null) ? mapping.transition : default_transition;
Transition? transition = (mapping != null) ? mapping.transition : default_transition;
if (transition == null) {
string msg = "%s: No transition defined at %s for %s".printf(to_string(),
descriptor.get_state_string(state), descriptor.get_event_string(event));
@ -86,7 +86,7 @@ public class Geary.State.Machine {
assert(locked);
locked = false;
if (get_logging()) {
if (is_logging()) {
message("%s: State transition from %s to %s due to event %s", to_string(),
descriptor.get_state_string(old_state), descriptor.get_state_string(state),
descriptor.get_event_string(event));

View file

@ -4,6 +4,8 @@
* (version 2.1 or later). See the COPYING file in this distribution.
*/
MainLoop? main_loop = null;
void print(int depth, Gee.List<Geary.Imap.Parameter> params) {
string pad = string.nfill(depth * 4, ' ');
@ -24,6 +26,10 @@ void on_params_ready(Geary.Imap.RootParameters root) {
print(0, root.get_all());
}
void on_eos() {
main_loop.quit();
}
int main(string[] args) {
if (args.length < 2) {
stderr.printf("usage: syntax <imap command>\n");
@ -31,8 +37,7 @@ int main(string[] args) {
return 1;
}
Geary.Imap.Deserializer des = new Geary.Imap.Deserializer();
des.parameters_ready.connect(on_params_ready);
main_loop = new MainLoop();
// turn argument into single line for deserializer
string line = "";
@ -41,10 +46,19 @@ int main(string[] args) {
if (ctr < (args.length - 1))
line += " ";
}
line += "\r\n";
MemoryInputStream mins = new MemoryInputStream();
mins.add_data(line.data, null);
Geary.Imap.Deserializer des = new Geary.Imap.Deserializer(mins);
des.parameters_ready.connect(on_params_ready);
des.eos.connect(on_eos);
stdout.printf("INPUT: >%s<\n", line);
Geary.Imap.Deserializer.Mode mode = des.push_line(line);
stdout.printf("INPUT MODE: %s\n", mode.to_string());
des.xon();
main_loop.run();
return 0;
}