Tidy up and document lock classes in Geary.Nonblocking.

* src/engine/nonblocking/nonblocking-abstract-semaphore.vala: Renamed to
  nonblocking-lock.vala, rename class from AbstractSemaphore to Lock and
  update subclasses, since it is used as a basis for a number of
  different lock implementations. Make two getter methods into read-only
  properties. Fill out doc comments to be much more comprehensive.

* src/engine/nonblocking/nonblocking-mutex.vala (Mutex): Provide a
  high-level API and update documentation comments.

* src/engine/nonblocking/nonblocking-queue.vala (Queue): Add to doc
  comments.

* src/engine/nonblocking/nonblocking-variants.vala: Actually document
  how each of the variants behave.
This commit is contained in:
Michael James Gratton 2018-01-17 14:47:57 +11:00
parent cc29024501
commit 004b02a82d
9 changed files with 235 additions and 105 deletions

View file

@ -255,11 +255,11 @@ engine/mime/mime-disposition-type.vala
engine/mime/mime-error.vala
engine/mime/mime-multipart-subtype.vala
engine/nonblocking/nonblocking-abstract-semaphore.vala
engine/nonblocking/nonblocking-batch.vala
engine/nonblocking/nonblocking-concurrent.vala
engine/nonblocking/nonblocking-counting-semaphore.vala
engine/nonblocking/nonblocking-error.vala
engine/nonblocking/nonblocking-lock.vala
engine/nonblocking/nonblocking-mutex.vala
engine/nonblocking/nonblocking-queue.vala
engine/nonblocking/nonblocking-reporting-semaphore.vala

View file

@ -51,7 +51,7 @@ private abstract class Geary.ImapEngine.ReplayOperation : Geary.BaseObject, Gee.
public OnError on_remote_error { get; protected set; }
public int remote_retry_count { get; set; default = 0; }
public Error? err { get; private set; default = null; }
public bool notified { get { return semaphore.is_passed(); } }
public bool notified { get { return semaphore.can_pass; } }
private Nonblocking.Semaphore semaphore = new Nonblocking.Semaphore();
@ -152,7 +152,7 @@ private abstract class Geary.ImapEngine.ReplayOperation : Geary.BaseObject, Gee.
// Can only be called once
internal void notify_ready(Error? err) {
assert(!semaphore.is_passed());
assert(!semaphore.can_pass);
this.err = err;

View file

@ -252,11 +252,11 @@ geary_engine_vala_sources = files(
'mime/mime-error.vala',
'mime/mime-multipart-subtype.vala',
'nonblocking/nonblocking-abstract-semaphore.vala',
'nonblocking/nonblocking-batch.vala',
'nonblocking/nonblocking-concurrent.vala',
'nonblocking/nonblocking-counting-semaphore.vala',
'nonblocking/nonblocking-error.vala',
'nonblocking/nonblocking-lock.vala',
'nonblocking/nonblocking-mutex.vala',
'nonblocking/nonblocking-queue.vala',
'nonblocking/nonblocking-reporting-semaphore.vala',

View file

@ -170,7 +170,7 @@ public class Geary.Nonblocking.Batch : BaseObject {
*
* If the batch is executing or already executed, IOError.PENDING will be thrown. If the
* Cancellable is already cancelled, IOError.CANCELLED is thrown. Other errors may be thrown
* as well; see {@link AbstractSemaphore.wait_async}.
* as well; see {@link Lock.wait_async}.
*
* Batch will launch each BatchOperation in the order added. Depending on the BatchOperation,
* this does not guarantee that they'll complete in any particular order.

View file

@ -1,17 +1,22 @@
/* Copyright 2016 Software Freedom Conservancy Inc.
/*
* Copyright 2016 Software Freedom Conservancy Inc.
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
/**
* A nonblocking semaphore which allows for any number of tasks to run, but only signalling
* completion when all have finished.
* A counting, asynchronous semaphore.
*
* Unlike the other {@link AbstractSemaphore} variants, a task must {@link acquire} before it
* can {@link notify}. The number of acquired tasks is kept in the {@link count} property.
* Unlike the other {@link Lock} variants, a task must {@link acquire}
* before it can {@link notify}. The number of acquired tasks is kept
* in the {@link count} property. Waiting tasks are released only when
* the count returns to zero.
*
* This class is ''not'' thread safe and should only be used by
* asynchronous tasks.
*/
public class Geary.Nonblocking.CountingSemaphore : Geary.Nonblocking.AbstractSemaphore {
public class Geary.Nonblocking.CountingSemaphore : Geary.Nonblocking.Lock {
/**
* The number of tasks which have {@link acquire} the semaphore.
*/

View file

@ -1,170 +1,227 @@
/* Copyright 2016 Software Freedom Conservancy Inc.
/*
* Copyright 2016 Software Freedom Conservancy Inc.
* Copyright 2018 Michael Gratton <mike@vee.net>.
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
public abstract class Geary.Nonblocking.AbstractSemaphore : BaseObject {
/**
* A generic asynchronous lock data type.
*
* This class provides an asynchronous, queue-based lock
* implementation to allow implementing safe access to resources that
* are shared by asynchronous tasks. An asynchronous task may call
* {@link wait_async} to wait for the lock to be marked as safe to
* pass. Another asynchronous task may call {@link notify} to mark the
* lock as being safe, notifying waiting tasks. Once marked as being
* safe to pass, a lock may be reset to being unsafe by calling {@link
* reset}.
*
* See the specialised sub-classes for concrete implementations,
* which vary based on two features:
*
* //Broadcasting//: Whether all waiting tasks are notified when the
* lock may be passed, or just the next earliest waiting task.
*
* //Autoreset//: Whether the lock is automatically reset after
* notifying all waiting tasks, or if it must be manually reset by
* calling {@link reset}.
*
* This class is ''not'' thread safe and should only be used by
* asynchronous tasks.
*/
public abstract class Geary.Nonblocking.Lock : BaseObject {
private class Pending : BaseObject {
public unowned SourceFunc cb;
public Cancellable? cancellable;
public bool passed = false;
public bool scheduled = false;
public signal void cancelled();
public Pending(SourceFunc cb, Cancellable? cancellable) {
this.cb = cb;
this.cancellable = cancellable;
if (cancellable != null)
cancellable.cancelled.connect(on_cancelled);
}
~Pending() {
if (cancellable != null)
cancellable.cancelled.disconnect(on_cancelled);
}
private void on_cancelled() {
cancelled();
}
public void schedule(bool passed) {
assert(!scheduled);
this.passed = passed;
Scheduler.on_idle(cb);
scheduled = true;
}
}
/** Determines if this lock is marked as safe to pass. */
public bool can_pass { get { return this.passed; } }
/** Determines if this lock has been cancelled. */
public bool is_cancelled {
get {
return this.cancellable != null && this.cancellable.is_cancelled();
}
}
private bool broadcast;
private bool autoreset;
private Cancellable? cancellable;
private bool passed = false;
private Gee.List<Pending> pending_queue = new Gee.LinkedList<Pending>();
protected AbstractSemaphore(bool broadcast, bool autoreset, Cancellable? cancellable = null) {
protected Lock(bool broadcast, bool autoreset, Cancellable? cancellable = null) {
this.broadcast = broadcast;
this.autoreset = autoreset;
this.cancellable = cancellable;
if (cancellable != null)
cancellable.cancelled.connect(on_cancelled);
}
~AbstractSemaphore() {
~Lock() {
if (pending_queue.size > 0) {
warning("Nonblocking semaphore destroyed with %d pending callers", pending_queue.size);
warning("Nonblocking lock destroyed with %d pending callers", pending_queue.size);
foreach (Pending pending in pending_queue)
pending.cancelled.disconnect(on_pending_cancelled);
}
if (cancellable != null)
cancellable.cancelled.disconnect(on_cancelled);
}
private void trigger(bool all) {
if (pending_queue.size == 0)
return;
// in both cases, mark the Pending object(s) as passed in case this is an auto-reset
// semaphore
// in both cases, mark the Pending object(s) as passed in case
// this is an auto-reset lock
if (all) {
foreach (Pending pending in pending_queue)
pending.schedule(passed);
pending_queue.clear();
} else {
Pending pending = pending_queue.remove_at(0);
pending.schedule(passed);
}
}
/**
* Marks the lock as being safe to pass.
*
* Asynchronous tasks waiting on this lock via a call to {@link
* wait_async} are resumed when this method is called. If this
* lock is broadcasting then all pending tasks are released,
* otherwise only the first in the queue is released.
*
* @throws GLib.IOError.CANCELLED if either the lock is cancelled
* or the caller's `cancellable` argument is cancelled.
*/
public virtual new void notify() throws Error {
check_cancelled();
passed = true;
trigger(broadcast);
if (autoreset)
reset();
}
/**
* Calls notify() without throwing an Exception, which is merely logged if encountered.
* Calls {@link notify} without throwing an exception.
*
* If an error is thrown, it is logged but otherwise ignored.
*/
public void blind_notify() {
try {
notify();
} catch (Error err) {
message("Error notifying semaphore: %s", err.message);
message("Error notifying lock: %s", err.message);
}
}
/**
* Waits for the lock to be marked as being safe to pass.
*
* If the lock is already marked as being safe to pass, then this
* method will return immediately. If not, the call to this method
* will yield and not resume until the lock as been marked as safe
* by a call to {@link notify}.
*
* @throws GLib.IOError.CANCELLED if either the lock is cancelled or
* the caller's `cancellable` argument is cancelled.
*/
public virtual async void wait_async(Cancellable? cancellable = null) throws Error {
for (;;) {
check_user_cancelled(cancellable);
check_cancelled();
if (passed)
return;
Pending pending = new Pending(wait_async.callback, cancellable);
pending.cancelled.connect(on_pending_cancelled);
pending_queue.add(pending);
yield;
pending.cancelled.disconnect(on_pending_cancelled);
if (pending.passed) {
check_user_cancelled(cancellable);
return;
}
}
}
/**
* Marks this lock as being unsafe to pass.
*/
public virtual void reset() {
passed = false;
}
public bool is_passed() {
return passed;
}
public bool is_cancelled() {
return (cancellable != null) ? cancellable.is_cancelled() : false;
}
private void check_cancelled() throws Error {
if (is_cancelled())
throw new IOError.CANCELLED("Semaphore cancelled");
if (this.is_cancelled)
throw new IOError.CANCELLED("Lock was cancelled");
}
private static void check_user_cancelled(Cancellable? cancellable) throws Error {
if (cancellable != null && cancellable.is_cancelled())
throw new IOError.CANCELLED("User cancelled operation");
throw new IOError.CANCELLED("User cancelled lock operation");
}
private void on_pending_cancelled(Pending pending) {
// if already scheduled, the cancellation will be dealt with when they wake up
if (pending.scheduled)
return;
bool removed = pending_queue.remove(pending);
assert(removed);
Scheduler.on_idle(pending.cb);
}
private void on_cancelled() {
trigger(true);
}
}
}

View file

@ -1,41 +1,79 @@
/* Copyright 2016 Software Freedom Conservancy Inc.
/*
* Copyright 2016 Software Freedom Conservancy Inc.
* Copyright 2018 Michael Gratton <mike@vee.net>
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
/**
* A task primitive for creating critical sections inside of asynchronous code.
* A primitive for creating critical sections inside of asynchronous tasks.
*
* Like other primitives in {@link Nonblocking}, Mutex is ''not'' designed for a threaded
* environment.
* Two methods can be used for executing code protected by this
* mutex. The easiest is to create a {@link CriticalSection} delegate
* and pass it to {@link execute_locked}. This will manage acquiring
* the lock as needed. The lower-level method is to call {@link
* claim_async}, execute the critical section, then ensure {@link
* release} is always called afterwards.
*
* This class is ''not'' thread safe and should only be used by
* asynchronous tasks.
*/
public class Geary.Nonblocking.Mutex : BaseObject {
public const int INVALID_TOKEN = -1;
/** A delegate that can be executed by this lock. */
public delegate void CriticalSection() throws GLib.Error;
private Spinlock spinlock = new Spinlock();
private bool locked = false;
private int next_token = INVALID_TOKEN + 1;
private int locked_token = INVALID_TOKEN;
public Mutex() {
}
/**
* Returns true if the {@link Mutex} has been claimed by a task.
*/
public bool is_locked() {
return locked;
}
/**
* Claim (i.e. lock) the {@link Mutex} and begin execution inside a critical section.
* Executes a critical section while protected by this mutex.
*
* claim_async will block asynchronously waiting for the Mutex to be released, if it's already
* claimed.
* This high-level method takes care of claiming, executing, then
* releasing the mutex, without requiring the caller to manage any
* this.
*
* @return A token which must be used to {@link release} the Mutex.
* @throws GLib.IOError.CANCELLED thrown if the caller's
* cancellable is cancelled before execution is completed
* @throws GLib.Error if an error occurred during execution of
* //target//.
*/
public async void execute_locked(Mutex.CriticalSection target,
Cancellable? cancellable = null)
throws Error {
int token = yield claim_async(cancellable);
try {
target();
} finally {
try {
release(ref token);
} catch (Error err) {
debug("Mutex error releasing token: %s", err.message);
}
}
}
/**
* Locks the mutex for execution inside a critical section.
*
* If already claimed, this call will block asynchronously waiting
* for the mutex to be released.
*
* @return A token which must be passed to {@link release} when
* the critical section has completed executing.
*/
public async int claim_async(Cancellable? cancellable = null) throws Error {
for (;;) {
@ -44,31 +82,33 @@ public class Geary.Nonblocking.Mutex : BaseObject {
do {
locked_token = next_token++;
} while (locked_token == INVALID_TOKEN);
return locked_token;
}
yield spinlock.wait_async(cancellable);
}
}
/**
* Release (i.e. unlock) the {@link Mutex} and end execution inside a critical section.
* Releases the lock at the end of executing a critical section.
*
* The token returned by {@link claim_async} must be supplied as a parameter. It will be
* modified by this call so it can't be reused.
* The token returned by {@link claim_async} must be supplied as a
* parameter. It will be modified by this call so it can't be
* reused.
*
* Throws IOError.INVALID_ARGUMENT if the token was not the one returned by claim_async.
* Throws IOError.INVALID_ARGUMENT if the token was not the one
* returned by claim_async.
*/
public void release(ref int token) throws Error {
if (token != locked_token || token == INVALID_TOKEN)
throw new IOError.INVALID_ARGUMENT("Token %d is not the lock token", token);
locked = false;
token = INVALID_TOKEN;
locked_token = INVALID_TOKEN;
spinlock.notify();
}
}
}

View file

@ -11,7 +11,10 @@
*
* This class can be used to asynchronously wait for items to be added
* to the queue, the asynchronous call blocking until an item is
* ready.
* ready. Multiple asynchronous tasks can queue objects via {@link
* send}, and tasks can wait for items via {@link receive}. If there
* are multiple tasks waiting for items, the first to wait will
* receive the next item.
*/
public class Geary.Nonblocking.Queue<G> : BaseObject {

View file

@ -1,36 +1,61 @@
/* Copyright 2016 Software Freedom Conservancy Inc.
/*
* Copyright 2016 Software Freedom Conservancy Inc.
* Copyright 2018 Michael Gratton <mike@vee.net>.
*
* This software is licensed under the GNU Lesser General Public License
* (version 2.1 or later). See the COPYING file in this distribution.
*/
/**
* A Semaphore is a broadcasting, manually-resetting {@link AbstractSemaphore}.
* A broadcasting, manually-resetting asynchronous lock.
*
* This lock type will notify all waiting asynchronous tasks when
* marked as being safe to pass, and requires a call to {@link
* Lock.reset} to be marked as unsafe again.
*
* This class is ''not'' thread safe and should only be used by
* asynchronous tasks.
*
* @see Lock
*/
public class Geary.Nonblocking.Semaphore : Geary.Nonblocking.AbstractSemaphore {
public class Geary.Nonblocking.Semaphore : Geary.Nonblocking.Lock {
public Semaphore(Cancellable? cancellable = null) {
base (true, false, cancellable);
}
}
/**
* An Event is a broadcasting, auto-resetting {@link AbstractSemaphore}.
* A broadcasting, automatically-resetting asynchronous lock.
*
* This lock type will notify all waiting asynchronous tasks when
* marked as being safe to pass, and will automatically reset as being
* unsafe to pass after doing so.
*
* This class is ''not'' thread safe and should only be used by
* asynchronous tasks.
*
* @see Lock
*/
public class Geary.Nonblocking.Event : Geary.Nonblocking.AbstractSemaphore {
public class Geary.Nonblocking.Event : Geary.Nonblocking.Lock {
public Event(Cancellable? cancellable = null) {
base (true, true, cancellable);
}
}
/**
* A Spinlock is a single-notifying, auto-resetting {@link AbstractSemaphore}.
* A single-task-notifying, automatically-resetting asynchronous lock.
*
* This lock type will the first asynchronous task waiting when marked
* as being safe to pass, and will automatically reset as being unsafe
* to pass after doing so.
*
* This class is ''not'' thread safe and should only be used by
* asynchronous tasks.
*
* @see Lock
*/
public class Geary.Nonblocking.Spinlock : Geary.Nonblocking.AbstractSemaphore {
public class Geary.Nonblocking.Spinlock : Geary.Nonblocking.Lock {
public Spinlock(Cancellable? cancellable = null) {
base (false, true, cancellable);
}
}