Skip to content

Commit

Permalink
Implement core code to support multiple compression methods.
Browse files Browse the repository at this point in the history
  • Loading branch information
bgrainger committed Oct 5, 2024
1 parent 8107efe commit f74ce0b
Show file tree
Hide file tree
Showing 9 changed files with 66 additions and 16 deletions.
8 changes: 8 additions & 0 deletions src/MySqlConnector/Core/CompressionMethod.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
namespace MySqlConnector.Core;

internal enum CompressionMethod
{
None,
Zlib,
Zstandard,
}
18 changes: 11 additions & 7 deletions src/MySqlConnector/Core/ServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,9 @@ public async Task DisposeAsync(IOBehavior ioBehavior, CancellationToken cancella
ServerVersion = new(initialHandshake.ServerVersion);
ConnectionId = initialHandshake.ConnectionId;
AuthPluginData = initialHandshake.AuthPluginData;
m_useCompression = cs.UseCompression && (initialHandshake.ProtocolCapabilities & ProtocolCapabilities.Compress) != 0;
m_compressionMethod = !cs.UseCompression ? CompressionMethod.None :
((initialHandshake.ProtocolCapabilities & ProtocolCapabilities.ZstandardCompressionAlgorithm) != 0 && connection.ZstandardPlugin is not null) ? CompressionMethod.Zstandard :
((initialHandshake.ProtocolCapabilities & ProtocolCapabilities.Compress) != 0) ? CompressionMethod.Zlib : CompressionMethod.None;
CancellationTimeout = cs.CancellationTimeout;
UserID = cs.UserID;

Expand Down Expand Up @@ -483,7 +485,7 @@ public async Task DisposeAsync(IOBehavior ioBehavior, CancellationToken cancella
else
{
// pipelining is not currently compatible with compression
m_supportsPipelining = !cs.UseCompression && cs.Pipelining is not false;
m_supportsPipelining = m_compressionMethod == CompressionMethod.None && cs.Pipelining is not false;

// for pipelining, concatenate reset connection and SET NAMES query into one buffer
if (m_supportsPipelining)
Expand All @@ -500,7 +502,7 @@ public async Task DisposeAsync(IOBehavior ioBehavior, CancellationToken cancella
}
}

Log.SessionMadeConnection(m_logger, Id, ServerVersion.OriginalString, ConnectionId, m_useCompression, m_supportsConnectionAttributes, SupportsDeprecateEof, SupportsCachedPreparedMetadata, serverSupportsSsl, SupportsSessionTrack, m_supportsPipelining, SupportsQueryAttributes);
Log.SessionMadeConnection(m_logger, Id, ServerVersion.OriginalString, ConnectionId, m_compressionMethod != CompressionMethod.None, m_supportsConnectionAttributes, SupportsDeprecateEof, SupportsCachedPreparedMetadata, serverSupportsSsl, SupportsSessionTrack, m_supportsPipelining, SupportsQueryAttributes);

if (cs.SslMode != MySqlSslMode.None && (cs.SslMode != MySqlSslMode.Preferred || serverSupportsSsl))
{
Expand All @@ -517,7 +519,7 @@ public async Task DisposeAsync(IOBehavior ioBehavior, CancellationToken cancella
cs.ConnectionAttributes = CreateConnectionAttributes(cs.ApplicationName);

var password = GetPassword(cs, connection);
using (var handshakeResponsePayload = HandshakeResponse41Payload.Create(initialHandshake, cs, password, m_useCompression, m_characterSet, m_supportsConnectionAttributes ? cs.ConnectionAttributes : null))
using (var handshakeResponsePayload = HandshakeResponse41Payload.Create(initialHandshake, cs, password, m_compressionMethod, connection.ZstandardPlugin?.CompressionLevel, m_characterSet, m_supportsConnectionAttributes ? cs.ConnectionAttributes : null))
await SendReplyAsync(handshakeResponsePayload, ioBehavior, cancellationToken).ConfigureAwait(false);
payload = await ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);

Expand Down Expand Up @@ -570,8 +572,10 @@ public async Task DisposeAsync(IOBehavior ioBehavior, CancellationToken cancella

var redirectionUrl = ok.RedirectionUrl;

if (m_useCompression)
if (m_compressionMethod == CompressionMethod.Zlib)
m_payloadHandler = new CompressedPayloadHandler(m_payloadHandler.ByteHandler);
else if (m_compressionMethod == CompressionMethod.Zstandard)
m_payloadHandler = connection.ZstandardPlugin!.CreatePayloadHandler(m_payloadHandler.ByteHandler);

// send 'SET NAMES' to set the character set and collation unless the server reports that it's already using the desired character set (e.g., MariaDB >= 11.5)
if (ok.NewCharacterSet != (ServerVersion.Version >= ServerVersions.SupportsUtf8Mb4 ? CharacterSet.Utf8Mb4Binary : CharacterSet.Utf8Mb3Binary))
Expand Down Expand Up @@ -1632,7 +1636,7 @@ caCertificateChain is not null &&

var checkCertificateRevocation = cs.SslMode == MySqlSslMode.VerifyFull;

using (var initSsl = HandshakeResponse41Payload.CreateWithSsl(serverCapabilities, cs, m_useCompression, m_characterSet))
using (var initSsl = HandshakeResponse41Payload.CreateWithSsl(serverCapabilities, cs, m_compressionMethod, m_characterSet))
await SendReplyAsync(initSsl, ioBehavior, cancellationToken).ConfigureAwait(false);

var clientAuthenticationOptions = new SslClientAuthenticationOptions
Expand Down Expand Up @@ -2129,7 +2133,7 @@ protected override void OnStatementBegin(int index)
private SslStream? m_sslStream;
private X509Certificate2? m_clientCertificate;
private IPayloadHandler? m_payloadHandler;
private bool m_useCompression;
private CompressionMethod m_compressionMethod;
private bool m_isSecureConnection;
private bool m_supportsConnectionAttributes;
private bool m_supportsPipelining;
Expand Down
2 changes: 2 additions & 0 deletions src/MySqlConnector/MySqlConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
using Microsoft.Extensions.Logging;
using MySqlConnector.Core;
using MySqlConnector.Logging;
using MySqlConnector.Plugins;
using MySqlConnector.Protocol;
using MySqlConnector.Protocol.Payloads;
using MySqlConnector.Protocol.Serialization;
Expand Down Expand Up @@ -979,6 +980,7 @@ internal void Cancel(ICancellableCommand command, int commandId, bool isCancel)

internal MySqlTransaction? CurrentTransaction { get; set; }
internal MySqlConnectorLoggingConfiguration LoggingConfiguration { get; }
internal ZstandardPlugin? ZstandardPlugin { get; set; }
internal bool AllowLoadLocalInfile => GetInitializedConnectionSettings().AllowLoadLocalInfile;
internal bool AllowUserVariables => GetInitializedConnectionSettings().AllowUserVariables;
internal bool AllowZeroDateTime => GetInitializedConnectionSettings().AllowZeroDateTime;
Expand Down
2 changes: 2 additions & 0 deletions src/MySqlConnector/MySqlConnector.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

<ItemGroup>
<InternalsVisibleTo Include="MySqlConnector.DependencyInjection.Tests" Key="00240000048000001402000006020000002400005253413100100000010001000521c81bf0f0ec7b261bb89bb583611d3767205d542c16c9353e317455acf612d3ec3dd03b77e7e6fda1aa8f15c58576d90dae0fb9f4fd4bd48709ae199b8c771963fa67d70b35f7ed2fbb6c60423935adfae0606716ea6ce31a1fcd56fdb206fc0c3b1205ec6ba56fb20c14c42105a601ddd0bfaea7207d535b29a39ffe82f00880f4f64f86e6bcf26eb5242a133bad9d7a32e3126036b68b13b413ce4097dfc18d9a5b1e494f1aed54dc84d7089fd0d931a49e679fdc7c8f07a5121df38ec27c2c9993a8f8f136b2937849aed32aef7324a5b7e482dc2eb693c7988f6074e82e75a41dd001587be4d79108588b25d40ed9aeb30ff921edaf509c94f71428e48219ba940f5f10c061421dc0c006e09feadec30df20b2d13d02c3ce4ceb32b6fbefd254288d45f3bb2c425b197e19699d7efdfc7aba5dd45b727bc98abd866d2f6e69e33a64e4b5a5ab1e4d749266c7bf285550da9fb036f10eff76b697de9c5ed8de4a3cdbca1174543540bed6c3a95641cfdacbac834896639f8a75ed1fb9cfd9983d83d0b43b76bd3894bd2b3da0dd23d1e0362985217f087acce1a7f56546c214890acae8fc60e27890ff31c38578f85e220342061a1a5c867362a14aafdffa003dc13af064f5f860d1757883ea5237feed3a6228c86200062bd88f5592d5c399ef270a562d458ae8eac5eaa382b5bcc3f64298cc34b4598f0b33d7943b8" />
<InternalsVisibleTo Include="MySqlConnector.Encryption" Key="0024000004800000940000000602000000240000525341310004000001000100e57cf9527549be513974d12b6730ff1db99b10a1498bb424ef0a14f0441f7043c5159f51629255107b314c710bf29463e1cfa4585732f560c3ef25207c7dfa57004d28927078a28b0ab7fa534403c18f2d5aaeda506a3d276c26b3bcc62f7fc8efe9aaf3e68b14b839ad10f10f42db834d170c96b1da9fc463231afe73bbaabe" />
<InternalsVisibleTo Include="MySqlConnector.Zstandard" Key="0024000004800000940000000602000000240000525341310004000001000100e57cf9527549be513974d12b6730ff1db99b10a1498bb424ef0a14f0441f7043c5159f51629255107b314c710bf29463e1cfa4585732f560c3ef25207c7dfa57004d28927078a28b0ab7fa534403c18f2d5aaeda506a3d276c26b3bcc62f7fc8efe9aaf3e68b14b839ad10f10f42db834d170c96b1da9fc463231afe73bbaabe" />
<InternalsVisibleTo Include="MySqlConnector.Tests" Key="00240000048000001402000006020000002400005253413100100000010001000521c81bf0f0ec7b261bb89bb583611d3767205d542c16c9353e317455acf612d3ec3dd03b77e7e6fda1aa8f15c58576d90dae0fb9f4fd4bd48709ae199b8c771963fa67d70b35f7ed2fbb6c60423935adfae0606716ea6ce31a1fcd56fdb206fc0c3b1205ec6ba56fb20c14c42105a601ddd0bfaea7207d535b29a39ffe82f00880f4f64f86e6bcf26eb5242a133bad9d7a32e3126036b68b13b413ce4097dfc18d9a5b1e494f1aed54dc84d7089fd0d931a49e679fdc7c8f07a5121df38ec27c2c9993a8f8f136b2937849aed32aef7324a5b7e482dc2eb693c7988f6074e82e75a41dd001587be4d79108588b25d40ed9aeb30ff921edaf509c94f71428e48219ba940f5f10c061421dc0c006e09feadec30df20b2d13d02c3ce4ceb32b6fbefd254288d45f3bb2c425b197e19699d7efdfc7aba5dd45b727bc98abd866d2f6e69e33a64e4b5a5ab1e4d749266c7bf285550da9fb036f10eff76b697de9c5ed8de4a3cdbca1174543540bed6c3a95641cfdacbac834896639f8a75ed1fb9cfd9983d83d0b43b76bd3894bd2b3da0dd23d1e0362985217f087acce1a7f56546c214890acae8fc60e27890ff31c38578f85e220342061a1a5c867362a14aafdffa003dc13af064f5f860d1757883ea5237feed3a6228c86200062bd88f5592d5c399ef270a562d458ae8eac5eaa382b5bcc3f64298cc34b4598f0b33d7943b8" />
<InternalsVisibleTo Include="IntegrationTests" Key="00240000048000001402000006020000002400005253413100100000010001000521c81bf0f0ec7b261bb89bb583611d3767205d542c16c9353e317455acf612d3ec3dd03b77e7e6fda1aa8f15c58576d90dae0fb9f4fd4bd48709ae199b8c771963fa67d70b35f7ed2fbb6c60423935adfae0606716ea6ce31a1fcd56fdb206fc0c3b1205ec6ba56fb20c14c42105a601ddd0bfaea7207d535b29a39ffe82f00880f4f64f86e6bcf26eb5242a133bad9d7a32e3126036b68b13b413ce4097dfc18d9a5b1e494f1aed54dc84d7089fd0d931a49e679fdc7c8f07a5121df38ec27c2c9993a8f8f136b2937849aed32aef7324a5b7e482dc2eb693c7988f6074e82e75a41dd001587be4d79108588b25d40ed9aeb30ff921edaf509c94f71428e48219ba940f5f10c061421dc0c006e09feadec30df20b2d13d02c3ce4ceb32b6fbefd254288d45f3bb2c425b197e19699d7efdfc7aba5dd45b727bc98abd866d2f6e69e33a64e4b5a5ab1e4d749266c7bf285550da9fb036f10eff76b697de9c5ed8de4a3cdbca1174543540bed6c3a95641cfdacbac834896639f8a75ed1fb9cfd9983d83d0b43b76bd3894bd2b3da0dd23d1e0362985217f087acce1a7f56546c214890acae8fc60e27890ff31c38578f85e220342061a1a5c867362a14aafdffa003dc13af064f5f860d1757883ea5237feed3a6228c86200062bd88f5592d5c399ef270a562d458ae8eac5eaa382b5bcc3f64298cc34b4598f0b33d7943b8" />
<Using Include="System.Data" />
Expand Down
8 changes: 6 additions & 2 deletions src/MySqlConnector/MySqlDataSource.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Microsoft.Extensions.Logging;
using MySqlConnector.Core;
using MySqlConnector.Logging;
using MySqlConnector.Plugins;
using MySqlConnector.Protocol.Serialization;

namespace MySqlConnector;
Expand All @@ -18,7 +19,7 @@ public sealed class MySqlDataSource : DbDataSource
/// <param name="connectionString">The connection string for the MySQL Server. This parameter is required.</param>
/// <exception cref="ArgumentNullException">Thrown if <paramref name="connectionString"/> is <c>null</c>.</exception>
public MySqlDataSource(string connectionString)
: this(connectionString ?? throw new ArgumentNullException(nameof(connectionString)), MySqlConnectorLoggingConfiguration.NullConfiguration, null, null, null, null, default, default)
: this(connectionString ?? throw new ArgumentNullException(nameof(connectionString)), MySqlConnectorLoggingConfiguration.NullConfiguration, null, null, null, null, default, default, default)
{
}

Expand All @@ -29,14 +30,16 @@ internal MySqlDataSource(string connectionString,
RemoteCertificateValidationCallback? remoteCertificateValidationCallback,
Func<MySqlProvidePasswordContext, CancellationToken, ValueTask<string>>? periodicPasswordProvider,
TimeSpan periodicPasswordProviderSuccessRefreshInterval,
TimeSpan periodicPasswordProviderFailureRefreshInterval)
TimeSpan periodicPasswordProviderFailureRefreshInterval,
ZstandardPlugin? zstandardPlugin)
{
m_connectionString = connectionString;
LoggingConfiguration = loggingConfiguration;
Name = name;
m_clientCertificatesCallback = clientCertificatesCallback;
m_remoteCertificateValidationCallback = remoteCertificateValidationCallback;
m_logger = loggingConfiguration.DataSourceLogger;
m_zstandardPlugin = zstandardPlugin;

Pool = ConnectionPool.CreatePool(m_connectionString, LoggingConfiguration, name);
m_id = Interlocked.Increment(ref s_lastId);
Expand Down Expand Up @@ -221,6 +224,7 @@ private string ProvidePasswordFromInitialRefreshTask(MySqlProvidePasswordContext
private readonly Func<MySqlProvidePasswordContext, CancellationToken, ValueTask<string>>? m_periodicPasswordProvider;
private readonly TimeSpan m_periodicPasswordProviderSuccessRefreshInterval;
private readonly TimeSpan m_periodicPasswordProviderFailureRefreshInterval;
private readonly ZstandardPlugin? m_zstandardPlugin;
private readonly MySqlProvidePasswordContext? m_providePasswordContext;
private readonly CancellationTokenSource? m_passwordProviderTimerCancellationTokenSource;
private readonly Timer? m_passwordProviderTimer;
Expand Down
6 changes: 5 additions & 1 deletion src/MySqlConnector/MySqlDataSourceBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using System.Security.Cryptography.X509Certificates;
using Microsoft.Extensions.Logging;
using MySqlConnector.Logging;
using MySqlConnector.Plugins;

namespace MySqlConnector;

Expand Down Expand Up @@ -102,7 +103,8 @@ public MySqlDataSource Build()
m_remoteCertificateValidationCallback,
m_periodicPasswordProvider,
m_periodicPasswordProviderSuccessRefreshInterval,
m_periodicPasswordProviderFailureRefreshInterval
m_periodicPasswordProviderFailureRefreshInterval,
ZstandardPlugin
);
}

Expand All @@ -111,6 +113,8 @@ public MySqlDataSource Build()
/// </summary>
public MySqlConnectionStringBuilder ConnectionStringBuilder { get; }

internal ZstandardPlugin? ZstandardPlugin { get; set; }

private ILoggerFactory? m_loggerFactory;
private string? m_name;
private Func<X509CertificateCollection, ValueTask>? m_clientCertificatesCallback;
Expand Down
9 changes: 9 additions & 0 deletions src/MySqlConnector/Plugins/ZstandardPlugin.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
using MySqlConnector.Protocol.Serialization;

namespace MySqlConnector.Plugins;

internal abstract class ZstandardPlugin
{
public abstract IPayloadHandler CreatePayloadHandler(IByteHandler byteHandler);
public abstract int CompressionLevel { get; }
}
17 changes: 11 additions & 6 deletions src/MySqlConnector/Protocol/Payloads/HandshakeResponse41Payload.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ namespace MySqlConnector.Protocol.Payloads;

internal static class HandshakeResponse41Payload
{
private static ByteBufferWriter CreateCapabilitiesPayload(ProtocolCapabilities serverCapabilities, ConnectionSettings cs, bool useCompression, CharacterSet characterSet, ProtocolCapabilities additionalCapabilities = 0)
private static ByteBufferWriter CreateCapabilitiesPayload(ProtocolCapabilities serverCapabilities, ConnectionSettings cs, CompressionMethod compressionMethod, CharacterSet characterSet, ProtocolCapabilities additionalCapabilities = 0)
{
var writer = new ByteBufferWriter();

Expand All @@ -22,10 +22,11 @@ private static ByteBufferWriter CreateCapabilitiesPayload(ProtocolCapabilities s
(cs.AllowLoadLocalInfile ? ProtocolCapabilities.LocalFiles : 0) |
(string.IsNullOrWhiteSpace(cs.Database) ? 0 : ProtocolCapabilities.ConnectWithDatabase) |
(cs.UseAffectedRows ? 0 : ProtocolCapabilities.FoundRows) |
(useCompression ? ProtocolCapabilities.Compress : ProtocolCapabilities.None) |
(compressionMethod == CompressionMethod.Zlib ? ProtocolCapabilities.Compress : ProtocolCapabilities.None) |
(serverCapabilities & ProtocolCapabilities.ConnectionAttributes) |
(serverCapabilities & ProtocolCapabilities.SessionTrack) |
(serverCapabilities & ProtocolCapabilities.DeprecateEof) |
(compressionMethod == CompressionMethod.Zstandard ? ProtocolCapabilities.ZstandardCompressionAlgorithm : 0) |
(serverCapabilities & ProtocolCapabilities.QueryAttributes) |
(serverCapabilities & ProtocolCapabilities.MariaDbCacheMetadata) |
additionalCapabilities;
Expand All @@ -51,13 +52,13 @@ private static ByteBufferWriter CreateCapabilitiesPayload(ProtocolCapabilities s
return writer;
}

public static PayloadData CreateWithSsl(ProtocolCapabilities serverCapabilities, ConnectionSettings cs, bool useCompression, CharacterSet characterSet) =>
CreateCapabilitiesPayload(serverCapabilities, cs, useCompression, characterSet, ProtocolCapabilities.Ssl).ToPayloadData();
public static PayloadData CreateWithSsl(ProtocolCapabilities serverCapabilities, ConnectionSettings cs, CompressionMethod compressionMethod, CharacterSet characterSet) =>
CreateCapabilitiesPayload(serverCapabilities, cs, compressionMethod, characterSet, ProtocolCapabilities.Ssl).ToPayloadData();

public static PayloadData Create(InitialHandshakePayload handshake, ConnectionSettings cs, string password, bool useCompression, CharacterSet characterSet, byte[]? connectionAttributes)
public static PayloadData Create(InitialHandshakePayload handshake, ConnectionSettings cs, string password, CompressionMethod compressionMethod, int? compressionLevel, CharacterSet characterSet, byte[]? connectionAttributes)
{
// TODO: verify server capabilities
var writer = CreateCapabilitiesPayload(handshake.ProtocolCapabilities, cs, useCompression, characterSet);
var writer = CreateCapabilitiesPayload(handshake.ProtocolCapabilities, cs, compressionMethod, characterSet);
writer.WriteNullTerminatedString(cs.UserID);
var authenticationResponse = AuthenticationUtility.CreateAuthenticationResponse(handshake.AuthPluginData, password);
writer.Write((byte) authenticationResponse.Length);
Expand All @@ -72,6 +73,10 @@ public static PayloadData Create(InitialHandshakePayload handshake, ConnectionSe
if (connectionAttributes is not null)
writer.Write(connectionAttributes);

// Zstandard compression level
if (compressionMethod == CompressionMethod.Zstandard)
writer.Write((byte) (compressionLevel ?? 10));

return writer.ToPayloadData();
}
}
12 changes: 12 additions & 0 deletions src/MySqlConnector/Protocol/ProtocolCapabilities.cs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,18 @@ internal enum ProtocolCapabilities : ulong
/// </summary>
DeprecateEof = 0x100_0000,

/// <summary>
/// The client can handle optional metadata information in the resultset.
/// </summary>
/// <remarks>Corresponds to CLIENT_OPTIONAL_RESULTSET_METADATA.</remarks>
ClientOptionalResultsetMetadata = 0x200_0000,

/// <summary>
/// The client supports the Zstandard compression algorithm.
/// </summary>
/// <remarks>Corresponds to CLIENT_ZSTD_COMPRESSION_ALGORITHM.</remarks>
ZstandardCompressionAlgorithm = 0x400_0000,

/// <summary>
/// Supports query attributes (CLIENT_QUERY_ATTRIBUTES).
/// </summary>
Expand Down

0 comments on commit f74ce0b

Please sign in to comment.