Skip to content

Commit

Permalink
Uint64 -> Int32
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillKurdyukov committed Aug 14, 2024
1 parent fd86581 commit 9ba3fac
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 23 deletions.
46 changes: 28 additions & 18 deletions slo/src/Internal/SloContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ namespace Internal;
public abstract class SloContext<T> where T : IDisposable
{
protected readonly ILoggerFactory Factory;
protected readonly ILogger Logger;
private readonly ILogger _logger;

private volatile int _maxId;

protected SloContext()
{
Factory = LoggerFactory.Create(builder => builder.AddConsole().SetMinimumLevel(LogLevel.Information));
Logger = Factory.CreateLogger<SloContext<T>>();
_logger = Factory.CreateLogger<SloContext<T>>();
}

protected abstract string JobName { get; }
Expand All @@ -31,13 +31,13 @@ public async Task Create(CreateConfig config)
using var client = await CreateClient(config);
for (var attempt = 0; attempt < maxCreateAttempts; attempt++)
{
Logger.LogInformation("Creating table {TableName}..", config.TableName);
_logger.LogInformation("Creating table {TableName}..", config.TableName);
try
{
var createTableSql = $"""
CREATE TABLE `{config.TableName}` (
hash Uint64,
id Uint64,
id Int32,
payload_str Text,
payload_double Double,
payload_timestamp Timestamp,
Expand All @@ -51,17 +51,17 @@ PRIMARY KEY (hash, id)
AUTO_PARTITIONING_MAX_PARTITIONS_COUNT = {config.MaxPartitionsCount}
);
""";
Logger.LogInformation("YQL script: {sql}", createTableSql);
_logger.LogInformation("YQL script: {sql}", createTableSql);

await Create(client, createTableSql, config.WriteTimeout);

Logger.LogInformation("Created table {TableName}!", config.TableName);
_logger.LogInformation("Created table {TableName}", config.TableName);

break;
}
catch (Exception e)
{
Logger.LogError(e, "Fail created table");
_logger.LogError(e, "Fail created table");

if (attempt == maxCreateAttempts - 1)
{
Expand All @@ -84,11 +84,11 @@ PRIMARY KEY (hash, id)
}
catch (Exception e)
{
Logger.LogError(e, "Init failed when all tasks, continue..");
_logger.LogError(e, "Init failed when all tasks, continue..");
}
finally
{
Logger.LogInformation("Created task is finished");
_logger.LogInformation("Created task is finished");
}
}

Expand All @@ -101,7 +101,7 @@ public async Task Run(RunConfig runConfig)
using var prometheus = new MetricPusher(promPgwEndpoint, JobName, intervalMilliseconds: runConfig.ReportPeriod);
prometheus.Start();

var (_, _, maxId) = await Select(client, $"SELECT COUNT(*) FROM `{runConfig.TableName};`",
var (_, _, maxId) = await Select(client, $"SELECT MAX(id) as max_id FROM `{runConfig.TableName};`",
new Dictionary<string, YdbValue>(), runConfig.ReadTimeout);
_maxId = (int)maxId!;

Expand Down Expand Up @@ -132,11 +132,14 @@ public async Task Run(RunConfig runConfig)
var writeTask = ShootingTask(writeLimiter, "write", Upsert);
var readTask = ShootingTask(readLimiter, "read", Select);

Logger.LogInformation("Started write / read shooting..");
_logger.LogInformation("Started write / read shooting..");

await Task.WhenAll(readTask, writeTask);

Logger.LogInformation("Run task is finished");
await prometheus.StopAsync();
await MetricReset(promPgwEndpoint);

_logger.LogInformation("Run task is finished");
return;

Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName,
Expand All @@ -158,7 +161,7 @@ Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName,
if (statusCode != StatusCode.Success)
{
Logger.LogError("Failed {ShootingName} operation code: {StatusCode}", shootingName,
_logger.LogError("Failed {ShootingName} operation code: {StatusCode}", shootingName,
statusCode);
notOkGauge.Inc();
label = "err";
Expand All @@ -175,15 +178,15 @@ Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName,
}
catch (RateLimitRejectedException)
{
Logger.LogInformation("Waiting {ShootingName} tasks", shootingName);
_logger.LogInformation("Waiting {ShootingName} tasks", shootingName);
await Task.Delay(990, cancellationTokenSource.Token);
}
}
await Task.WhenAll(tasks);
Logger.LogInformation("{ShootingName} shooting is stopped", shootingName);
_logger.LogInformation("{ShootingName} shooting is stopped", shootingName);
}, cancellationTokenSource.Token);
}
}
Expand All @@ -203,15 +206,15 @@ Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName,

return Upsert(client,
$"""
DECLARE $id AS Uint64;
DECLARE $id AS Int32;
DECLARE $payload_str AS Utf8;
DECLARE $payload_double AS Double;
DECLARE $payload_timestamp AS Timestamp;
UPSERT INTO `{config.TableName}` (id, hash, payload_str, payload_double, payload_timestamp)
VALUES ($id, Digest::NumericHash($id), $payload_str, $payload_double, $payload_timestamp)
""", new Dictionary<string, YdbValue>
{
{ "$id", YdbValue.MakeUint64((ulong)Interlocked.Increment(ref _maxId)) },
{ "$id", YdbValue.MakeInt32(Interlocked.Increment(ref _maxId)) },
{
"$payload_str", YdbValue.MakeUtf8(string.Join(string.Empty, Enumerable
.Repeat(0, Random.Shared.Next(minSizeStr, maxSizeStr))
Expand All @@ -228,7 +231,7 @@ Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName,
{
var (attempts, code, _) = await Select(client,
$"""
DECLARE $id AS Uint64;
DECLARE $id AS Int32;
SELECT id, payload_str, payload_double, payload_timestamp, payload_hash
FROM `{config.TableName}` WHERE id = $id AND hash = Digest::NumericHash($id)
""",
Expand All @@ -239,4 +242,11 @@ Task ShootingTask(RateLimitPolicy rateLimitPolicy, string shootingName,

return (attempts, code);
}

private async Task MetricReset(string promPgwEndpoint)
{
var deleteUri = $"{promPgwEndpoint}/job/{JobName}";
using var httpClient = new HttpClient();
await httpClient.DeleteAsync(deleteUri);
}
}
30 changes: 25 additions & 5 deletions slo/src/TableService/SloContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ namespace TableService;

public class SloContext : SloContext<TableClient>
{
private readonly TxControl _txControl = TxControl.BeginSerializableRW().Commit();
protected override string JobName => "workload-table-service";

protected override async Task Create(TableClient client, string createTableSql, int operationTimeout)
Expand All @@ -24,8 +25,6 @@ protected override async Task Create(TableClient client, string createTableSql,
protected override async Task<(int, StatusCode)> Upsert(TableClient tableClient, string upsertSql,
Dictionary<string, YdbValue> parameters, int writeTimeout, Gauge? errorsGauge = null)
{
var txControl = TxControl.BeginSerializableRW().Commit();

var querySettings = new ExecuteDataQuerySettings
{ OperationTimeout = TimeSpan.FromSeconds(writeTimeout) };

Expand All @@ -35,7 +34,7 @@ protected override async Task Create(TableClient client, string createTableSql,
async session =>
{
attempts++;
var response = await session.ExecuteDataQuery(upsertSql, txControl, parameters, querySettings);
var response = await session.ExecuteDataQuery(upsertSql, _txControl, parameters, querySettings);
if (response.Status.IsSuccess)
{
return response;
Expand All @@ -49,10 +48,31 @@ protected override async Task Create(TableClient client, string createTableSql,
return (attempts, response.Status.StatusCode);
}

protected override Task<(int, StatusCode, object)> Select(TableClient client, string selectSql,
protected override async Task<(int, StatusCode, object?)> Select(TableClient tableClient, string selectSql,
Dictionary<string, YdbValue> parameters, int readTimeout, Gauge? errorsGauge = null)
{
throw new NotImplementedException();
var querySettings = new ExecuteDataQuerySettings
{ OperationTimeout = TimeSpan.FromSeconds(readTimeout) };

var attempts = 0;

var response = (ExecuteDataQueryResponse)await tableClient.SessionExec(
async session =>
{
attempts++;
var response = await session.ExecuteDataQuery(selectSql, _txControl, parameters, querySettings);
if (response.Status.IsSuccess)
{
return response;
}
errorsGauge?.WithLabels(Utils.GetResonseStatusName(response.Status.StatusCode), "retried").Inc();
return response;
});

return (attempts, response.Status.StatusCode,
response.Status.IsSuccess ? response.Result.ResultSets[0].Rows[0][0].GetInt32() : null);
}

protected override async Task<TableClient> CreateClient(Config config)
Expand Down

0 comments on commit 9ba3fac

Please sign in to comment.