Don't send commands while another has an active command continuation.

Issue #26 underscored Geary's buggy behaviour in sending commands while
an IDLE command was active - when a new command was queued, DONE would be
sent but then the new command would also be sent without waiting for the
status response for the IDLE command from the server, in violation of
RFC 3501 sections 2.2.1 and 5.5.

A quick fix was to ensure that the new command was held while DEIDLING,
but because of the way the synchronization process between
ClientConnection and the Serialiser works, there is a race that allows
any further commands added to be sent while the IDLE DONE had not yet
been flushed: DONE gets queued, additional command gets queued, then
both get flushed.

Fixing this required re-working how ClientConnection and Serializer
interacted, which was a major re-write. Serializer no longer buffers
any data - it simply writes it out to its underlying stream.
ClientConnection now queues commands and writes them out one by one,
requiring Command implementations to manage stalling the queue until
their expected completions have been received. Literals are hence
written by their command's serialisation process, which gets prompted
by an event from ClientConnection. The two commands that require
non-literal continuations, IDLE and AUTHENTICATE, now manage that
themselves.

As a result, only one command can be serialised at a time, and no
additional commands can be serialised while another is still waiting for
a continuation. Also, command-specific details can be implemented by
command itself, without needing workarounds in the serialiser,
connection, or session.
This commit is contained in:
Michael James Gratton 2018-07-10 16:52:12 +10:00
parent ab9d0c27ad
commit b74190e010
7 changed files with 779 additions and 1239 deletions

View file

@ -21,10 +21,16 @@ public class Geary.Imap.AuthenticateCommand : Command {
public string method { get; private set; }
private LiteralParameter? response_literal = null;
private bool serialised = false;
private Geary.Nonblocking.Spinlock error_lock;
private GLib.Cancellable error_cancellable = new GLib.Cancellable();
private AuthenticateCommand(string method, string data) {
base(NAME, { method, data });
this.method = method;
this.error_lock = new Geary.Nonblocking.Spinlock(this.error_cancellable);
}
public AuthenticateCommand.oauth2(string user, string token) {
@ -34,19 +40,65 @@ public class Geary.Imap.AuthenticateCommand : Command {
this(OAUTH2_METHOD, encoded_token);
}
public ContinuationParameter
continuation_requested(ContinuationResponse response)
throws ImapError {
if (this.method != AuthenticateCommand.OAUTH2_METHOD) {
throw new ImapError.INVALID("Unexpected continuation request");
/** Waits after serialisation has completed for authentication. */
public override async void serialize(Serializer ser,
GLib.Cancellable cancellable)
throws GLib.Error {
yield base.serialize(ser, cancellable);
this.serialised = true;
// Need to manually flush here since the connection will be
// waiting this to complete before do so.
yield ser.flush_stream(cancellable);
// Wait to either get a response or a continuation request
yield this.error_lock.wait_async(cancellable);
if (this.response_literal != null) {
yield this.response_literal.serialize_data(ser, cancellable);
ser.push_eol(cancellable);
yield ser.flush_stream(cancellable);
}
// Continuation will be a Base64 encoded JSON blob and which
// indicates a login failure. We don't really care about that
// (do we?) though since once we acknowledge it with a
// zero-length response the server will respond with an IMAP
// error.
return new ContinuationParameter(new uint8[0]);
yield wait_until_complete(cancellable);
}
public override void cancel_serialization() {
base.cancel_serialization();
this.error_cancellable.cancel();
}
public override void completed(StatusResponse new_status)
throws ImapError {
this.error_lock.blind_notify();
base.completed(new_status);
}
public override void continuation_requested(ContinuationResponse response)
throws ImapError {
if (!this.serialised) {
// Allow any args sent as literals to be processed
// normally
base.continuation_requested(response);
} else {
if (this.method != AuthenticateCommand.OAUTH2_METHOD ||
this.response_literal != null) {
cancel_serialization();
throw new ImapError.INVALID(
"Unexpected AUTHENTICATE continuation request"
);
}
// Continuation will be a Base64 encoded JSON blob and which
// indicates a login failure. We don't really care about that
// (do we?) though since once we acknowledge it with a
// zero-length response the server will respond with an IMAP
// error.
this.response_literal = new LiteralParameter(
Geary.Memory.EmptyBuffer.instance
);
// Notify serialisation to continue
this.error_lock.blind_notify();
}
}
public override string to_string() {

View file

@ -45,6 +45,17 @@ public class Geary.Imap.Command : BaseObject {
protected ListParameter args {
get; private set; default = new RootParameters();
}
/** The status response for the command, once it has been received. */
public StatusResponse? status { get; private set; default = null; }
private Geary.Nonblocking.Semaphore complete_lock =
new Geary.Nonblocking.Semaphore();
private Geary.Nonblocking.Spinlock? literal_spinlock = null;
private GLib.Cancellable? literal_cancellable = null;
/**
* Constructs a new command with an unassigned tag.
*
@ -62,52 +73,208 @@ public class Geary.Imap.Command : BaseObject {
}
}
}
/**
*
*/
}
private void stock_params() {
add(tag);
add(new AtomParameter(name));
if (args != null) {
foreach (string arg in args)
add(Parameter.get_for_string(arg));
}
}
/**
* Assign a {@link Tag} to a {@link Command} with an unassigned placeholder Tag.
*
* Can only be called on a Command that holds an unassigned Tag. Thus, this can only be called
* once at most, and zero times if Command.assigned() was used to generate the Command.
* Fires an assertion if either of these cases is true, or if the supplied Tag is unassigned.
*/
public void assign_tag(Tag tag) {
assert(!this.tag.is_assigned());
assert(tag.is_assigned());
this.tag = tag;
// Tag is always at index zero.
try {
Parameter param = replace(0, tag);
assert(param is Tag);
} catch (ImapError err) {
error("Unable to assign Tag for command %s: %s", to_string(), err.message);
}
}
public bool has_name(string name) {
return Ascii.stri_equal(this.name, name);
}
public override void serialize(Serializer ser, Tag tag) throws Error {
assert(tag.is_assigned());
base.serialize(ser, tag);
ser.push_end_of_message();
/**
* Assign a Tag to this command, if currently unassigned.
*
* Can only be called on a Command that holds an unassigned Tag.
* Thus, this can only be called once at most, and zero times if
* Command.assigned() was used to generate the Command. Fires an
* assertion if either of these cases is true, or if the supplied
* Tag is unassigned.
*/
public void assign_tag(Tag new_tag) throws ImapError {
if (this.tag.is_assigned()) {
throw new ImapError.SERVER_ERROR(
"%s: Command tag is already assigned", to_brief_string()
);
}
if (!new_tag.is_assigned()) {
throw new ImapError.SERVER_ERROR(
"%s: New tag is not assigned", to_brief_string()
);
}
this.tag = new_tag;
}
/**
* Serialises this command for transmission to the server.
*
* This will serialise its tag, name and arguments (if
* any). Arguments are treated as strings and escaped as needed,
* including being encoded as a literal. If any literals are
* required, this method will yield until a command continuation
* has been received, when it will resume the same process.
*/
public virtual async void serialize(Serializer ser,
GLib.Cancellable cancellable)
throws GLib.Error {
this.tag.serialize(ser, cancellable);
ser.push_space(cancellable);
ser.push_unquoted_string(this.name, cancellable);
if (this.args != null) {
foreach (Parameter arg in this.args.get_all()) {
ser.push_space(cancellable);
arg.serialize(ser, cancellable);
LiteralParameter literal = arg as LiteralParameter;
if (literal != null) {
// Need to manually flush after serialising the
// literal param, so it actually gets to the
// server
yield ser.flush_stream(cancellable);
if (this.literal_spinlock == null) {
// Lazily create these since they usually
// won't be needed
this.literal_cancellable = new GLib.Cancellable();
this.literal_spinlock = new Geary.Nonblocking.Spinlock(
this.literal_cancellable
);
}
// Will get notified via continuation_requested
// when server indicated the literal can be sent.
yield this.literal_spinlock.wait_async(cancellable);
yield literal.serialize_data(ser, cancellable);
}
}
}
ser.push_eol(cancellable);
}
/**
* Cancels any existing serialisation in progress.
*
* When this method is called, any non I/O related process
* blocking the blocking {@link serialize} must be cancelled.
*/
public virtual void cancel_serialization() {
if (this.literal_cancellable != null) {
this.literal_cancellable.cancel();
}
}
/**
* Yields until the command has been completed or cancelled.
*
* Throws an error if cancelled, or if the command's response was
* bad.
*/
public async void wait_until_complete(GLib.Cancellable cancellable)
throws GLib.Error {
yield this.complete_lock.wait_async(cancellable);
check_status();
}
/**
* Called when a tagged status response is received for this command.
*
* This will update the command's {@link status} property, then
* throw an error it does not indicate a successful completion.
*/
public virtual void completed(StatusResponse new_status)
throws ImapError {
if (this.status != null) {
cancel_serialization();
throw new ImapError.SERVER_ERROR(
"%s: Duplicate status response received: %s",
to_brief_string(),
status.to_string()
);
}
this.status = new_status;
this.complete_lock.blind_notify();
cancel_serialization();
check_status();
}
/**
* Called when tagged server data is received for this command.
*/
public virtual void data_received(ServerData data)
throws ImapError {
if (this.status != null) {
cancel_serialization();
throw new ImapError.SERVER_ERROR(
"%s: Server data received when command already complete: %s",
to_brief_string(),
data.to_string()
);
}
// Nothing to do otherwise
}
/**
* Called when a continuation was requested by the server.
*
* This will notify the command's literal spinlock so that if
* {@link serialize} is waiting to send a literal, it will do so
* now.
*/
public virtual void
continuation_requested(ContinuationResponse continuation)
throws ImapError {
if (this.status != null) {
cancel_serialization();
throw new ImapError.SERVER_ERROR(
"%s: Continuation requested when command already complete",
to_brief_string()
);
}
if (this.literal_spinlock == null) {
cancel_serialization();
throw new ImapError.SERVER_ERROR(
"%s: Continuation requested but no literals available",
to_brief_string()
);
}
this.literal_spinlock.blind_notify();
}
public virtual string to_string() {
string args = this.args.to_string();
return (Geary.String.is_empty(args))
? "%s %s".printf(this.tag.to_string(), this.name)
: "%s %s %s".printf(this.tag.to_string(), this.name, args);
}
private void check_status() throws ImapError {
if (this.status == null) {
throw new ImapError.SERVER_ERROR(
"%s: No command response was received",
to_brief_string()
);
}
if (!this.status.is_completion) {
throw new ImapError.SERVER_ERROR(
"%s: Command status response is not a completion: %s",
to_brief_string(),
this.status.to_string()
);
}
if (this.status.status != Status.OK) {
throw new ImapError.SERVER_ERROR(
"%s: Command failed: %s",
to_brief_string(),
this.status.to_string()
);
}
}
private string to_brief_string() {
return "%s %s".printf(this.tag.to_string(), this.name);
}
}

View file

@ -11,10 +11,71 @@
*/
public class Geary.Imap.IdleCommand : Command {
public const string NAME = "idle";
public IdleCommand() {
base (NAME);
}
}
public const string NAME = "IDLE";
private const string DONE = "DONE";
/** Determines if the server has acknowledged the IDLE request. */
public bool idle_started { get; private set; default = false; }
private bool serialised = false;
private Geary.Nonblocking.Spinlock? exit_lock;
private GLib.Cancellable? exit_cancellable = new GLib.Cancellable();
public IdleCommand() {
base(NAME);
this.exit_lock = new Geary.Nonblocking.Spinlock(this.exit_cancellable);
}
public void exit_idle() {
this.exit_lock.blind_notify();
}
/** Waits after serialisation has completed for {@link exit_idle}. */
public override async void serialize(Serializer ser,
GLib.Cancellable cancellable)
throws GLib.Error {
// Need to manually flush here since Dovecot doesn't like
// getting IDLE in the same buffer as other commands.
yield ser.flush_stream(cancellable);
yield base.serialize(ser, cancellable);
this.serialised = true;
// Need to manually flush again since the connection will be
// waiting this to complete before do so.
yield ser.flush_stream(cancellable);
// Now wait for exit_idle() to be called, the server to send a
// status response, or everything to be cancelled.
yield this.exit_lock.wait_async(cancellable);
// If we aren't closed already, try sending DONE to exit IDLE
if (this.status == null) {
ser.push_unquoted_string(DONE);
ser.push_eol(cancellable);
yield ser.flush_stream(cancellable);
}
yield wait_until_complete(cancellable);
}
public override void cancel_serialization() {
base.cancel_serialization();
this.exit_cancellable.cancel();
}
public override void continuation_requested(ContinuationResponse response)
throws ImapError {
if (!this.serialised) {
// Allow any args sent as literals to be processed
// normally
base.continuation_requested(response);
} else {
this.idle_started = true;
}
}
}

View file

@ -64,4 +64,13 @@ public class Geary.Imap.LiteralParameter : Geary.Imap.Parameter {
ser.push_eol(cancellable);
}
/**
* Serialises the literal parameter data.
*/
public async void serialize_data(Serializer ser,
GLib.Cancellable cancellable)
throws GLib.Error {
yield ser.push_literal_data(buffer, cancellable);
}
}

File diff suppressed because it is too large Load diff

View file

@ -277,10 +277,6 @@ public class Geary.Imap.ClientSession : BaseObject {
private uint unselected_keepalive_secs = 0;
private uint selected_with_idle_keepalive_secs = 0;
private Gee.HashMap<Tag, StatusResponse> seen_completion_responses = new Gee.HashMap<
Tag, StatusResponse>();
private Gee.HashMap<Tag, CommandCallback> waiting_for_completion = new Gee.HashMap<
Tag, CommandCallback>();
private Command? state_change_cmd = null;
private Nonblocking.Semaphore? connect_waiter = null;
private Error? connect_err = null;
@ -783,17 +779,8 @@ public class Geary.Imap.ClientSession : BaseObject {
cx = null;
}
// if there are any outstanding commands waiting for responses, wake them up now
if (waiting_for_completion.size > 0) {
debug("[%s] Cancelling %d pending commands", to_string(), waiting_for_completion.size);
foreach (CommandCallback cmd_cb in waiting_for_completion.values)
Scheduler.on_idle(cmd_cb.callback);
waiting_for_completion.clear();
}
}
private uint on_connected(uint state, uint event) {
debug("[%s] Connected", to_string());
@ -966,21 +953,6 @@ public class Geary.Imap.ClientSession : BaseObject {
// either way, new capabilities should be available
caps = capabilities;
// Attempt compression (usually only available after authentication)
if (caps.has_setting(Capabilities.COMPRESS, Capabilities.DEFLATE_SETTING)) {
StatusResponse resp = yield send_command_async(
new CompressCommand(CompressCommand.ALGORITHM_DEFLATE));
if (resp.status == Status.OK) {
install_send_converter(new ZlibCompressor(ZlibCompressorFormat.RAW));
install_recv_converter(new ZlibDecompressor(ZlibCompressorFormat.RAW));
debug("[%s] Compression started", to_string());
} else {
debug("[%s] Unable to start compression: %s", to_string(), resp.to_string());
}
} else {
debug("[%s] No compression available", to_string());
}
Gee.List<ServerData> server_data = new Gee.ArrayList<ServerData>();
ulong data_id = this.server_data_received.connect((data) => { server_data.add(data); });
@ -1256,18 +1228,6 @@ public class Geary.Imap.ClientSession : BaseObject {
debug("[%s] Keepalive error: %s", to_string(), err.message);
}
}
//
// Converters
//
public bool install_send_converter(Converter converter) {
return (cx != null) ? cx.install_send_converter(converter) : false;
}
public bool install_recv_converter(Converter converter) {
return (cx != null) ? cx.install_recv_converter(converter) : false;
}
//
// send commands
@ -1750,38 +1710,17 @@ public class Geary.Imap.ClientSession : BaseObject {
//
// command submission
//
private async StatusResponse command_transaction_async(Command cmd, Cancellable? cancellable)
throws Error {
if (cx == null)
if (this.cx == null)
throw new ImapError.NOT_CONNECTED("Not connected to %s", imap_endpoint.to_string());
yield cx.send_async(cmd, cancellable);
// send_async() should've tagged the Command, otherwise the completion_pending will fail
assert(cmd.tag.is_tagged());
// If the command didn't complete (i.e. a CompletionStatusResponse didn't return from the
// server) in the context of send_async(), wait for it now
if (!seen_completion_responses.has_key(cmd.tag)) {
waiting_for_completion.set(cmd.tag, new CommandCallback(command_transaction_async.callback));
yield;
}
// it should be seen now; if not, it's because of disconnection cancelling all the outstanding
// requests
StatusResponse? completion_response;
if (!seen_completion_responses.unset(cmd.tag, out completion_response)) {
assert(cx == null);
throw new ImapError.NOT_CONNECTED("Not connected to %s", imap_endpoint.to_string());
}
assert(completion_response != null);
return completion_response;
this.cx.send_command(cmd);
yield cmd.wait_until_complete(cancellable);
return cmd.status;
}
//
// network connection event handlers
//
@ -1835,24 +1774,14 @@ public class Geary.Imap.ClientSession : BaseObject {
err.message);
}
}
// update state machine before notifying subscribers, who may turn around and query ClientSession
if (status_response.is_completion) {
fsm.issue(Event.RECV_COMPLETION, null, status_response, null);
// Note that this signal could be called in the context of cx.send_async() that sent
// this command to the server ... this mechanism (seen_completion_response and
// waiting_for_completion) assures that in either case issue_command_async() returns
// when the command is completed
seen_completion_responses.set(status_response.tag, status_response);
CommandCallback? cmd_cb;
if (waiting_for_completion.unset(status_response.tag, out cmd_cb))
Scheduler.on_idle(cmd_cb.callback);
} else {
fsm.issue(Event.RECV_STATUS, null, status_response, null);
}
status_response_received(status_response);
}

View file

@ -5,232 +5,99 @@
*/
/**
* Serializer asynchronously writes serialized IMAP commands to the supplied output stream via a
* queue of buffers.
* Writes serialized IMAP commands to a supplied output stream.
*
* Since most IMAP commands are small in size (one line of data, often under 64 bytes), the
* Serializer writes them to a queue of temporary buffers (interspersed with user-supplied buffers
* that are intended to be literal data). The data is only written when {@link flush_async} is
* invoked.
*
* This means that if the caller wants some buffer beyond the steps described above, they should
* pass in a BufferedOutputStream (or one of its subclasses). flush_async() will flush the user's
* OutputStream after writing to it.
*
* Command continuation requires some synchronization between the Serializer and the
* {@link Deserializer}. It also requires some queue management. See {@link fast_forward_queue}
* and {@link next_synchronized_message}.
* Command continuation requires some synchronization between the
* Serializer and the {@link Deserializer}. It also requires some
* queue management. See {@link push_quoted_string} and {@link
* next_synchronized_message}.
*
* @see Deserializer
*/
public class Geary.Imap.Serializer : BaseObject {
private class SerializedData {
public Memory.Buffer buffer;
public Tag? literal_data_tag;
public SerializedData(Memory.Buffer buffer, Tag? literal_data_tag) {
this.buffer = buffer;
this.literal_data_tag = literal_data_tag;
}
}
private string identifier;
private OutputStream outs;
private ConverterOutputStream couts;
private MemoryOutputStream mouts;
private DataOutputStream douts;
private Geary.Stream.MidstreamConverter midstream = new Geary.Stream.MidstreamConverter("Serializer");
private Gee.Queue<SerializedData?> datastream = new Gee.LinkedList<SerializedData?>();
public Serializer(string identifier, OutputStream outs) {
private DataOutputStream output;
public Serializer(string identifier, OutputStream output) {
this.identifier = identifier;
this.outs = outs;
// prepare the ConverterOutputStream (which wraps the caller's OutputStream and allows for
// midstream conversion)
couts = new ConverterOutputStream(outs, midstream);
couts.set_close_base_stream(false);
// prepare the DataOutputStream (which generates buffers for the queue)
mouts = new MemoryOutputStream(null, realloc, free);
douts = new DataOutputStream(mouts);
douts.set_close_base_stream(false);
this.output = new DataOutputStream(output);
this.output.set_close_base_stream(false);
}
public bool install_converter(Converter converter) {
return midstream.install(converter);
}
public void push_ascii(char ch) throws Error {
douts.put_byte(ch, null);
}
/**
* Pushes the string to the IMAP server with quoting applied whether required or not. Returns
* true if quoting was required.
* Pushes the string to the IMAP server with quoting.
*
* This is applied whether required or not. Returns true if
* quoting was required.
*/
public bool push_quoted_string(string str) throws Error {
public bool push_quoted_string(string str,
GLib.Cancellable? cancellable = null)
throws GLib.Error {
string quoted;
DataFormat.Quoting requirement = DataFormat.convert_to_quoted(str, out quoted);
douts.put_string(quoted);
this.output.put_string(quoted, cancellable);
return (requirement == DataFormat.Quoting.REQUIRED);
}
/**
* This will push the string to IMAP as-is. Use only if you absolutely know what you're doing.
* This will push the string to IMAP as-is.
*
* Use only if you absolutely know what you're doing.
*/
public void push_unquoted_string(string str) throws Error {
douts.put_string(str);
public void push_unquoted_string(string str,
GLib.Cancellable? cancellable = null)
throws GLib.Error {
this.output.put_string(str, cancellable);
}
public void push_space() throws Error {
douts.put_byte(' ', null);
public void push_ascii(char ch, GLib.Cancellable? cancellable = null)
throws GLib.Error {
this.output.put_byte(ch, cancellable);
}
public void push_nil() throws Error {
douts.put_string(NilParameter.VALUE, null);
public void push_space(GLib.Cancellable? cancellable = null)
throws GLib.Error {
this.output.put_byte(' ', cancellable);
}
public void push_eol() throws Error {
douts.put_string("\r\n", null);
public void push_nil(GLib.Cancellable? cancellable = null)
throws GLib.Error {
this.output.put_string(NilParameter.VALUE, cancellable);
}
private void enqueue_current_stream() throws IOError {
size_t length = mouts.get_data_size();
if (length <= 0)
return;
// close before converting to Memory.ByteBuffer
mouts.close();
SerializedData data = new SerializedData(
new Memory.ByteBuffer.from_memory_output_stream(mouts), null);
datastream.add(data);
mouts = new MemoryOutputStream(null, realloc, free);
douts = new DataOutputStream(mouts);
douts.set_close_base_stream(false);
public void push_eol(GLib.Cancellable? cancellable = null)
throws GLib.Error {
this.output.put_string("\r\n", cancellable);
}
/*
* Pushes an {link Memory.Buffer} to the serialized stream that must be synchronized
* with the server before transmission.
*
* Literal data may require synchronization with the server and so should only be used when
* necessary. See {link DataFormat.is_quoting_required} to test data.
*
* The supplied buffer must not be mutated once submitted to the {@link Serializer}.
*
* See [[http://tools.ietf.org/html/rfc3501#section-4.3]] and
* [[http://tools.ietf.org/html/rfc3501#section-7.5]]
*/
public void push_synchronized_literal_data(Tag tag, Memory.Buffer buffer) throws Error {
enqueue_current_stream();
datastream.add(new SerializedData(buffer, tag));
}
/**
* Indicates that a complete message has been pushed to the {@link Serializer}.
*
* It's important to delineate messages for the Serializer, as it aids in queue management
* and command continuation (synchronization).
* Pushes literal data to the output stream.
*/
public void push_end_of_message() throws Error {
enqueue_current_stream();
datastream.add(null);
public async void push_literal_data(Memory.Buffer buffer,
GLib.Cancellable? cancellable = null)
throws GLib.Error {
yield this.output.splice_async(
buffer.get_input_stream(),
OutputStreamSpliceFlags.NONE,
Priority.DEFAULT,
cancellable
);
}
/**
* Returns the {@link Tag} for the message with the next synchronization message Tag.
*
* This can be used to prepare for receiving a command continuation failure before sending
* the request via {@link flush_async}, as the response could return before that call completes.
* Flushes the output stream, ensuring a command has been sent.
*/
public Tag? next_synchronized_message() {
foreach (SerializedData? data in datastream) {
if (data != null && data.literal_data_tag != null)
return data.literal_data_tag;
}
return null;
public async void flush_stream(GLib.Cancellable? cancellable = null)
throws GLib.Error {
yield this.output.flush_async(Priority.DEFAULT, cancellable);
}
/**
* Discards all buffers associated with the current message and moves the queue forward to the
* next one.
*
* This is useful when a command continuation is refused by the server and the command must be
* aborted.
*
* Any data currently in the buffer is *not* enqueued, as by definition it has not been marked
* with {@link push_end_of_message}.
*/
public void fast_forward_queue() {
while (!datastream.is_empty) {
if (datastream.poll() == null)
break;
}
}
/**
* Push all serialized data and buffers onto the wire.
*
* Caller should pass is_synchronized=true if the connection has been synchronized for a command
* continuation.
*
* If synchronize_tag returns non-null, then the flush has not completed. The connection must
* wait for the server to send a continuation response before continuing. When ready, call
* flush_async() again with is_synchronized set to true. The tag is supplied to watch for
* an error condition from the server (which may reject the synchronization request).
*/
public async void flush_async(bool is_synchronized, out Tag? synchronize_tag,
Cancellable? cancellable = null) throws Error {
synchronize_tag = null;
// commit the last buffer to the queue (although this is best done with push_end_message)
enqueue_current_stream();
// walk the SerializedData queue, pushing each out to the wire unless a synchronization
// point is encountered
while (!datastream.is_empty) {
// see if next data buffer is synchronized
SerializedData? data = datastream.peek();
if (data != null && data.literal_data_tag != null && !is_synchronized) {
// report the Tag that is associated with the continuation
synchronize_tag = data.literal_data_tag;
// break out to ensure pipe is flushed
break;
}
// if not, remove and process
data = datastream.poll();
if (data == null) {
// end of message, move on
continue;
}
Logging.debug(Logging.Flag.SERIALIZER, "[%s] %s", to_string(), data.buffer.to_string());
// splice buffer's InputStream directly into OutputStream
yield couts.splice_async(data.buffer.get_input_stream(), OutputStreamSpliceFlags.NONE,
Priority.DEFAULT, cancellable);
// if synchronized before, not any more
is_synchronized = false;
}
// make sure everything is flushed out now ... some trouble with BufferedOutputStreams
// here, so flush ConverterOutputStream and its base stream
yield couts.flush_async(Priority.DEFAULT, cancellable);
yield couts.base_stream.flush_async(Priority.DEFAULT, cancellable);
}
public string to_string() {
return "ser:%s".printf(identifier);
}
}
}