Skip to content

Commit

Permalink
Merge pull request #248 from adam-poit/fix-kill-query
Browse files Browse the repository at this point in the history
Handle QueryInterrupted exceptions when canceling completed queries.
  • Loading branch information
bgrainger committed Apr 22, 2017
2 parents d9780e4 + 13cfd38 commit 4832251
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 2 deletions.
1 change: 1 addition & 0 deletions src/MySqlConnector/MySqlClient/MySqlDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ private void DoClose()
var connection = Command.Connection;
if (!connection.BufferResultSets)
connection.Session.FinishQuerying();

Command.ReaderClosed();
if ((m_behavior & CommandBehavior.CloseConnection) != 0)
{
Expand Down
28 changes: 26 additions & 2 deletions src/MySqlConnector/Serialization/MySqlSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,30 @@ public void SetActiveReader(MySqlDataReader dataReader)

public void FinishQuerying()
{
bool clearConnection = false;
lock (m_lock)
{
VerifyState(State.Querying, State.CancelingQuery);
if (m_state == State.CancelingQuery)
{
m_state = State.ClearingPendingCancellation;
clearConnection = true;
}
}

if (clearConnection)
{
// KILL QUERY will kill a subsequent query if the command it was intended to cancel has already completed.
// In order to handle this case, we issue a dummy query that will consume the pending cancellation.
// See https://bugs.mysql.com/bug.php?id=45679
var payload = new PayloadData(new ArraySegment<byte>(PayloadUtilities.CreateEofStringPayload(CommandKind.Query, "DO SLEEP(0);")));
SendAsync(payload, IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();
payload = ReceiveReplyAsync(IOBehavior.Synchronous, CancellationToken.None).GetAwaiter().GetResult();
OkPayload.Create(payload);
}

lock (m_lock)
{
VerifyState(State.Querying, State.ClearingPendingCancellation);
m_state = State.Connected;
m_activeReader = null;
m_activeCommand = null;
Expand Down Expand Up @@ -311,7 +332,7 @@ private void VerifyConnected()
{
if (m_state == State.Closed)
throw new ObjectDisposedException(nameof(MySqlSession));
if (m_state != State.Connected && m_state != State.Querying && m_state != State.CancelingQuery && m_state != State.Closing)
if (m_state != State.Connected && m_state != State.Querying && m_state != State.CancelingQuery && m_state != State.ClearingPendingCancellation && m_state != State.Closing)
throw new InvalidOperationException("MySqlSession is not connected.");
}
}
Expand Down Expand Up @@ -667,6 +688,9 @@ private enum State
// The session is connected to a server and the active query is being cancelled.
CancelingQuery,

// A cancellation is pending on the server and needs to be cleared.
ClearingPendingCancellation,

// The session is closing.
Closing,

Expand Down
47 changes: 47 additions & 0 deletions tests/SideBySide/CancelTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,53 @@ public async Task CancelCommandWithTokenBeforeExecuteReader()
}
}

[Fact]
public async Task CancelCompletedCommand()
{
await m_database.Connection.ExecuteAsync(@"drop table if exists cancel_completed_command;
create table cancel_completed_command (
id bigint unsigned,
value varchar(45)
);").ConfigureAwait(false);

using (var cmd = m_database.Connection.CreateCommand())
{
cmd.CommandText = @"insert into cancel_completed_command (id, value) values (1, null);";

using (await cmd.ExecuteReaderAsync().ConfigureAwait(false))
cmd.Cancel();
}

using (var cmd = m_database.Connection.CreateCommand())
{
cmd.CommandText = @"update cancel_completed_command SET value = ""value"" where id = 1;";

await cmd.ExecuteNonQueryAsync().ConfigureAwait(false);
}

using (var cmd = m_database.Connection.CreateCommand())
{
cmd.CommandText = "select value from cancel_completed_command where id = 1;";
var value = (string) await cmd.ExecuteScalarAsync();
Assert.Equal("value", value);
}
}

[Fact]
public void ImplicitCancelWithDapper()
{
m_database.Connection.Execute(@"drop table if exists cancel_completed_command;
create table cancel_completed_command(id integer not null primary key, value text null);");

// a query that returns 0 fields will cause Dapper to cancel the command
m_database.Connection.Query<int>("insert into cancel_completed_command(id, value) values (1, null);");

m_database.Connection.Execute("update cancel_completed_command set value = 'value' where id = 1;");

var value = m_database.Connection.Query<string>(@"select value from cancel_completed_command where id = 1").FirstOrDefault();
Assert.Equal("value", value);
}

[UnbufferedResultSetsFact]
public async Task CancelHugeQueryWithTokenAfterExecuteReader()
{
Expand Down

0 comments on commit 4832251

Please sign in to comment.