Skip to content

Commit

Permalink
Cache and reuse ColumnDefinitionPayload objects.
Browse files Browse the repository at this point in the history
Also reuse the array of payloads created by ResultSet. ResultSet was already caching the byte array from which the column definitions were deserialized; it now caches the deserialized objects, too. This memory is reused when the MySqlDataReader/Connection/session is returned to the pool then retrieved to execute a second query.

If a result set retrieves a huge number of columns, a large amount of memory may be kept alive in the pool until the session is expired. This should hopefully be a rare occurrence but is also offset by the benefit of reusing that memory if very wide queries are being executed frequently.
  • Loading branch information
bgrainger committed Aug 16, 2023
1 parent 5ade1ae commit 0e277c9
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 55 deletions.
53 changes: 31 additions & 22 deletions src/MySqlConnector/Core/ResultSet.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public void Reset()
{
// ResultSet can be re-used, so initialize everything
BufferState = ResultSetState.None;
ColumnDefinitions = null;
m_columnDefinitions = default;
WarningCount = 0;
State = ResultSetState.None;
ContainsCommandParameters = false;
Expand Down Expand Up @@ -55,7 +55,7 @@ public async Task ReadResultSetHeaderAsync(IOBehavior ioBehavior)
WarningCount = ok.WarningCount;
if (ok.NewSchema is not null)
Connection.Session.DatabaseOverride = ok.NewSchema;
ColumnDefinitions = null;
m_columnDefinitions = default;
State = (ok.ServerStatus & ServerStatus.MoreResultsExist) == 0
? ResultSetState.NoMoreData
: ResultSetState.HasMoreData;
Expand Down Expand Up @@ -115,38 +115,44 @@ public async Task ReadResultSetHeaderAsync(IOBehavior ioBehavior)
else
{
var columnCountPacket = ColumnCountPayload.Create(payload.Span, Session.SupportsCachedPreparedMetadata);
var columnCount = columnCountPacket.ColumnCount;
if (!columnCountPacket.MetadataFollows)
{
// reuse previous metadata
ColumnDefinitions = DataReader.LastUsedPreparedStatement!.Columns!;
if (ColumnDefinitions.Length != columnCountPacket.ColumnCount)
throw new InvalidOperationException($"Expected result set to have {ColumnDefinitions.Length} columns, but it contains {columnCountPacket.ColumnCount} columns");
m_columnDefinitions = DataReader.LastUsedPreparedStatement!.Columns!;
if (m_columnDefinitions.Length != columnCount)
throw new InvalidOperationException($"Expected result set to have {m_columnDefinitions.Length} columns, but it contains {columnCount} columns");
}
else
{
// parse columns
// reserve adequate space to hold a copy of all column definitions (but note that this can be resized below if we guess too small)
Utility.Resize(ref m_columnDefinitionPayloads, columnCountPacket.ColumnCount * 96);
Utility.Resize(ref m_columnDefinitionPayloadBytes, columnCount * 96);

ColumnDefinitions = new ColumnDefinitionPayload[columnCountPacket.ColumnCount];
for (var column = 0; column < ColumnDefinitions.Length; column++)
// increase the cache size to be large enough to hold all the column definitions
if (m_columnDefinitionPayloadCache is null)
m_columnDefinitionPayloadCache = new ColumnDefinitionPayload[columnCount];
else if (m_columnDefinitionPayloadCache.Length < columnCount)
Array.Resize(ref m_columnDefinitionPayloadCache, Math.Max(columnCount, m_columnDefinitionPayloadCache.Length * 2));
m_columnDefinitions = m_columnDefinitionPayloadCache.AsMemory(0, columnCount);

for (var column = 0; column < columnCount; column++)
{
payload = await Session.ReceiveReplyAsync(ioBehavior, CancellationToken.None).ConfigureAwait(false);
var payloadLength = payload.Span.Length;

// 'Session.ReceiveReplyAsync' reuses a shared buffer; make a copy so that the column definitions can always be safely read at any future point
if (m_columnDefinitionPayloadUsedBytes + payloadLength > m_columnDefinitionPayloads.Count)
Utility.Resize(ref m_columnDefinitionPayloads, m_columnDefinitionPayloadUsedBytes + payloadLength);
payload.Span.CopyTo(m_columnDefinitionPayloads.AsSpan(m_columnDefinitionPayloadUsedBytes));
if (m_columnDefinitionPayloadUsedBytes + payloadLength > m_columnDefinitionPayloadBytes.Count)
Utility.Resize(ref m_columnDefinitionPayloadBytes, m_columnDefinitionPayloadUsedBytes + payloadLength);
payload.Span.CopyTo(m_columnDefinitionPayloadBytes.AsSpan(m_columnDefinitionPayloadUsedBytes));

var columnDefinition = ColumnDefinitionPayload.Create(new ResizableArraySegment<byte>(m_columnDefinitionPayloads, m_columnDefinitionPayloadUsedBytes, payloadLength));
ColumnDefinitions[column] = columnDefinition;
ColumnDefinitionPayload.Create(ref m_columnDefinitionPayloadCache[column], new ResizableArraySegment<byte>(m_columnDefinitionPayloadBytes, m_columnDefinitionPayloadUsedBytes, payloadLength));
m_columnDefinitionPayloadUsedBytes += payloadLength;
}

// server supports metadata caching, but has re-sent it, so something has changed since last prepare/execution
if (Session.SupportsCachedPreparedMetadata && DataReader.LastUsedPreparedStatement is { } preparedStatement)
preparedStatement.Columns = ColumnDefinitions;
preparedStatement.Columns = ColumnDefinitions.ToArray();
}

if (!Session.SupportsDeprecateEof)
Expand Down Expand Up @@ -277,7 +283,7 @@ public async Task<bool> ReadAsync(IOBehavior ioBehavior, CancellationToken cance

public string GetName(int ordinal)
{
if (ColumnDefinitions is null)
if (!HasResultSet)
throw new InvalidOperationException("There is no current result set.");
if (ordinal < 0 || ordinal >= ColumnDefinitions.Length)
throw new IndexOutOfRangeException($"value must be between 0 and {ColumnDefinitions.Length - 1}");
Expand All @@ -286,7 +292,7 @@ public string GetName(int ordinal)

public string GetDataTypeName(int ordinal)
{
if (ColumnDefinitions is null)
if (!HasResultSet)
throw new InvalidOperationException("There is no current result set.");
if (ordinal < 0 || ordinal >= ColumnDefinitions.Length)
throw new IndexOutOfRangeException($"value must be between 0 and {ColumnDefinitions.Length - 1}");
Expand All @@ -299,7 +305,7 @@ public string GetDataTypeName(int ordinal)

public Type GetFieldType(int ordinal)
{
if (ColumnDefinitions is null)
if (!HasResultSet)
throw new InvalidOperationException("There is no current result set.");
if (ordinal < 0 || ordinal >= ColumnDefinitions.Length)
throw new IndexOutOfRangeException($"value must be between 0 and {ColumnDefinitions.Length - 1}");
Expand All @@ -311,9 +317,9 @@ public Type GetFieldType(int ordinal)
}

public MySqlDbType GetColumnType(int ordinal) =>
TypeMapper.ConvertToMySqlDbType(ColumnDefinitions![ordinal], Connection.TreatTinyAsBoolean, Connection.GuidFormat);
TypeMapper.ConvertToMySqlDbType(ColumnDefinitions[ordinal], Connection.TreatTinyAsBoolean, Connection.GuidFormat);

public int FieldCount => ColumnDefinitions?.Length ?? 0;
public int FieldCount => ColumnDefinitions.Length;

public bool HasRows
{
Expand All @@ -329,7 +335,7 @@ public int GetOrdinal(string name)
{
if (name is null)
throw new ArgumentNullException(nameof(name));
if (ColumnDefinitions is null)
if (!HasResultSet)
throw new InvalidOperationException("There is no current result set.");

for (var column = 0; column < ColumnDefinitions.Length; column++)
Expand All @@ -355,14 +361,17 @@ public Row GetCurrentRow()
public ServerSession Session => DataReader.Session!;

public ResultSetState BufferState { get; private set; }
public ColumnDefinitionPayload[]? ColumnDefinitions { get; private set; }
public ReadOnlySpan<ColumnDefinitionPayload> ColumnDefinitions => m_columnDefinitions.Span;
public int WarningCount { get; private set; }
public ResultSetState State { get; private set; }
public bool HasResultSet => !(State == ResultSetState.None || ColumnDefinitions.Length == 0);
public bool ContainsCommandParameters { get; private set; }

private ResizableArray<byte>? m_columnDefinitionPayloads;
private ResizableArray<byte>? m_columnDefinitionPayloadBytes;
private int m_columnDefinitionPayloadUsedBytes;
private Queue<Row>? m_readBuffer;
private Row? m_row;
private bool m_hasRows;
private ReadOnlyMemory<ColumnDefinitionPayload> m_columnDefinitions;
private ColumnDefinitionPayload[]? m_columnDefinitionPayloadCache;
}
4 changes: 2 additions & 2 deletions src/MySqlConnector/Core/ServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ public async Task PrepareAsync(IMySqlCommand command, IOBehavior ioBehavior, Can
var payloadLength = payload.Span.Length;
Utility.Resize(ref columnsAndParameters, columnsAndParametersSize + payloadLength);
payload.Span.CopyTo(columnsAndParameters.AsSpan(columnsAndParametersSize));
parameters[i] = ColumnDefinitionPayload.Create(new(columnsAndParameters, columnsAndParametersSize, payloadLength));
ColumnDefinitionPayload.Create(ref parameters[i], new(columnsAndParameters, columnsAndParametersSize, payloadLength));
columnsAndParametersSize += payloadLength;
}
if (!SupportsDeprecateEof)
Expand All @@ -254,7 +254,7 @@ public async Task PrepareAsync(IMySqlCommand command, IOBehavior ioBehavior, Can
var payloadLength = payload.Span.Length;
Utility.Resize(ref columnsAndParameters, columnsAndParametersSize + payloadLength);
payload.Span.CopyTo(columnsAndParameters.AsSpan(columnsAndParametersSize));
columns[i] = ColumnDefinitionPayload.Create(new(columnsAndParameters, columnsAndParametersSize, payloadLength));
ColumnDefinitionPayload.Create(ref columns[i], new(columnsAndParameters, columnsAndParametersSize, payloadLength));
columnsAndParametersSize += payloadLength;
}
if (!SupportsDeprecateEof)
Expand Down
24 changes: 14 additions & 10 deletions src/MySqlConnector/MySqlDataReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -345,12 +345,19 @@ public override long GetChars(int ordinal, long dataOffset, char[]? buffer, int
/// <returns>A <see cref="System.Collections.ObjectModel.ReadOnlyCollection{DbColumn}"/> containing metadata about the result set.</returns>
public ReadOnlyCollection<DbColumn> GetColumnSchema()
{
var hasNoSchema = !m_resultSet.HasResultSet || m_resultSet.ContainsCommandParameters;
var columnDefinitions = m_resultSet.ColumnDefinitions;
var hasNoSchema = columnDefinitions is null || m_resultSet.ContainsCommandParameters;
return hasNoSchema ? new List<DbColumn>().AsReadOnly() :
columnDefinitions!
.Select((c, n) => (DbColumn) new MySqlDbColumn(n, c, Connection!.AllowZeroDateTime, GetResultSet().GetColumnType(n)))
.ToList().AsReadOnly();
var schema = new List<DbColumn>(columnDefinitions.Length);
if (!hasNoSchema)
{
for (var n = 0; n < columnDefinitions.Length; n++)
{
var columnDefinition = columnDefinitions[n];
var column = new MySqlDbColumn(n, columnDefinition, Connection!.AllowZeroDateTime, GetResultSet().GetColumnType(n));
schema.Add(column);
}
}
return schema.AsReadOnly();
}

/// <summary>
Expand Down Expand Up @@ -459,8 +466,6 @@ internal async Task InitAsync(CommandListPosition commandListPosition, ICommandP
throw new InvalidOperationException("Expected m_hasMoreResults to be false");
if (m_resultSet.BufferState != ResultSetState.None || m_resultSet.State != ResultSetState.None)
throw new InvalidOperationException("Expected BufferState and State to be ResultSetState.None.");
if (m_resultSet.ColumnDefinitions is not null)
throw new InvalidOperationException("Expected ColumnDefinitions to be null");
m_closed = false;
m_hasWarnings = false;
RealRecordsAffected = null;
Expand Down Expand Up @@ -504,12 +509,11 @@ internal async Task InitAsync(CommandListPosition commandListPosition, ICommandP

internal DataTable? BuildSchemaTable()
{
var columnDefinitions = m_resultSet.ColumnDefinitions;
if (columnDefinitions is null || m_resultSet.ContainsCommandParameters)
if (!m_resultSet.HasResultSet || m_resultSet.ContainsCommandParameters)
return null;

var schemaTable = new DataTable("SchemaTable") { Locale = CultureInfo.InvariantCulture };
schemaTable.MinimumCapacity = columnDefinitions.Length;
schemaTable.MinimumCapacity = m_resultSet.ColumnDefinitions.Length;

var columnName = new DataColumn(SchemaTableColumn.ColumnName, typeof(string));
var ordinal = new DataColumn(SchemaTableColumn.ColumnOrdinal, typeof(int));
Expand Down
52 changes: 31 additions & 21 deletions src/MySqlConnector/Protocol/Payloads/ColumnDefinitionPayload.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@ public string Name
}
}

public CharacterSet CharacterSet { get; }
public CharacterSet CharacterSet { get; private set; }

public uint ColumnLength { get; }
public uint ColumnLength { get; private set; }

public ColumnType ColumnType { get; }
public ColumnType ColumnType { get; private set; }

public ColumnFlags ColumnFlags { get; }
public ColumnFlags ColumnFlags { get; private set; }

public string SchemaName
{
Expand Down Expand Up @@ -74,27 +74,43 @@ public string PhysicalName
}
}

public byte Decimals { get; }
public byte Decimals { get; private set; }

public static ColumnDefinitionPayload Create(ResizableArraySegment<byte> arraySegment)
public static void Create(ref ColumnDefinitionPayload payload, ResizableArraySegment<byte> arraySegment)
{
var reader = new ByteArrayReader(arraySegment);
payload ??= new ColumnDefinitionPayload();
payload.Init(arraySegment);
}

private void Init(ResizableArraySegment<byte> originalData)
{
OriginalData = originalData;
var reader = new ByteArrayReader(originalData);
SkipLengthEncodedByteString(ref reader); // catalog
SkipLengthEncodedByteString(ref reader); // schema
SkipLengthEncodedByteString(ref reader); // table
SkipLengthEncodedByteString(ref reader); // physical table
SkipLengthEncodedByteString(ref reader); // name
SkipLengthEncodedByteString(ref reader); // physical name
reader.ReadByte(0x0C); // length of fixed-length fields, always 0x0C
var characterSet = (CharacterSet) reader.ReadUInt16();
var columnLength = reader.ReadUInt32();
var columnType = (ColumnType) reader.ReadByte();
var columnFlags = (ColumnFlags) reader.ReadUInt16();
var decimals = reader.ReadByte(); // 0x00 for integers and static strings, 0x1f for dynamic strings, double, float, 0x00 to 0x51 for decimals
CharacterSet = (CharacterSet) reader.ReadUInt16();
ColumnLength = reader.ReadUInt32();
ColumnType = (ColumnType) reader.ReadByte();
ColumnFlags = (ColumnFlags) reader.ReadUInt16();
Decimals = reader.ReadByte(); // 0x00 for integers and static strings, 0x1f for dynamic strings, double, float, 0x00 to 0x51 for decimals
reader.ReadByte(0); // reserved byte 1
reader.ReadByte(0); // reserved byte 2

return new ColumnDefinitionPayload(arraySegment, characterSet, columnLength, columnType, columnFlags, decimals);
if (m_readNames)
{
m_catalogName = null;
m_schemaName = null;
m_table = null;
m_physicalTable = null;
m_name = null;
m_physicalName = null;
m_readNames = false;
}
}

private static void SkipLengthEncodedByteString(ref ByteArrayReader reader)
Expand All @@ -103,14 +119,8 @@ private static void SkipLengthEncodedByteString(ref ByteArrayReader reader)
reader.Offset += length;
}

private ColumnDefinitionPayload(ResizableArraySegment<byte> originalData, CharacterSet characterSet, uint columnLength, ColumnType columnType, ColumnFlags columnFlags, byte decimals)
private ColumnDefinitionPayload()
{
OriginalData = originalData;
CharacterSet = characterSet;
ColumnLength = columnLength;
ColumnType = columnType;
ColumnFlags = columnFlags;
Decimals = decimals;
}

private void ReadNames()
Expand All @@ -125,7 +135,7 @@ private void ReadNames()
m_readNames = true;
}

private ResizableArraySegment<byte> OriginalData { get; }
private ResizableArraySegment<byte> OriginalData { get; set; }

private bool m_readNames;
private string? m_name;
Expand Down

0 comments on commit 0e277c9

Please sign in to comment.