Create ClientSessionManager.

The ClientSessionManager maintains a pool of connections to an IMAP server, reusing them whenever possible.  It allows for multiple folders to be managed and monitored by an application without having to change contexts constantly (which also introduces side-effects, esp. with expunging deleted messages).
This commit is contained in:
Jim Nelson 2011-05-30 18:18:52 -07:00
parent aa094cb813
commit 737f235d60
12 changed files with 352 additions and 88 deletions

View file

@ -18,6 +18,7 @@ ENGINE_SRC := \
src/engine/common/MessageData.vala \
src/engine/imap/ClientConnection.vala \
src/engine/imap/ClientSession.vala \
src/engine/imap/ClientSessionManager.vala \
src/engine/imap/DataFormat.vala \
src/engine/imap/Mailbox.vala \
src/engine/imap/Parameter.vala \
@ -45,7 +46,8 @@ ENGINE_SRC := \
src/engine/rfc822/MailboxAddress.vala \
src/engine/rfc822/MessageData.vala \
src/engine/util/String.vala \
src/engine/util/Memory.vala
src/engine/util/Memory.vala \
src/engine/util/Delegate.vala
CLIENT_SRC := \
src/client/main.vala \

View file

@ -41,6 +41,11 @@ public class FolderListStore : Gtk.TreeStore {
set(iter, Column.NAME, folder);
}
public void add_folders(Gee.Collection<string> folders) {
foreach (string folder in folders)
add_folder(folder);
}
public string? get_folder_at(Gtk.TreePath path) {
Gtk.TreeIter iter;
if (!get_iter(out iter, path))

View file

@ -66,8 +66,7 @@ public class MainWindow : Gtk.Window {
Gee.Collection<string>? folders = yield account.list("/");
if (folders != null) {
debug("%d folders found", folders.size);
foreach (string folder in folders)
folder_list_store.add_folder(folder);
folder_list_store.add_folders(folders);
} else {
debug("no folders");
}
@ -164,17 +163,11 @@ public class MainWindow : Gtk.Window {
Geary.Folder folder = yield account.open(folder_name);
Geary.MessageStream? msg_stream = folder.read(1, 100);
if (msg_stream == null)
error("Unable to read from folder");
Gee.List<Geary.Message>? msgs = yield msg_stream.read();
Gee.List<Geary.Message>? msgs = yield folder.read(1, 100);
if (msgs != null && msgs.size > 0) {
foreach (Geary.Message msg in msgs)
message_list_store.append_message(msg);
}
yield folder.close();
}
private void on_select_folder_completed(Object? source, AsyncResult result) {

View file

@ -106,7 +106,7 @@ class ImapConsole : Gtk.Window {
string[] args = new string[0];
for (int ctr = 1; ctr < tokens.length; ctr++) {
string arg = tokens[ctr].strip();
if (!String.is_empty(arg))
if (!Geary.String.is_empty(arg))
args += arg;
}

View file

@ -6,11 +6,8 @@
public class Geary.Engine : Object {
public static async Account? login(string server, string user, string pass) throws Error {
Imap.ClientSession account = new Imap.ClientSession(server, Imap.ClientConnection.DEFAULT_PORT_TLS);
yield account.connect_async();
yield account.login_async(user, pass);
return account;
return new Imap.ClientSessionManager(server, Imap.ClientConnection.DEFAULT_PORT_TLS, user,
pass);
}
}

View file

@ -12,12 +12,15 @@ public interface Geary.Account : Object {
}
public interface Geary.Folder : Object {
public abstract MessageStream? read(int low, int count);
public enum CloseReason {
LOCAL_CLOSE,
REMOTE_CLOSE,
FOLDER_CLOSED
}
public abstract async void close(Cancellable? cancellable = null) throws Error;
}
public interface Geary.MessageStream : Object {
public abstract async Gee.List<Message>? read(Cancellable? cancellable = null) throws Error;
public signal void closed(CloseReason reason);
public abstract async Gee.List<Message>? read(int low, int count, Cancellable? cancellable = null)
throws Error;
}

View file

@ -4,11 +4,27 @@
* (version 2.1 or later). See the COPYING file in this distribution.
*/
public class Geary.Imap.ClientSession : Object, Geary.Account {
public class Geary.Imap.ClientSession {
// 30 min keepalive required to maintain session; back off by 30 sec for breathing room
public const int MIN_KEEPALIVE_SEC = (30 * 60) - 30;
public const int DEFAULT_KEEPALIVE_SEC = 60;
public enum Context {
UNCONNECTED,
UNAUTHORIZED,
AUTHORIZED,
SELECTED,
EXAMINED,
IN_PROGRESS
}
public enum DisconnectReason {
LOCAL_CLOSE,
LOCAL_ERROR,
REMOTE_CLOSE,
REMOTE_ERROR
}
// Need this because delegates with targets cannot be stored in ADTs.
private class CommandCallback {
public SourceFunc callback;
@ -149,7 +165,8 @@ public class Geary.Imap.ClientSession : Object, Geary.Account {
private uint default_port;
private Geary.State.Machine fsm;
private ClientConnection? cx = null;
private Mailbox? current_mailbox = null;
private string? current_mailbox = null;
private bool current_mailbox_readonly = false;
private Gee.Queue<CommandCallback> cb_queue = new Gee.LinkedList<CommandCallback>();
private Gee.Queue<CommandResponse> cmd_response_queue = new Gee.LinkedList<CommandResponse>();
private CommandResponse current_cmd_response = new CommandResponse();
@ -161,6 +178,26 @@ public class Geary.Imap.ClientSession : Object, Geary.Account {
private AsyncParams? connect_params = null;
private AsyncParams? disconnect_params = null;
public virtual signal void connected() {
}
public virtual signal void authorized() {
}
public virtual signal void logged_out() {
}
public virtual signal void disconnected(DisconnectReason reason) {
}
/**
* If the mailbox name is null it indicates the type of state change that has occurred
* (authorized -> selected/examined or vice-versa). If new_name is null readonly should be
* ignored.
*/
public virtual signal void current_mailbox_changed(string? old_name, string? new_name, bool readonly) {
}
public virtual signal void unsolicited_expunged(MessageNumber msg) {
}
@ -247,6 +284,7 @@ public class Geary.Imap.ClientSession : Object, Geary.Account {
new Geary.State.Mapping(State.SELECTED, Event.RECV_ERROR, on_recv_error),
new Geary.State.Mapping(State.CLOSING_MAILBOX, Event.SEND_CMD, on_send_command),
new Geary.State.Mapping(State.CLOSING_MAILBOX, Event.CLOSE_MAILBOX, Geary.State.nop),
new Geary.State.Mapping(State.CLOSING_MAILBOX, Event.DISCONNECT, on_disconnect),
new Geary.State.Mapping(State.CLOSING_MAILBOX, Event.CLOSED_MAILBOX, on_closed_mailbox),
new Geary.State.Mapping(State.CLOSING_MAILBOX, Event.CLOSE_MAILBOX_FAILED, on_close_mailbox_failed),
@ -303,6 +341,47 @@ public class Geary.Imap.ClientSession : Object, Geary.Account {
return (cx != null) ? cx.generate_tag() : null;
}
public string? get_current_mailbox() {
return current_mailbox;
}
public bool is_current_mailbox_readonly() {
return current_mailbox_readonly;
}
public Context get_context(out string? current_mailbox) {
current_mailbox = null;
switch (fsm.get_state()) {
case State.DISCONNECTED:
case State.LOGGED_OUT:
case State.LOGGING_OUT:
case State.DISCONNECTING:
case State.BROKEN:
return Context.UNCONNECTED;
case State.NOAUTH:
return Context.UNAUTHORIZED;
case State.AUTHORIZED:
return Context.AUTHORIZED;
case State.SELECTED:
current_mailbox = this.current_mailbox;
return current_mailbox_readonly ? Context.EXAMINED : Context.SELECTED;
case State.CONNECTING:
case State.AUTHORIZING:
case State.SELECTING:
case State.CLOSING_MAILBOX:
return Context.IN_PROGRESS;
default:
assert_not_reached();
}
}
//
// connect
//
@ -372,8 +451,10 @@ public class Geary.Imap.ClientSession : Object, Geary.Account {
try {
StringParameter status_param = (StringParameter) connect_response.get_as(
1, typeof(StringParameter));
issue_status(Status.from_parameter(status_param), Event.CONNECTED, Event.CONNECT_DENIED,
connect_response);
if (issue_status(Status.from_parameter(status_param), Event.CONNECTED, Event.CONNECT_DENIED,
connect_response)) {
connected();
}
} catch (ImapError imap_err) {
connect_params.err = imap_err;
fsm.issue(Event.CONNECT_DENIED);
@ -426,7 +507,8 @@ public class Geary.Imap.ClientSession : Object, Geary.Account {
}
private void on_login_completed(Object? source, AsyncResult result) {
generic_issue_command_completed(result, Event.LOGIN_SUCCESS, Event.LOGIN_FAILED);
if (generic_issue_command_completed(result, Event.LOGIN_SUCCESS, Event.LOGIN_FAILED))
authorized();
}
private uint on_login_success(uint state, uint event, void *user) {
@ -445,15 +527,22 @@ public class Geary.Imap.ClientSession : Object, Geary.Account {
//
/**
* Returns true if keepalives are activated, false if already enabled.
* If seconds is negative or zero, keepalives will be disabled. (This is not recommended.)
*
* Although keepalives can be enabled at any time, if they're enabled and trigger sending
* a command prior to connection, error signals may be fired.
*/
public bool enable_keepalives(int seconds = DEFAULT_KEEPALIVE_SEC) {
public void enable_keepalives(int seconds = DEFAULT_KEEPALIVE_SEC) {
if (seconds <= 0) {
disable_keepalives();
return;
}
if (keepalive_id != 0)
return false;
Source.remove(keepalive_id);
keepalive_id = Timeout.add_seconds(seconds, on_keepalive);
return true;
}
/**
@ -556,16 +645,18 @@ public class Geary.Imap.ClientSession : Object, Geary.Account {
// select/examine
//
public async Mailbox select_async(string mailbox, Cancellable? cancellable = null) throws Error {
public async string select_async(string mailbox, Cancellable? cancellable = null) throws Error {
return yield select_examine_async(mailbox, true, cancellable);
}
public async Mailbox examine_async(string mailbox, Cancellable? cancellable = null) throws Error {
public async string examine_async(string mailbox, Cancellable? cancellable = null) throws Error {
return yield select_examine_async(mailbox, false, cancellable);
}
private async Mailbox select_examine_async(string mailbox, bool is_select, Cancellable? cancellable)
public async string select_examine_async(string mailbox, bool is_select, Cancellable? cancellable)
throws Error {
string? old_mailbox = current_mailbox;
SelectParams params = new SelectParams(mailbox, is_select, cancellable,
select_examine_async.callback);
fsm.issue(Event.SELECT, null, params);
@ -576,7 +667,11 @@ public class Geary.Imap.ClientSession : Object, Geary.Account {
if (params.err != null)
throw params.err;
// TODO: We may want to move this signal into the async completion handler rather than
// fire it here because async callbacks are scheduled on the event loop and their order
// of execution is not guaranteed
assert(current_mailbox != null);
current_mailbox_changed(old_mailbox, current_mailbox, current_mailbox_readonly);
return current_mailbox;
}
@ -586,7 +681,7 @@ public class Geary.Imap.ClientSession : Object, Geary.Account {
SelectParams params = (SelectParams) object;
if (current_mailbox != null && current_mailbox.name == params.mailbox)
if (current_mailbox != null && current_mailbox == params.mailbox)
return state;
// TODO: Currently don't handle situation where one mailbox is selected and another is
@ -615,7 +710,8 @@ public class Geary.Imap.ClientSession : Object, Geary.Account {
SelectParams params = (SelectParams) object;
assert(current_mailbox == null);
current_mailbox = new Mailbox(params.mailbox, this);
current_mailbox = params.mailbox;
current_mailbox_readonly = !params.is_select;
return State.SELECTED;
}
@ -636,6 +732,8 @@ public class Geary.Imap.ClientSession : Object, Geary.Account {
//
public async void close_mailbox_async(Cancellable? cancellable = null) throws Error {
string? old_mailbox = current_mailbox;
AsyncParams params = new AsyncParams(cancellable, close_mailbox_async.callback);
fsm.issue(Event.CLOSE_MAILBOX, null, params);
@ -644,6 +742,16 @@ public class Geary.Imap.ClientSession : Object, Geary.Account {
if (params.err != null)
throw params.err;
assert(current_mailbox == null);
// possible for a close_mailbox to occur when already closed, but don't fire signal in
// that case
//
// TODO: See note in select_examine_async() for why it might be better to fire this signal
// in the async completion handler rather than here
if (old_mailbox != null)
current_mailbox_changed(old_mailbox, null, false);
}
private uint on_close_mailbox(uint state, uint event, void *user, Object? object) {
@ -674,7 +782,7 @@ public class Geary.Imap.ClientSession : Object, Geary.Account {
AsyncParams params = (AsyncParams) object;
params.err = new ImapError.COMMAND_FAILED("Unable to close mailbox \"%s\": %s",
current_mailbox.name, params.cmd_response.to_string());
current_mailbox, params.cmd_response.to_string());
return State.SELECTED;
}
@ -710,7 +818,8 @@ public class Geary.Imap.ClientSession : Object, Geary.Account {
}
private void on_logout_completed(Object? source, AsyncResult result) {
generic_issue_command_completed(result, Event.LOGOUT_SUCCESS, Event.LOGOUT_FAILED);
if (generic_issue_command_completed(result, Event.LOGOUT_SUCCESS, Event.LOGOUT_FAILED))
logged_out();
}
private uint on_logged_out(uint state, uint event, void *user) {
@ -751,6 +860,8 @@ public class Geary.Imap.ClientSession : Object, Geary.Account {
try {
cx.disconnect_async.end(result);
fsm.issue(Event.DISCONNECTED);
disconnected(DisconnectReason.LOCAL_CLOSE);
} catch (Error err) {
fsm.issue(Event.SEND_ERROR, null, null, err);
disconnect_params.err = err;
@ -780,19 +891,33 @@ public class Geary.Imap.ClientSession : Object, Geary.Account {
debug("Send error on %s: %s", to_full_string(), err.message);
cx = null;
Idle.add(on_fire_send_error_signal);
return State.BROKEN;
}
private bool on_fire_send_error_signal() {
disconnected(DisconnectReason.LOCAL_ERROR);
return false;
}
private uint on_recv_error(uint state, uint event, void *user, Object? object, Error? err) {
assert(err != null);
debug("Receive error on %s: %s", to_full_string(), err.message);
cx = null;
Idle.add(on_fire_recv_error_signal);
return State.BROKEN;
}
private bool on_fire_recv_error_signal() {
disconnected(DisconnectReason.REMOTE_ERROR);
return false;
}
// This handles the situation where the user submits a command before the connection has been
// established
private uint on_early_command(uint state, uint event, void *user, Object? object) {
@ -836,8 +961,10 @@ public class Geary.Imap.ClientSession : Object, Geary.Account {
// command submission
//
private void issue_status(Status status, Event ok_event, Event error_event, Object? object) {
private bool issue_status(Status status, Event ok_event, Event error_event, Object? object) {
fsm.issue((status == Status.OK) ? ok_event : error_event, null, object);
return (status == Status.OK);
}
private async AsyncCommandResponse issue_command_async(Command cmd, Object? user = null,
@ -888,24 +1015,6 @@ public class Geary.Imap.ClientSession : Object, Geary.Account {
return success;
}
//
// Geary.Account
//
public async Gee.Collection<string> list(string parent, Cancellable? cancellable = null) throws Error {
string specifier = String.is_empty(parent) ? "/" : parent;
specifier += (specifier.has_suffix("/")) ? "%" : "/%";
ListResults results = ListResults.decode(yield send_command_async(
new ListCommand(generate_tag(), specifier), cancellable));
return results.get_names();
}
public async Geary.Folder open(string mailbox, Cancellable? cancellable = null) throws Error {
return yield examine_async(mailbox, cancellable);
}
//
// network connection event handlers
//

View file

@ -0,0 +1,103 @@
/* Copyright 2011 Yorba Foundation
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
public class Geary.Imap.ClientSessionManager : Object, Geary.Account {
private string server;
private uint default_port;
private string user;
private string pass;
private Gee.HashSet<ClientSession> sessions = new Gee.HashSet<ClientSession>();
private int keepalive_sec = ClientSession.DEFAULT_KEEPALIVE_SEC;
public ClientSessionManager(string server, uint default_port, string user, string pass) {
this.server = server;
this.default_port = default_port;
this.user = user;
this.pass = pass;
}
/**
* Set to zero or negative value if keepalives should be disabled. (This is not recommended.)
*/
public void set_keepalive(int keepalive_sec) {
// set for future connections
this.keepalive_sec = keepalive_sec;
// set for all current connections
foreach (ClientSession session in sessions)
session.enable_keepalives(keepalive_sec);
}
public async Gee.Collection<string> list(string parent, Cancellable? cancellable = null) throws Error {
string specifier = String.is_empty(parent) ? "/" : parent;
specifier += (specifier.has_suffix("/")) ? "%" : "/%";
ClientSession session = yield get_authorized_session(cancellable);
ListResults results = ListResults.decode(yield session.send_command_async(
new ListCommand(session.generate_tag(), specifier), cancellable));
return results.get_names();
}
public async Geary.Folder open(string folder, Cancellable? cancellable = null) throws Error {
return new Mailbox(yield examine_async(folder, cancellable), on_destroying_mailbox);
}
private async ClientSession get_authorized_session(Cancellable? cancellable = null) throws Error {
foreach (ClientSession session in sessions) {
string? mailbox;
if (session.get_context(out mailbox) == ClientSession.Context.AUTHORIZED)
return session;
}
ClientSession new_session = new ClientSession(server, default_port);
yield new_session.connect_async(cancellable);
yield new_session.login_async(user, pass, cancellable);
// do this after logging in
new_session.enable_keepalives(keepalive_sec);
sessions.add(new_session);
return new_session;
}
public async ClientSession select_async(string folder, Cancellable? cancellable = null)
throws Error {
return yield select_examine_async(folder, true, cancellable);
}
public async ClientSession examine_async(string folder, Cancellable? cancellable = null)
throws Error {
return yield select_examine_async(folder, false, cancellable);
}
public async ClientSession select_examine_async(string folder, bool is_select,
Cancellable? cancellable = null) throws Error {
ClientSession.Context needed_context = (is_select) ? ClientSession.Context.SELECTED
: ClientSession.Context.EXAMINED;
foreach (ClientSession session in sessions) {
string? mailbox;
if (session.get_context(out mailbox) == needed_context && mailbox == folder)
return session;
}
ClientSession authd = yield get_authorized_session(cancellable);
yield authd.select_examine_async(folder, is_select, cancellable);
return authd;
}
private void on_destroying_mailbox(Mailbox mailbox) {
ClientSession? session = mailbox.get_client_session();
if (session != null)
session.close_mailbox_async.begin(null);
}
}

View file

@ -6,42 +6,49 @@
public class Geary.Imap.Mailbox : Object, Geary.Folder {
public string name { get; private set; }
public bool is_readonly { get; private set; }
private ClientSession sess;
private bool is_closed = false;
private ClientSession? session;
private Geary.Delegate.DestructorNotifier<Mailbox>? dtor_notifier;
internal Mailbox(string name, ClientSession sess) {
this.name = name;
this.sess = sess;
internal Mailbox(ClientSession session, Geary.Delegate.DestructorNotifier<Mailbox>? dtor_notifier) {
this.session = session;
this.dtor_notifier = dtor_notifier;
name = session.get_current_mailbox();
is_readonly = session.is_current_mailbox_readonly();
session.current_mailbox_changed.connect(on_session_mailbox_changed);
session.logged_out.connect(on_session_logged_out);
session.disconnected.connect(on_session_disconnected);
}
~Mailbox() {
assert(is_closed);
}
public MessageStream? read(int low, int count) {
return new MessageStreamImpl(sess, low, count);
}
public async void close(Cancellable? cancellable = null) throws Error {
yield sess.close_mailbox_async(cancellable);
is_closed = true;
}
}
private class Geary.Imap.MessageStreamImpl : Object, Geary.MessageStream {
private ClientSession sess;
private string span;
public MessageStreamImpl(ClientSession sess, int low, int count) {
assert(count > 0);
if (session != null) {
session.current_mailbox_changed.disconnect(on_session_mailbox_changed);
session.logged_out.disconnect(on_session_logged_out);
session.disconnected.disconnect(on_session_disconnected);
}
this.sess = sess;
span = (count > 1) ? "%d:%d".printf(low, low + count - 1) : "%d".printf(low);
if (dtor_notifier != null)
dtor_notifier(this);
}
public async Gee.List<Message>? read(Cancellable? cancellable = null) throws Error {
CommandResponse resp = yield sess.send_command_async(new FetchCommand(sess.generate_tag(),
internal ClientSession? get_client_session() {
return session;
}
public bool is_closed() {
return (session == null);
}
public async Gee.List<Message>? read(int low, int count, Cancellable? cancellable = null) throws Error {
if (is_closed())
throw new IOError.NOT_FOUND("Folder closed");
string span = (count > 1) ? "%d:%d".printf(low, low + count - 1) : "%d".printf(low);
CommandResponse resp = yield session.send_command_async(new FetchCommand(session.generate_tag(),
span, { FetchDataType.ENVELOPE }), cancellable);
if (resp.status_response.status != Status.OK)
@ -57,5 +64,39 @@ private class Geary.Imap.MessageStreamImpl : Object, Geary.MessageStream {
return msgs;
}
private void close(Geary.Folder.CloseReason reason) {
if (session == null)
return;
session = null;
closed(reason);
}
private void on_session_mailbox_changed(string? old_mailbox, string? new_mailbox, bool readonly) {
// this always mean one thing: this object is no longer valid
close(CloseReason.FOLDER_CLOSED);
}
private void on_session_logged_out() {
close(CloseReason.LOCAL_CLOSE);
}
private void on_session_disconnected(ClientSession.DisconnectReason reason) {
switch (reason) {
case ClientSession.DisconnectReason.LOCAL_CLOSE:
case ClientSession.DisconnectReason.LOCAL_ERROR:
close(CloseReason.LOCAL_CLOSE);
break;
case ClientSession.DisconnectReason.REMOTE_CLOSE:
case ClientSession.DisconnectReason.REMOTE_ERROR:
close(CloseReason.REMOTE_CLOSE);
break;
default:
assert_not_reached();
}
}
}

View file

@ -44,7 +44,6 @@ public class Geary.Imap.Serializer {
case DataFormat.Quoting.REQUIRED:
string quoted;
DataFormat.Quoting requirement = DataFormat.convert_to_quoted(str, out quoted);
debug("str=%s quoted=%s", str, quoted);
assert(requirement == DataFormat.Quoting.REQUIRED);
douts.put_string(quoted);

View file

@ -0,0 +1,12 @@
/* Copyright 2011 Yorba Foundation
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
namespace Geary.Delegate {
public delegate void DestructorNotifier<G>(G object);
}

View file

@ -4,7 +4,7 @@
* (version 2.1 or later). See the COPYING file in this distribution.
*/
namespace String {
namespace Geary.String {
public inline bool is_empty(string? str) {
return (str == null || str[0] == 0);