Perform database upgrade in background thread: Closes #7206

Db.VersionedDatabase.open_background() will do open() in background
thread.  ImapDB.Database now uses upcalls to schedule progress
monitor updates and a polled callback to pump the event loop.
This commit is contained in:
Jim Nelson 2013-09-24 17:08:02 -07:00
parent 0b4bc055b2
commit 1cca5a5e1a
5 changed files with 258 additions and 32 deletions

View file

@ -284,6 +284,7 @@ engine/util/util-scheduler.vala
engine/util/util-single-item.vala
engine/util/util-stream.vala
engine/util/util-string.vala
engine/util/util-synchronization.vala
engine/util/util-trillian.vala
)

View file

@ -5,22 +5,52 @@
*/
public class Geary.Db.VersionedDatabase : Geary.Db.Database {
public File schema_dir { get; private set; }
public ProgressMonitor upgrade_monitor { get; private set; }
public delegate void WorkCallback();
public VersionedDatabase(File db_file, File schema_dir, ProgressMonitor upgrade_monitor) {
public File schema_dir { get; private set; }
public VersionedDatabase(File db_file, File schema_dir) {
base (db_file);
this.schema_dir = schema_dir;
this.upgrade_monitor = upgrade_monitor;
}
/**
* Called by {@link open} if a schema upgrade is required and beginning.
*
* If called by {@link open_background}, this will be called in the context of a background
* thread.
*/
protected virtual void starting_upgrade(int current_version) {
}
/**
* Called by {@link open} just before performing a schema upgrade step.
*
* If called by {@link open_background}, this will be called in the context of a background
* thread.
*/
protected virtual void pre_upgrade(int version) {
}
/**
* Called by {@link open} just after performing a schema upgrade step.
*
* If called by {@link open_background}, this will be called in the context of a background
* thread.
*/
protected virtual void post_upgrade(int version) {
}
/**
* Called by {@link open} if a schema upgrade was required and has now completed.
*
* If called by {@link open_background}, this will be called in the context of a background
* thread.
*/
protected virtual void completed_upgrade(int final_version) {
}
private File get_schema_file(int db_version) {
return schema_dir.get_child("version-%03d.sql".printf(db_version));
}
@ -61,15 +91,16 @@ public class Geary.Db.VersionedDatabase : Geary.Db.Database {
}
// Go through all the version scripts in the schema directory and apply each of them.
bool started = false;
for (;;) {
File upgrade_script = get_schema_file(++db_version);
if (!upgrade_script.query_exists(cancellable))
break;
if (!upgrade_monitor.is_in_progress)
upgrade_monitor.notify_start();
pump_event_loop();
if (!started) {
starting_upgrade(db_version);
started = true;
}
pre_upgrade(db_version);
@ -89,18 +120,62 @@ public class Geary.Db.VersionedDatabase : Geary.Db.Database {
throw err;
}
pump_event_loop();
post_upgrade(db_version);
}
if (upgrade_monitor.is_in_progress)
upgrade_monitor.notify_finish();
if (started)
completed_upgrade(db_version);
}
protected void pump_event_loop() {
while (Gtk.events_pending())
Gtk.main_iteration();
/**
* Opens the database in a background thread so foreground work can be performed while updating.
*
* Since {@link open} may take a considerable amount of time for a {@link VersionedDatabase},
* background_open() can be used to perform that work in a thread while the calling thread
* "pumps" a {@link WorkCallback} every work_cb_msec milliseconds. In general, this is
* designed for allowing an event queue to execute tasks or update a progress monitor of some
* kind.
*
* Note that the database is not opened while the callback is executing and so it should not
* call into the database (unless it's a call safe to use prior to open).
*
* If work_cb_sec is zero or less, WorkCallback is called continuously, which may or may not be
* desired.
*
* @see open
*/
public void open_background(DatabaseFlags flags, PrepareConnection? prepare_cb,
WorkCallback work_cb, int work_cb_msec, Cancellable? cancellable = null) throws Error {
// use a SpinWaiter to safely wait for the thread to exit while occassionally calling the
// WorkCallback (which can not abort in current impl.) to do foreground work.
Synchronization.SpinWaiter waiter = new Synchronization.SpinWaiter(work_cb_msec, () => {
work_cb();
// continue (never abort)
return true;
});
// do the open in a background thread
Error? thread_err = null;
Thread<bool> thread = new Thread<bool>.try("Geary.Db.VersionedDatabase.open()", () => {
try {
open(flags, prepare_cb, cancellable);
} catch (Error err) {
thread_err = err;
}
// notify the foreground waiter we're done
waiter.notify();
return true;
});
// wait until thread is completed and then dispose of it
waiter.wait();
thread = null;
if (thread_err != null)
throw thread_err;
}
}

View file

@ -75,7 +75,7 @@ private class Geary.ImapDB.Account : BaseObject {
try {
db.open(
Db.DatabaseFlags.CREATE_DIRECTORY | Db.DatabaseFlags.CREATE_FILE | Db.DatabaseFlags.CHECK_CORRUPTION,
null, cancellable);
cancellable);
} catch (Error err) {
warning("Unable to open database: %s", err.message);

View file

@ -8,11 +8,16 @@ extern int sqlite3_unicodesn_register_tokenizer(Sqlite.Database db);
private class Geary.ImapDB.Database : Geary.Db.VersionedDatabase {
private const string DB_FILENAME = "geary.db";
private const int OPEN_PUMP_EVENT_LOOP_MSEC = 100;
private ProgressMonitor upgrade_monitor;
private string account_owner_email;
public Database(File db_dir, File schema_dir, ProgressMonitor upgrade_monitor,
string account_owner_email) {
base (get_db_file(db_dir), schema_dir, upgrade_monitor);
base (get_db_file(db_dir), schema_dir);
this.upgrade_monitor = upgrade_monitor;
this.account_owner_email = account_owner_email;
}
@ -20,14 +25,41 @@ private class Geary.ImapDB.Database : Geary.Db.VersionedDatabase {
return db_dir.get_child(DB_FILENAME);
}
public override void open(Db.DatabaseFlags flags, Db.PrepareConnection? prepare_cb,
Cancellable? cancellable = null) throws Error {
// have to do it this way because delegates don't play well with the ternary or nullable
// operators
if (prepare_cb != null)
base.open(flags, prepare_cb, cancellable);
else
base.open(flags, on_prepare_database_connection, cancellable);
/**
* Opens the ImapDB database.
*
* This should only be done from the main thread, as it is designed to pump the event loop
* while the database is being opened and updated.
*/
public new void open(Db.DatabaseFlags flags, Cancellable? cancellable) throws Error {
open_background(flags, on_prepare_database_connection, pump_event_loop,
OPEN_PUMP_EVENT_LOOP_MSEC, cancellable);
}
private void pump_event_loop() {
while (MainContext.default().pending())
MainContext.default().iteration(true);
}
protected override void starting_upgrade(int current_version) {
// can't call the ProgressMonitor directly, as it's hooked up to signals that expect to be
// called in the foreground thread, so use the Idle loop for this
Idle.add(() => {
if (!upgrade_monitor.is_in_progress)
upgrade_monitor.notify_start();
return false;
});
}
protected override void completed_upgrade(int final_version) {
// see starting_upgrade() for explanation why this is done in Idle loop
Idle.add(() => {
if (upgrade_monitor.is_in_progress)
upgrade_monitor.notify_finish();
return false;
});
}
protected override void post_upgrade(int version) {
@ -63,7 +95,6 @@ private class Geary.ImapDB.Database : Geary.Db.VersionedDatabase {
new MessageAddresses.from_result(account_owner_email, result);
foreach (Contact contact in message_addresses.contacts) {
do_update_contact(get_master_connection(), contact, null);
pump_event_loop();
}
result.next();
@ -93,8 +124,6 @@ private class Geary.ImapDB.Database : Geary.Db.VersionedDatabase {
}
select.next();
pump_event_loop();
}
} catch (Error e) {
debug("Error decoding folder names during upgrade to database schema 6: %s", e.message);
@ -187,8 +216,6 @@ private class Geary.ImapDB.Database : Geary.Db.VersionedDatabase {
}
select.next();
pump_event_loop();
}
return Db.TransactionOutcome.COMMIT;
@ -229,7 +256,6 @@ private class Geary.ImapDB.Database : Geary.Db.VersionedDatabase {
}
select.next();
pump_event_loop();
}
// additionally, because this schema change (and code changes as well) introduces

View file

@ -0,0 +1,124 @@
/* Copyright 2013 Yorba Foundation
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
namespace Geary.Synchronization {
/**
* A synchronization primitive that spins waiting for a completion state to be reached.
*
* SpinWaiter allows for the caller to specify work to be performed periodically in a callback
* while waiting for another thread to notify completion.
*/
public class SpinWaiter : BaseObject {
public delegate bool PollService();
private int poll_msec;
private PollService cb;
private Mutex mutex = Mutex();
private Cond cond = Cond();
private bool notified = false;
/**
* Create a {@link SpinWaiter}.
*
* poll_msec indicates how long to delay while spinning before interrupting and allowing
* the {@link PollService} to execute. If poll_msec is zero or less, PollService will be
* called constantly.
*/
public SpinWaiter(int poll_msec, PollService cb) {
this.poll_msec = poll_msec;
this.cb = cb;
}
/**
* Spins waiting for a completion state to be reached.
*
* There's two ways the completion state can be reached: (1) PollService returns false,
* indicating an abort state, (2) {@link stop} is called, indicating a success state, or
* (3) the Cancellable was cancelled, causing an IOError.CANCELLED exception to be thrown.
*
* {@link PollService} will be called from within the calling thread context.
*
* Although this is thread-safe, it's not designed to be invoked by multiple callers. That
* could cause the PollService callback to be called more often than specified in the
* constructor.
*
* @see stop
*/
public bool wait(Cancellable? cancellable = null) throws Error {
// normalize poll_msec; negative values are zeroed
int64 actual_poll_msec = Numeric.int64_floor(0, poll_msec);
bool result;
mutex.lock();
while (!notified) {
if (cancellable != null && cancellable.is_cancelled())
break;
int64 end_time = get_monotonic_time() + (actual_poll_msec * TimeSpan.MILLISECOND);
if (!cond.wait_until(mutex, end_time)) {
// timeout passed, allow the callback to run
if (!cb()) {
// PollService returned false, abort
break;
}
}
}
result = notified;
mutex.unlock();
if (cancellable.is_cancelled())
throw new IOError.CANCELLED("SpinWaiter.wait cancelled");
return result;
}
/**
* Signals a completion state to a thread calling {@link spin}.
*
* This call is thread-safe. However, once a {@link SpinWaiter} has been signalled to stop,
* it cannot be restarted.
*
* @see spin
*/
public new void notify() {
mutex.lock();
notified = true;
cond.broadcast();
mutex.unlock();
}
/**
* Indicates if the {@link SpinWaiter} has been notified.
*
* Other completion states (PollService returning false, Cancellable being cancelled in
* {@link wait}) are not recorded here.
*
* This method is thread-safe.
*
* @see notify
*/
public bool is_notified() {
bool result;
mutex.lock();
result = notified;
mutex.unlock();
return result;
}
}
}