Rework how the client session manager manages the IMAP session pool.

* src/engine/imap/transport/imap-client-session-manager.vala
  (ClientSessionManager): Use a non-blocking queue for free sessions
  rather than a set of reserved sessions, so we can instantly pick a free
  one when needed and available. Manage establishing new connections from
  one place in check_pool (was adjust_session_pool) rather than in a few
  different places in the class. Greatly simply lock management and
  connection establishment code. Add some doc comments, clean up code
  organisation.
This commit is contained in:
Michael James Gratton 2018-01-18 14:59:41 +11:00
parent 8536ad276f
commit 938033f3a4

View file

@ -1,16 +1,31 @@
/*
* Copyright 2016 Software Freedom Conservancy Inc.
* Copyright 2017 Michael Gratton <mike@vee.net>
* Copyright 2017-2018 Michael Gratton <mike@vee.net>
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
/**
* Manages a pool of IMAP client sessions.
*
* When opened and when reachable, the manager will establish a pool
* of {@link ClientSession} instances that are connected to the IMAP
* endpoint of an account, ensuring there are at least {@link
* min_pool_size} available. A connected, authorised client session
* can be obtained from the connection pool by calling {@link
* claim_authorized_session_async}, and when finished with returned by
* calling {@link release_session_async}.
*
* This class is not thread-safe.
*/
public class Geary.Imap.ClientSessionManager : BaseObject {
private const int DEFAULT_MIN_POOL_SIZE = 1;
private const int POOL_START_TIMEOUT_SEC = 4;
private const int POOL_STOP_TIMEOUT_SEC = 1;
private const int POOL_START_TIMEOUT_SEC = 1;
private const int POOL_STOP_TIMEOUT_SEC = 3;
/** Determines if the manager has been opened. */
public bool is_open { get; private set; default = false; }
@ -32,7 +47,7 @@ public class Geary.Imap.ClientSessionManager : BaseObject {
* and returning to an authorized state.
*/
public uint unselected_keepalive_sec { get; set; default = ClientSession.DEFAULT_UNSELECTED_KEEPALIVE_SEC; }
/**
* Set to zero or negative value if keepalives should be disabled when a mailbox is selected
* or examined. (This is not recommended.)
@ -40,7 +55,7 @@ public class Geary.Imap.ClientSessionManager : BaseObject {
* This only affects newly selected/examined sessions.
*/
public uint selected_keepalive_sec { get; set; default = ClientSession.DEFAULT_SELECTED_KEEPALIVE_SEC; }
/**
* Set to zero or negative value if keepalives should be disabled when a mailbox is selected
* or examined and IDLE is supported. (This is not recommended.)
@ -48,7 +63,7 @@ public class Geary.Imap.ClientSessionManager : BaseObject {
* This only affects newly selected/examined sessions.
*/
public uint selected_with_idle_keepalive_sec { get; set; default = ClientSession.DEFAULT_SELECTED_WITH_IDLE_KEEPALIVE_SEC; }
/**
* ClientSessionManager attempts to maintain a minimum number of open sessions with the server
* so they're immediately ready for use.
@ -65,15 +80,19 @@ public class Geary.Imap.ClientSessionManager : BaseObject {
private AccountInformation account_information;
private Endpoint endpoint;
private Gee.HashSet<ClientSession> sessions = new Gee.HashSet<ClientSession>();
private int pending_sessions = 0;
private Nonblocking.Mutex sessions_mutex = new Nonblocking.Mutex();
private Gee.HashSet<ClientSession> reserved_sessions = new Gee.HashSet<ClientSession>();
private bool authentication_failed = false;
private bool untrusted_host = false;
private Gee.Set<ClientSession> all_sessions =
new Gee.HashSet<ClientSession>();
private Nonblocking.Queue<ClientSession> free_queue =
new Nonblocking.Queue<ClientSession>.fifo();
private TimeoutManager pool_start;
private TimeoutManager pool_stop;
private Cancellable? pool_cancellable = null;
private bool authentication_failed = false;
private bool untrusted_host = false;
/**
* Fired after when the manager has a working connection.
@ -103,7 +122,7 @@ public class Geary.Imap.ClientSessionManager : BaseObject {
this.pool_start = new TimeoutManager.seconds(
POOL_START_TIMEOUT_SEC,
() => { this.adjust_session_pool.begin(); }
() => { this.check_pool.begin(); }
);
this.pool_stop = new TimeoutManager.seconds(
@ -114,7 +133,7 @@ public class Geary.Imap.ClientSessionManager : BaseObject {
~ClientSessionManager() {
if (is_open)
warning("Destroying opened ClientSessionManager");
warning("[%s] Destroying opened ClientSessionManager", to_string());
this.endpoint.untrusted_host.disconnect(on_imap_untrusted_host);
this.endpoint.notify[Endpoint.PROP_TRUST_UNTRUSTED_HOST].disconnect(on_imap_trust_untrusted_host);
@ -126,11 +145,12 @@ public class Geary.Imap.ClientSessionManager : BaseObject {
this.is_open = true;
this.authentication_failed = false;
this.pool_cancellable = new Cancellable();
this.endpoint.connectivity.notify["is-reachable"].connect(on_connectivity_change);
this.endpoint.connectivity.address_error_reported.connect(on_connectivity_error);
if (this.endpoint.connectivity.is_reachable.is_certain()) {
this.adjust_session_pool.begin();
this.check_pool.begin();
} else {
this.endpoint.connectivity.check_reachable.begin();
}
@ -145,6 +165,7 @@ public class Geary.Imap.ClientSessionManager : BaseObject {
this.pool_start.reset();
this.pool_stop.reset();
this.pool_cancellable.cancel();
this.endpoint.connectivity.notify["is-reachable"].disconnect(on_connectivity_change);
this.endpoint.connectivity.address_error_reported.disconnect(on_connectivity_error);
@ -154,11 +175,11 @@ public class Geary.Imap.ClientSessionManager : BaseObject {
// TODO: This isn't the best (deterministic) way to deal with this, but it's easy and works
// for now
int attempts = 0;
while (sessions.size > 0) {
debug("Waiting for ClientSessions to disconnect from ClientSessionManager...");
while (this.all_sessions.size > 0) {
debug("[%s] Waiting for client sessions to disconnect...", to_string());
Timeout.add(250, close_async.callback);
yield;
// give up after three seconds
if (++attempts > 12)
break;
@ -174,199 +195,82 @@ public class Geary.Imap.ClientSessionManager : BaseObject {
public void credentials_updated() {
this.authentication_failed = false;
if (this.is_open) {
this.adjust_session_pool.begin();
this.check_pool.begin();
}
}
private void check_open() throws Error {
if (!is_open)
throw new EngineError.OPEN_REQUIRED("ClientSessionManager is not open");
}
// TODO: Need a more thorough and bulletproof system for maintaining a pool of ready
// authorized sessions.
private async void adjust_session_pool() {
if (!this.is_open)
return;
/**
* Claims a free session, blocking until one becomes available.
*
* This call will fail fast if the pool is known to not in the
* right state (bad authorisation credentials, host not ready,
* etc), but then will block while attempting to obtain a
* connection if the free queue is empty. If an error occurs when
* this connection is in progress, then the call will block until
* another becomes available (host becomes reachable again, user
* enters password, etc). If this is undesirable, then the caller
* may cancel the call.
*
* @throws ImapError.UNAUTHENTICATED if the stored credentials are
* invalid.
* @throws ImapError.UNAVAILABLE if the IMAP endpoint is not
* trusted or is not reachable.
*/
public async ClientSession claim_authorized_session_async(Cancellable? cancellable)
throws Error {
check_open();
debug("[%s] Claiming session from %d of %d free",
to_string(), this.free_queue.size, this.all_sessions.size);
int token;
try {
token = yield sessions_mutex.claim_async();
} catch (Error claim_err) {
debug("Unable to claim session table mutex for adjusting pool: %s", claim_err.message);
return;
}
while ((sessions.size + pending_sessions) < min_pool_size
&& this.is_open
&& !this.authentication_failed
&& !this.untrusted_host
&& this.endpoint.connectivity.is_reachable.is_certain()) {
this.pending_sessions++;
create_new_authorized_session.begin(
null,
(obj, res) => {
this.pending_sessions--;
try {
this.create_new_authorized_session.end(res);
} catch (Error err) {
connection_failed(err);
}
});
}
try {
sessions_mutex.release(ref token);
} catch (Error release_err) {
debug("Unable to release session table mutex after adjusting pool: %s", release_err.message);
}
}
private async ClientSession create_new_authorized_session(Cancellable? cancellable) throws Error {
if (authentication_failed)
if (this.authentication_failed)
throw new ImapError.UNAUTHENTICATED("Invalid ClientSessionManager credentials");
if (untrusted_host)
if (this.untrusted_host)
throw new ImapError.UNAVAILABLE("Untrusted host %s", endpoint.to_string());
if (!this.endpoint.connectivity.is_reachable.is_certain())
throw new ImapError.UNAVAILABLE("Host at %s is unreachable", endpoint.to_string());
ClientSession new_session = new ClientSession(endpoint);
// add session to pool before launching all the connect activity so error cases can properly
// back it out
if (sessions_mutex.is_locked())
locked_add_session(new_session);
else
yield unlocked_add_session_async(new_session);
try {
yield new_session.connect_async(cancellable);
} catch (Error err) {
debug("[%s] Connect failure: %s", new_session.to_string(), err.message);
bool removed;
if (sessions_mutex.is_locked())
removed = locked_remove_session(new_session);
else
removed = yield unlocked_remove_session_async(new_session);
assert(removed);
throw err;
}
try {
yield new_session.initiate_session_async(account_information.imap_credentials, cancellable);
} catch (Error err) {
debug("[%s] Initiate session failure: %s", new_session.to_string(), err.message);
// need to disconnect before throwing error ... don't honor Cancellable here, it's
// important to disconnect the client before dropping the ref
try {
yield new_session.disconnect_async();
} catch (Error disconnect_err) {
debug("[%s] Error disconnecting due to session initiation failure, ignored: %s",
new_session.to_string(), disconnect_err.message);
ClientSession? claimed = null;
while (claimed == null) {
// This isn't racy since this is class is not accessed by
// multiple threads. Don't wait for it though because we
// only want to kick off establishing the connection, and
// wait for it via the queue.
if (this.free_queue.size == 0) {
check_pool.begin();
}
bool removed;
if (sessions_mutex.is_locked())
removed = locked_remove_session(new_session);
else
removed = yield unlocked_remove_session_async(new_session);
assert(removed);
throw err;
}
if (!this.is_ready) {
this.is_ready = true;
ready();
}
claimed = yield this.free_queue.receive(cancellable);
// do this after logging in
new_session.enable_keepalives(selected_keepalive_sec, unselected_keepalive_sec,
selected_with_idle_keepalive_sec);
// since "disconnected" is used to remove the ClientSession from the sessions list, want
// to only connect to the signal once the object has been added to the list; otherwise it's
// possible a cancel during the connect or login will result in a "disconnected" signal,
// removing the session before it's added
new_session.disconnected.connect(on_disconnected);
return new_session;
}
public async ClientSession claim_authorized_session_async(Cancellable? cancellable) throws Error {
check_open();
int token = yield sessions_mutex.claim_async(cancellable);
ClientSession? found_session = null;
foreach (ClientSession session in sessions) {
MailboxSpecifier? mailbox;
if (!reserved_sessions.contains(session) &&
(session.get_protocol_state(out mailbox) == ClientSession.ProtocolState.AUTHORIZED)) {
found_session = session;
break;
// Connection may have gone bad sitting in the queue, so
// check it before using it
if (!(yield check_session(claimed, false))) {
claimed = null;
}
}
Error? err = null;
try {
if (found_session == null)
found_session = yield create_new_authorized_session(cancellable);
} catch (Error create_err) {
debug("Error creating session: %s", create_err.message);
err = create_err;
}
// claim it now
if (found_session != null) {
bool added = reserved_sessions.add(found_session);
assert(added);
}
try {
sessions_mutex.release(ref token);
} catch (Error release_err) {
debug("Error releasing sessions table mutex: %s", release_err.message);
}
if (err != null)
throw err;
return found_session;
return claimed;
}
public async void release_session_async(ClientSession session, Cancellable? cancellable)
throws Error {
// Don't check_open(), it's valid for this to be called when
// is_open is false, that happens during mop-up
MailboxSpecifier? mailbox = null;
ClientSession.ProtocolState context = session.get_protocol_state(out mailbox);
if (context == ClientSession.ProtocolState.UNCONNECTED) {
// Already disconnected, so drop it on the floor
try {
yield unlocked_remove_session_async(session);
} catch (Error err) {
debug("[%s] Error removing unconnected session: %s",
to_string(), err.message);
}
} else if (this.is_open && !this.discard_returned_sessions) {
bool free = false;
switch (context) {
case ClientSession.ProtocolState.AUTHORIZED:
case ClientSession.ProtocolState.CLOSING_MAILBOX:
// keep as-is, but add back to the free list
free = true;
break;
debug("[%s] Returning session with %d of %d free",
to_string(), this.free_queue.size, this.all_sessions.size);
case ClientSession.ProtocolState.SELECTED:
case ClientSession.ProtocolState.SELECTING:
if (!this.is_open || this.discard_returned_sessions) {
yield force_disconnect(session);
} else if (yield check_session(session, true)) {
bool free = true;
MailboxSpecifier? mailbox = null;
ClientSession.ProtocolState proto = session.get_protocol_state(out mailbox);
// If the session has a mailbox selected, close it before
// adding it back to the pool
if (proto == ClientSession.ProtocolState.SELECTED ||
proto == ClientSession.ProtocolState.SELECTING) {
debug("[%s] Closing %s for released session %s",
to_string(),
mailbox != null ? mailbox.to_string() : "(unknown)",
@ -378,60 +282,161 @@ public class Geary.Imap.ClientSessionManager : BaseObject {
} catch (ImapError imap_error) {
debug("[%s] Error attempting to close released session %s: %s",
to_string(), session.to_string(), imap_error.message);
free = false;
}
if (session.get_protocol_state(out mailbox) == ClientSession.ProtocolState.AUTHORIZED) {
// Now in authorized state, free it up for re-use
free = true;
} else {
if (session.get_protocol_state(null) !=
ClientSession.ProtocolState.AUTHORIZED) {
// Closing it didn't work, so drop it
yield force_disconnect(session);
free = false;
}
break;
default:
// This class is tasked with holding onto a pool of
// authorized connections, so if one is released
// outside that state, pessimistically drop it
yield force_disconnect(session);
break;
}
if (free) {
debug("[%s] Unreserving session %s",
to_string(), session.to_string());
try {
int token = yield sessions_mutex.claim_async(cancellable);
this.reserved_sessions.remove(session);
this.sessions_mutex.release(ref token);
} catch (Error err) {
message("[%s] Unable to add %s to the free list: %s",
to_string(), session.to_string(), err.message);
}
this.free_queue.send(session);
}
} else {
// Not open, or we are discarding sessions, so close it.
yield force_disconnect(session);
}
// If we're discarding returned sessions, we don't want to
// create any more, so only twiddle the pool if not.
if (!this.discard_returned_sessions) {
this.adjust_session_pool.begin();
}
}
private void check_open() throws Error {
if (!is_open)
throw new EngineError.OPEN_REQUIRED("ClientSessionManager is not open");
}
private async void check_pool() {
debug("[%s] Checking session pool with %d of %d free",
to_string(), this.free_queue.size, this.all_sessions.size);
while (this.is_open &&
!this.authentication_failed &&
!this.untrusted_host &&
this.endpoint.connectivity.is_reachable.is_certain()) {
// Open pool sessions serially to avoid hammering the server
try {
ClientSession free = yield this.create_new_authorized_session(
this.pool_cancellable
);
yield this.sessions_mutex.execute_locked(() => {
this.all_sessions.add(free);
});
this.free_queue.send(free);
} catch (Error err) {
debug("[%s] Error adding free session pool: %s",
to_string(),
err.message);
break;
}
if (this.all_sessions.size >= this.min_pool_size) {
break;
}
}
}
/** Determines if a session is valid, disposing of it if not. */
private async bool check_session(ClientSession target, bool allow_selected) {
bool valid = false;
switch (target.get_protocol_state(null)) {
case ClientSession.ProtocolState.AUTHORIZED:
case ClientSession.ProtocolState.CLOSING_MAILBOX:
valid = true;
break;
case ClientSession.ProtocolState.SELECTED:
case ClientSession.ProtocolState.SELECTING:
if (allow_selected) {
valid = true;
} else {
yield force_disconnect(target);
}
break;
case ClientSession.ProtocolState.UNCONNECTED:
// Already disconnected, so drop it on the floor
try {
yield remove_session_async(target);
} catch (Error err) {
debug("[%s] Error removing unconnected session: %s",
to_string(), err.message);
}
break;
default:
yield force_disconnect(target);
break;
}
return valid;
}
private async ClientSession create_new_authorized_session(Cancellable? cancellable) throws Error {
ClientSession new_session = new ClientSession(endpoint);
// Listen for auth failures early so the client is notified if
// there is an error, even though we won't want to keep the
// session around.
new_session.login_failed.connect(on_login_failed);
try {
yield new_session.connect_async(cancellable);
} catch (Error err) {
debug("[%s] Connect failure: %s", new_session.to_string(), err.message);
connection_failed(err);
throw err;
}
try {
yield new_session.initiate_session_async(account_information.imap_credentials, cancellable);
} catch (Error err) {
debug("[%s] Initiate session failure: %s", new_session.to_string(), err.message);
// need to disconnect before throwing error ... don't honor Cancellable here, it's
// important to disconnect the client before dropping the ref
try {
yield new_session.disconnect_async();
} catch (Error disconnect_err) {
debug("[%s] Error disconnecting due to session initiation failure, ignored: %s",
new_session.to_string(), disconnect_err.message);
}
connection_failed(err);
throw err;
}
// Only bother tracking disconnects and enabling keeping alive
// now the session is properly established.
new_session.disconnected.connect(on_disconnected);
new_session.enable_keepalives(selected_keepalive_sec,
unselected_keepalive_sec,
selected_with_idle_keepalive_sec);
// We now have a good connection, so signal us as ready if not
// already done so.
if (!this.is_ready) {
this.is_ready = true;
ready();
}
return new_session;
}
/** Disconnects all sessions in the pool. */
private async void force_disconnect_all()
throws Error {
debug("[%s] Dropping and disconnecting %d sessions",
to_string(), this.sessions.size);
to_string(), this.all_sessions.size);
// Take a copy and work off that while scheduling disconnects,
// since as they disconnect they'll remove themselves from the
// sessions list and cause the loop below to explode.
int token = yield this.sessions_mutex.claim_async();
ClientSession[] to_close = this.sessions.to_array();
this.sessions_mutex.release(ref token);
ClientSession[]? to_close = null;
yield this.sessions_mutex.execute_locked(() => {
to_close = this.all_sessions.to_array();
});
// Disconnect all existing sessions at once. Don't block
// waiting for any since we don't want to delay closing the
@ -445,7 +450,7 @@ public class Geary.Imap.ClientSessionManager : BaseObject {
debug("[%s] Dropping session %s", to_string(), session.to_string());
try {
yield unlocked_remove_session_async(session);
yield remove_session_async(session);
} catch (Error err) {
debug("[%s] Error removing session: %s", to_string(), err.message);
}
@ -455,12 +460,29 @@ public class Geary.Imap.ClientSessionManager : BaseObject {
session.disconnect_async.begin();
}
private async bool remove_session_async(ClientSession session) throws Error {
// Ensure the session isn't held on to, anywhere
this.free_queue.revoke(session);
bool removed = false;
yield this.sessions_mutex.execute_locked(() => {
removed = this.all_sessions.remove(session);
});
if (removed) {
session.disconnected.disconnect(on_disconnected);
session.login_failed.disconnect(on_login_failed);
}
return removed;
}
private void on_disconnected(ClientSession session, ClientSession.DisconnectReason reason) {
this.unlocked_remove_session_async.begin(
this.remove_session_async.begin(
session,
(obj, res) => {
try {
this.unlocked_remove_session_async.end(res);
this.remove_session_async.end(res);
} catch (Error err) {
debug("[%s] Error removing disconnected session: %s",
to_string(),
@ -477,56 +499,20 @@ public class Geary.Imap.ClientSessionManager : BaseObject {
session.disconnect_async.begin();
}
// Only call with sessions mutex locked
private void locked_add_session(ClientSession session) {
sessions.add(session);
// See create_new_authorized_session() for why the "disconnected" signal is not subscribed
// to here (but *is* unsubscribed to in remove_session())
session.login_failed.connect(on_login_failed);
}
private async void unlocked_add_session_async(ClientSession session) throws Error {
int token = yield sessions_mutex.claim_async();
locked_add_session(session);
sessions_mutex.release(ref token);
}
// Only call with sessions mutex locked
private bool locked_remove_session(ClientSession session) {
bool removed = sessions.remove(session);
if (removed) {
session.disconnected.disconnect(on_disconnected);
session.login_failed.disconnect(on_login_failed);
}
reserved_sessions.remove(session);
return removed;
}
private async bool unlocked_remove_session_async(ClientSession session) throws Error {
int token = yield sessions_mutex.claim_async();
bool removed = locked_remove_session(session);
sessions_mutex.release(ref token);
return removed;
}
private void on_imap_untrusted_host() {
// this is called any time trust issues are detected, so immediately clutch in to stop
// retries
untrusted_host = true;
}
private void on_imap_trust_untrusted_host() {
// fired when the trust_untrusted_host property changes, indicating if the user has agreed
// to ignore the trust problems and continue connecting
if (untrusted_host && endpoint.trust_untrusted_host == Trillian.TRUE) {
untrusted_host = false;
if (is_open)
adjust_session_pool.begin();
check_pool.begin();
}
}
@ -554,7 +540,9 @@ public class Geary.Imap.ClientSessionManager : BaseObject {
* Use only for debugging and logging.
*/
public string to_string() {
return "ClientSessionManager/%s %d sessions, %d reserved".printf(endpoint.to_string(),
sessions.size, reserved_sessions.size);
return "%s:%s".printf(
this.account_information.id,
endpoint.to_string()
);
}
}