Transactions added to database code: Closes #4235
The entire database module now uses Transactions in order to guarantee atomicity of all operations. However, due to limitations in SQLHeavy and its implementation of async, we can't use it (and SQLite's) transactional models. This patch introduces a rather hamhanded transactional model where the entire database is locked for each request using a NonblockingMutex. This is the safest approach, but certainly not an optimal one. When SQLHeavy's code is in place, hopefully we can rip this out with a minimum of fuss.
This commit is contained in:
parent
03d63ecc1a
commit
7e8edcb355
19 changed files with 547 additions and 219 deletions
|
|
@ -76,26 +76,33 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear
|
|||
check_open();
|
||||
|
||||
// TODO: This can be cached and updated when changes occur
|
||||
return yield location_table.fetch_count_for_folder_async(folder_row.id, cancellable);
|
||||
return yield location_table.fetch_count_for_folder_async(null, folder_row.id, cancellable);
|
||||
}
|
||||
|
||||
public override async void create_email_async(Geary.Email email, Cancellable? cancellable = null)
|
||||
throws Error {
|
||||
yield atomic_create_email_async(null, email, cancellable);
|
||||
}
|
||||
|
||||
private async void atomic_create_email_async(Transaction? supplied_transaction, Geary.Email email,
|
||||
Cancellable? cancellable) throws Error {
|
||||
check_open();
|
||||
|
||||
Geary.Imap.EmailIdentifier id = (Geary.Imap.EmailIdentifier) email.id;
|
||||
|
||||
Transaction transaction = supplied_transaction ?? yield db.begin_transaction_async(
|
||||
"Folder.atomic_create_email_async", cancellable);
|
||||
|
||||
// See if it already exists; first by UID (which is only guaranteed to be unique in a folder,
|
||||
// not account-wide)
|
||||
int64 message_id;
|
||||
if (yield location_table.does_ordering_exist_async(folder_row.id, email.location.ordering,
|
||||
out message_id, cancellable)) {
|
||||
if (yield location_table.does_ordering_exist_async(transaction, folder_row.id,
|
||||
email.location.ordering, out message_id, cancellable)) {
|
||||
throw new EngineError.ALREADY_EXISTS("Email with UID %s already exists in %s",
|
||||
id.uid.to_string(), to_string());
|
||||
}
|
||||
|
||||
// TODO: The following steps should be atomic
|
||||
message_id = yield message_table.create_async(
|
||||
message_id = yield message_table.create_async(transaction,
|
||||
new MessageRow.from_email(message_table, email),
|
||||
cancellable);
|
||||
|
||||
|
|
@ -103,7 +110,7 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear
|
|||
// (which fulfills the requirements for the ordering column)
|
||||
MessageLocationRow location_row = new MessageLocationRow(location_table, Row.INVALID_ID,
|
||||
message_id, folder_row.id, email.location.ordering, email.location.position);
|
||||
yield location_table.create_async(location_row, cancellable);
|
||||
yield location_table.create_async(transaction, location_row, cancellable);
|
||||
|
||||
// only write out the IMAP email properties if they're supplied and there's something to
|
||||
// write out -- no need to create an empty row
|
||||
|
|
@ -111,10 +118,17 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear
|
|||
if (email.fields.fulfills(Geary.Email.Field.PROPERTIES) && properties != null) {
|
||||
ImapMessagePropertiesRow properties_row = new ImapMessagePropertiesRow.from_imap_properties(
|
||||
imap_message_properties_table, message_id, properties);
|
||||
yield imap_message_properties_table.create_async(properties_row, cancellable);
|
||||
yield imap_message_properties_table.create_async(transaction, properties_row, cancellable);
|
||||
}
|
||||
|
||||
notify_messages_appended(yield get_email_count_async(cancellable));
|
||||
int count = yield location_table.fetch_count_for_folder_async(transaction, folder_row.id,
|
||||
cancellable);
|
||||
|
||||
// only commit if not supplied a transaction
|
||||
if (supplied_transaction == null)
|
||||
yield transaction.commit_async(cancellable);
|
||||
|
||||
notify_messages_appended(count);
|
||||
}
|
||||
|
||||
public override async Gee.List<Geary.Email>? list_email_async(int low, int count,
|
||||
|
|
@ -127,10 +141,13 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear
|
|||
if (count == 0)
|
||||
return null;
|
||||
|
||||
Gee.List<MessageLocationRow>? list = yield location_table.list_async(folder_row.id, low,
|
||||
count, cancellable);
|
||||
Transaction transaction = yield db.begin_transaction_async("Folder.list_email_async",
|
||||
cancellable);
|
||||
|
||||
return yield list_email(list, required_fields, cancellable);
|
||||
Gee.List<MessageLocationRow>? list = yield location_table.list_async(transaction,
|
||||
folder_row.id, low, count, cancellable);
|
||||
|
||||
return yield list_email(transaction, list, required_fields, cancellable);
|
||||
}
|
||||
|
||||
public override async Gee.List<Geary.Email>? list_email_sparse_async(int[] by_position,
|
||||
|
|
@ -138,24 +155,32 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear
|
|||
throws Error {
|
||||
check_open();
|
||||
|
||||
Gee.List<MessageLocationRow>? list = yield location_table.list_sparse_async(folder_row.id,
|
||||
by_position, cancellable);
|
||||
Transaction transaction = yield db.begin_transaction_async("Folder.list_email_sparse_async",
|
||||
cancellable);
|
||||
|
||||
return yield list_email(list, required_fields, cancellable);
|
||||
Gee.List<MessageLocationRow>? list = yield location_table.list_sparse_async(transaction,
|
||||
folder_row.id, by_position, cancellable);
|
||||
|
||||
return yield list_email(transaction, list, required_fields, cancellable);
|
||||
}
|
||||
|
||||
public async Gee.List<Geary.Email>? list_email_uid_async(Geary.Imap.UID? low, Geary.Imap.UID? high,
|
||||
Geary.Email.Field required_fields, Cancellable? cancellable = null) throws Error {
|
||||
check_open();
|
||||
|
||||
Gee.List<MessageLocationRow>? list = yield location_table.list_ordering_async(folder_row.id,
|
||||
(low != null) ? low.value : 1, (high != null) ? high.value : -1, cancellable);
|
||||
Transaction transaction = yield db.begin_transaction_async("Folder.list_email_uid_async",
|
||||
cancellable);
|
||||
|
||||
return yield list_email(list, required_fields, cancellable);
|
||||
Gee.List<MessageLocationRow>? list = yield location_table.list_ordering_async(transaction,
|
||||
folder_row.id,(low != null) ? low.value : 1, (high != null) ? high.value : -1,
|
||||
cancellable);
|
||||
|
||||
return yield list_email(transaction, list, required_fields, cancellable);
|
||||
}
|
||||
|
||||
private async Gee.List<Geary.Email>? list_email(Gee.List<MessageLocationRow>? list,
|
||||
Geary.Email.Field required_fields, Cancellable? cancellable) throws Error {
|
||||
private async Gee.List<Geary.Email>? list_email(Transaction transaction,
|
||||
Gee.List<MessageLocationRow>? list, Geary.Email.Field required_fields, Cancellable? cancellable)
|
||||
throws Error {
|
||||
check_open();
|
||||
|
||||
if (list == null || list.size == 0)
|
||||
|
|
@ -167,8 +192,8 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear
|
|||
Gee.List<Geary.Email> emails = new Gee.ArrayList<Geary.Email>();
|
||||
foreach (MessageLocationRow location_row in list) {
|
||||
// fetch the message itself
|
||||
MessageRow? message_row = yield message_table.fetch_async(location_row.message_id,
|
||||
required_fields, cancellable);
|
||||
MessageRow? message_row = yield message_table.fetch_async(transaction,
|
||||
location_row.message_id, required_fields, cancellable);
|
||||
assert(message_row != null);
|
||||
|
||||
// only add to the list if the email contains all the required fields (because
|
||||
|
|
@ -178,14 +203,14 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear
|
|||
|
||||
ImapMessagePropertiesRow? properties = null;
|
||||
if (required_fields.require(Geary.Email.Field.PROPERTIES)) {
|
||||
properties = yield imap_message_properties_table.fetch_async(location_row.message_id,
|
||||
cancellable);
|
||||
properties = yield imap_message_properties_table.fetch_async(transaction,
|
||||
location_row.message_id, cancellable);
|
||||
if (properties == null)
|
||||
continue;
|
||||
}
|
||||
|
||||
Geary.Imap.UID uid = new Geary.Imap.UID(location_row.ordering);
|
||||
int position = yield location_row.get_position_async(cancellable);
|
||||
int position = yield location_row.get_position_async(transaction, cancellable);
|
||||
if (position == -1) {
|
||||
debug("Unable to locate position of email during list of %s, dropping", to_string());
|
||||
|
||||
|
|
@ -210,15 +235,18 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear
|
|||
|
||||
Geary.Imap.UID uid = ((Imap.EmailIdentifier) id).uid;
|
||||
|
||||
MessageLocationRow? location_row = yield location_table.fetch_by_ordering_async(folder_row.id,
|
||||
uid.value, cancellable);
|
||||
Transaction transaction = yield db.begin_transaction_async("Folder.fetch_email_async",
|
||||
cancellable);
|
||||
|
||||
MessageLocationRow? location_row = yield location_table.fetch_by_ordering_async(transaction,
|
||||
folder_row.id, uid.value, cancellable);
|
||||
if (location_row == null) {
|
||||
throw new EngineError.NOT_FOUND("No message with ID %s in folder %s", id.to_string(),
|
||||
to_string());
|
||||
}
|
||||
|
||||
MessageRow? message_row = yield message_table.fetch_async(location_row.message_id,
|
||||
required_fields, cancellable);
|
||||
MessageRow? message_row = yield message_table.fetch_async(transaction,
|
||||
location_row.message_id, required_fields, cancellable);
|
||||
if (message_row == null) {
|
||||
throw new EngineError.NOT_FOUND("No message with ID %s in folder %s", id.to_string(),
|
||||
to_string());
|
||||
|
|
@ -234,8 +262,8 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear
|
|||
|
||||
ImapMessagePropertiesRow? properties = null;
|
||||
if (required_fields.require(Geary.Email.Field.PROPERTIES)) {
|
||||
properties = yield imap_message_properties_table.fetch_async(location_row.message_id,
|
||||
cancellable);
|
||||
properties = yield imap_message_properties_table.fetch_async(transaction,
|
||||
location_row.message_id, cancellable);
|
||||
if (properties == null) {
|
||||
throw new EngineError.INCOMPLETE_MESSAGE(
|
||||
"Message %s in folder %s does not have PROPERTIES field", id.to_string(),
|
||||
|
|
@ -243,7 +271,7 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear
|
|||
}
|
||||
}
|
||||
|
||||
int position = yield location_row.get_position_async(cancellable);
|
||||
int position = yield location_row.get_position_async(transaction, cancellable);
|
||||
if (position == -1) {
|
||||
throw new EngineError.NOT_FOUND("Unable to determine position of email %s in %s",
|
||||
id.to_string(), to_string());
|
||||
|
|
@ -259,7 +287,8 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear
|
|||
public async Geary.Imap.UID? get_earliest_uid_async(Cancellable? cancellable = null) throws Error {
|
||||
check_open();
|
||||
|
||||
int64 ordering = yield location_table.get_earliest_ordering_async(folder_row.id, cancellable);
|
||||
int64 ordering = yield location_table.get_earliest_ordering_async(null, folder_row.id,
|
||||
cancellable);
|
||||
|
||||
return (ordering >= 1) ? new Geary.Imap.UID(ordering) : null;
|
||||
}
|
||||
|
|
@ -268,19 +297,27 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear
|
|||
throws Error {
|
||||
check_open();
|
||||
|
||||
Transaction transaction = yield db.begin_transaction_async("Folder.remove_email_async",
|
||||
cancellable);
|
||||
|
||||
// TODO: Right now, deleting an email is merely detaching its association with a folder
|
||||
// (since it may be located in multiple folders). This means at some point in the future
|
||||
// a vacuum will be required to remove emails that are completely unassociated with the
|
||||
// account
|
||||
if (!yield location_table.remove_by_position_async(folder_row.id, position, cancellable)) {
|
||||
if (!yield location_table.remove_by_position_async(transaction, folder_row.id, position, cancellable)) {
|
||||
throw new EngineError.NOT_FOUND("Message #%d in local store of %s not found", position,
|
||||
to_string());
|
||||
}
|
||||
|
||||
notify_message_removed(position, yield get_email_count_async(cancellable));
|
||||
int count = yield location_table.fetch_count_for_folder_async(transaction, folder_row.id,
|
||||
cancellable);
|
||||
|
||||
yield transaction.commit_async(cancellable);
|
||||
|
||||
notify_message_removed(position, count);
|
||||
}
|
||||
|
||||
public async bool is_email_present(Geary.EmailIdentifier id, out Geary.Email.Field available_fields,
|
||||
public async bool is_email_present_async(Geary.EmailIdentifier id, out Geary.Email.Field available_fields,
|
||||
Cancellable? cancellable = null) throws Error {
|
||||
check_open();
|
||||
|
||||
|
|
@ -288,13 +325,16 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear
|
|||
|
||||
available_fields = Geary.Email.Field.NONE;
|
||||
|
||||
MessageLocationRow? location_row = yield location_table.fetch_by_ordering_async(folder_row.id,
|
||||
uid.value, cancellable);
|
||||
Transaction transaction = yield db.begin_transaction_async("Folder.is_email_present",
|
||||
cancellable);
|
||||
|
||||
MessageLocationRow? location_row = yield location_table.fetch_by_ordering_async(transaction,
|
||||
folder_row.id, uid.value, cancellable);
|
||||
if (location_row == null)
|
||||
return false;
|
||||
|
||||
return yield message_table.fetch_fields_async(location_row.message_id, out available_fields,
|
||||
cancellable);
|
||||
return yield message_table.fetch_fields_async(transaction, location_row.message_id,
|
||||
out available_fields, cancellable);
|
||||
}
|
||||
|
||||
public async bool is_email_associated_async(Geary.Email email, Cancellable? cancellable = null)
|
||||
|
|
@ -302,7 +342,7 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear
|
|||
check_open();
|
||||
|
||||
int64 message_id;
|
||||
return yield location_table.does_ordering_exist_async(folder_row.id,
|
||||
return yield location_table.does_ordering_exist_async(null, folder_row.id,
|
||||
((Geary.Imap.EmailIdentifier) email.id).uid.value, out message_id, cancellable);
|
||||
}
|
||||
|
||||
|
|
@ -313,10 +353,13 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear
|
|||
Geary.Imap.EmailLocation location = (Geary.Imap.EmailLocation) email.location;
|
||||
Geary.Imap.EmailIdentifier id = (Geary.Imap.EmailIdentifier) email.id;
|
||||
|
||||
Transaction transaction = yield db.begin_transaction_async("Folder.update_email_async",
|
||||
cancellable);
|
||||
|
||||
// See if the message can be identified in the folder (which both reveals association and
|
||||
// a message_id that can be used for a merge; note that this works without a Message-ID)
|
||||
int64 message_id;
|
||||
bool associated = yield location_table.does_ordering_exist_async(folder_row.id,
|
||||
bool associated = yield location_table.does_ordering_exist_async(transaction, folder_row.id,
|
||||
id.uid.value, out message_id, cancellable);
|
||||
|
||||
// If working around the lack of a Message-ID and not associated with this folder, treat
|
||||
|
|
@ -327,29 +370,35 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear
|
|||
if (!duplicate_okay)
|
||||
throw new EngineError.INCOMPLETE_MESSAGE("No Message-ID");
|
||||
|
||||
yield create_email_async(email, cancellable);
|
||||
yield atomic_create_email_async(transaction, email, cancellable);
|
||||
} else {
|
||||
yield merge_email_async(message_id, email, cancellable);
|
||||
yield merge_email_async(transaction, message_id, email, cancellable);
|
||||
}
|
||||
|
||||
yield transaction.commit_if_required_async(cancellable);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// If not associated, find message with matching Message-ID
|
||||
if (!associated) {
|
||||
Gee.List<int64?>? list = yield message_table.search_message_id_async(email.message_id,
|
||||
cancellable);
|
||||
Gee.List<int64?>? list = yield message_table.search_message_id_async(transaction,
|
||||
email.message_id, cancellable);
|
||||
|
||||
// If none found, this operation is a create
|
||||
if (list == null || list.size == 0) {
|
||||
yield create_email_async(email, cancellable);
|
||||
yield atomic_create_email_async(transaction, email, cancellable);
|
||||
|
||||
yield transaction.commit_if_required_async(cancellable);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
// Too many found turns this operation into a create
|
||||
if (list.size != 1) {
|
||||
yield create_email_async(email, cancellable);
|
||||
yield atomic_create_email_async(transaction, email, cancellable);
|
||||
|
||||
yield transaction.commit_if_required_async(cancellable);
|
||||
|
||||
return;
|
||||
}
|
||||
|
|
@ -361,8 +410,8 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear
|
|||
// TODO: Need to lock the database during this operation, as these steps should be atomic.
|
||||
if (!associated) {
|
||||
// see if an email exists at this position
|
||||
MessageLocationRow? location_row = yield location_table.fetch_async(folder_row.id,
|
||||
location.position);
|
||||
MessageLocationRow? location_row = yield location_table.fetch_async(transaction,
|
||||
folder_row.id, location.position, cancellable);
|
||||
if (location_row != null) {
|
||||
throw new EngineError.ALREADY_EXISTS("Email already exists at position %d in %s",
|
||||
email.location.position, to_string());
|
||||
|
|
@ -371,17 +420,18 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear
|
|||
// insert email at supplied position
|
||||
location_row = new MessageLocationRow(location_table, Row.INVALID_ID, message_id,
|
||||
folder_row.id, id.uid.value, location.position);
|
||||
yield location_table.create_async(location_row, cancellable);
|
||||
yield location_table.create_async(transaction, location_row, cancellable);
|
||||
}
|
||||
|
||||
// Merge any new information with the existing message in the local store
|
||||
yield merge_email_async(message_id, email, cancellable);
|
||||
yield merge_email_async(transaction, message_id, email, cancellable);
|
||||
|
||||
yield transaction.commit_if_required_async(cancellable);
|
||||
|
||||
// Done.
|
||||
}
|
||||
|
||||
// TODO: The database should be locked around this method, as it should be atomic.
|
||||
private async void merge_email_async(int64 message_id, Geary.Email email,
|
||||
private async void merge_email_async(Transaction transaction, int64 message_id, Geary.Email email,
|
||||
Cancellable? cancellable = null) throws Error {
|
||||
assert(message_id != Row.INVALID_ID);
|
||||
|
||||
|
|
@ -389,7 +439,7 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear
|
|||
if (email.fields == Geary.Email.Field.NONE)
|
||||
return;
|
||||
|
||||
MessageRow? message_row = yield message_table.fetch_async(message_id, email.fields,
|
||||
MessageRow? message_row = yield message_table.fetch_async(transaction, message_id, email.fields,
|
||||
cancellable);
|
||||
assert(message_row != null);
|
||||
|
||||
|
|
@ -397,7 +447,7 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear
|
|||
|
||||
// possible nothing has changed or been added
|
||||
if (message_row.fields != Geary.Email.Field.NONE)
|
||||
yield message_table.merge_async(message_row, cancellable);
|
||||
yield message_table.merge_async(transaction, message_row, cancellable);
|
||||
|
||||
// update IMAP properties
|
||||
if (email.fields.fulfills(Geary.Email.Field.PROPERTIES)) {
|
||||
|
|
@ -407,8 +457,8 @@ public class Geary.Sqlite.Folder : Geary.AbstractFolder, Geary.LocalFolder, Gear
|
|||
long rfc822_size =
|
||||
(properties.rfc822_size != null) ? properties.rfc822_size.value : -1;
|
||||
|
||||
yield imap_message_properties_table.update_async(message_id, properties.flags.serialize(),
|
||||
internaldate, rfc822_size, cancellable);
|
||||
yield imap_message_properties_table.update_async(transaction, message_id,
|
||||
properties.flags.serialize(), internaldate, rfc822_size, cancellable);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue