Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace synchronous locks with async #957

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 54 additions & 29 deletions src/SQLiteAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
Expand Down Expand Up @@ -203,7 +204,7 @@ public SQLiteConnectionWithLock GetConnection ()
return SQLiteConnectionPool.Shared.GetConnection (_connectionString);
}

SQLiteConnectionWithLock GetConnectionAndTransactionLock (out object transactionLock)
SQLiteConnectionWithLock GetConnectionAndTransactionLock (out SemaphoreSlim transactionLock)
{
return SQLiteConnectionPool.Shared.GetConnectionAndTransactionLock (_connectionString, out transactionLock);
}
Expand All @@ -220,34 +221,38 @@ public Task CloseAsync ()

Task<T> ReadAsync<T> (Func<SQLiteConnectionWithLock, T> read)
{
return Task.Factory.StartNew (() => {
return Task.Run<T> (async () => {
var conn = GetConnection ();
using (conn.Lock ()) {
using (await conn.LockAsync ().ConfigureAwait (false)) {
return read (conn);
}
}, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
});
}

Task<T> WriteAsync<T> (Func<SQLiteConnectionWithLock, T> write)
{
return Task.Factory.StartNew (() => {
return Task.Run<T> (async () => {
var conn = GetConnection ();
using (conn.Lock ()) {
using (await conn.LockAsync ().ConfigureAwait (false)) {
return write (conn);
}
}, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
});
}

Task<T> TransactAsync<T> (Func<SQLiteConnectionWithLock, T> transact)
{
return Task.Factory.StartNew (() => {
return Task.Run<T> (async () => {
var conn = GetConnectionAndTransactionLock (out var transactionLock);
lock (transactionLock) {
using (conn.Lock ()) {
await transactionLock.WaitAsync ().ConfigureAwait (false);
try {
using (await conn.LockAsync ().ConfigureAwait (false)) {
return transact (conn);
}
}
}, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
finally {
transactionLock.Release ();
}
});
}

/// <summary>
Expand Down Expand Up @@ -1191,22 +1196,22 @@ public AsyncTableQuery (TableQuery<T> innerQuery)

Task<U> ReadAsync<U> (Func<SQLiteConnectionWithLock, U> read)
{
return Task.Factory.StartNew (() => {
return Task.Run<U> (async () => {
var conn = (SQLiteConnectionWithLock)_innerQuery.Connection;
using (conn.Lock ()) {
using (await conn.LockAsync ().ConfigureAwait (false)) {
return read (conn);
}
}, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
});
}

Task<U> WriteAsync<U> (Func<SQLiteConnectionWithLock, U> write)
{
return Task.Factory.StartNew (() => {
return Task.Run<U> (async () => {
var conn = (SQLiteConnectionWithLock)_innerQuery.Connection;
using (conn.Lock ()) {
using (await conn.LockAsync ().ConfigureAwait (false)) {
return write (conn);
}
}, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
});
}

/// <summary>
Expand Down Expand Up @@ -1362,7 +1367,7 @@ class Entry

public SQLiteConnectionString ConnectionString { get; }

public object TransactionLock { get; } = new object ();
public SemaphoreSlim TransactionLock { get; } = new SemaphoreSlim (1);

public Entry (SQLiteConnectionString connectionString)
{
Expand All @@ -1386,7 +1391,7 @@ public void Close ()
}

readonly Dictionary<string, Entry> _entries = new Dictionary<string, Entry> ();
readonly object _entriesLock = new object ();
readonly object _entriesLock = new object();

static readonly SQLiteConnectionPool _shared = new SQLiteConnectionPool ();

Expand All @@ -1404,11 +1409,11 @@ public SQLiteConnectionWithLock GetConnection (SQLiteConnectionString connection
return GetConnectionAndTransactionLock (connectionString, out var _);
}

public SQLiteConnectionWithLock GetConnectionAndTransactionLock (SQLiteConnectionString connectionString, out object transactionLock)
public SQLiteConnectionWithLock GetConnectionAndTransactionLock (SQLiteConnectionString connectionString, out SemaphoreSlim transactionLock)
{
var key = connectionString.UniqueKey;
Entry entry;
lock (_entriesLock) {
var key = connectionString.UniqueKey;
Entry entry;
if (!_entries.TryGetValue (key, out entry)) {
// The opens the database while we're locked
// This is to ensure another thread doesn't get an unopened database
Expand Down Expand Up @@ -1456,7 +1461,8 @@ public void Reset ()
/// </summary>
public class SQLiteConnectionWithLock : SQLiteConnection
{
readonly object _lockPoint = new object ();
readonly SemaphoreSlim _asyncLock =
new SemaphoreSlim (1);

/// <summary>
/// Initializes a new instance of the <see cref="T:SQLite.SQLiteConnectionWithLock"/> class.
Expand All @@ -1478,24 +1484,43 @@ public SQLiteConnectionWithLock (SQLiteConnectionString connectionString)
/// on the returned object.
/// </summary>
/// <returns>The lock.</returns>
[Obsolete("Please use LockAsync to prevent threadpool starvation")]
public IDisposable Lock ()
{
return SkipLock ? (IDisposable)new FakeLockWrapper() : new LockWrapper (_lockPoint);
return LockAsync ().Result;
}

/// <summary>
/// Asynchronously lock the database to serialize access to it.
/// To unlock it, call Dispose on the returned object.
/// </summary>
/// <returns>The lock.</returns>
public Task<IDisposable> LockAsync ()
{
return SkipLock ?
Task.FromResult ((IDisposable)new FakeLockWrapper ()) :
LockWrapper.CreateAsync (_asyncLock);
}

class LockWrapper : IDisposable
{
object _lockPoint;
SemaphoreSlim asyncLock;

LockWrapper (SemaphoreSlim asyncLock)
{
this.asyncLock = asyncLock;
}

public LockWrapper (object lockPoint)
public static async Task<IDisposable> CreateAsync (SemaphoreSlim asyncLock)
{
_lockPoint = lockPoint;
Monitor.Enter (_lockPoint);
var wrapper = new LockWrapper (asyncLock);
await asyncLock.WaitAsync ().ConfigureAwait (false);
return wrapper;
}

public void Dispose ()
{
Monitor.Exit (_lockPoint);
asyncLock.Release ();
}
}
class FakeLockWrapper : IDisposable
Expand Down