Skip to content

Commit

Permalink
Refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
XmasApple committed Sep 13, 2023
1 parent 1520013 commit cf8e51e
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 108 deletions.
69 changes: 8 additions & 61 deletions slo/src/Cli/CliCommands.cs
Original file line number Diff line number Diff line change
@@ -1,48 +1,22 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Prometheus;
using slo.Jobs;
using Ydb.Sdk;
using Ydb.Sdk.Table;

namespace slo.Cli;

public static class CliCommands
{
private static ServiceProvider GetServiceProvider()
{
return new ServiceCollection()
.AddLogging(configure => configure.AddConsole().SetMinimumLevel(LogLevel.Information))
.BuildServiceProvider();
}


internal static async Task Create(CreateConfig config)
{
Console.WriteLine(config);
var driverConfig = new DriverConfig(
config.Endpoint,
config.Db
);

await using var serviceProvider = GetServiceProvider();
var loggerFactory = serviceProvider.GetService<ILoggerFactory>();

loggerFactory ??= NullLoggerFactory.Instance;
await using var driver = await Driver.CreateInitialized(driverConfig, loggerFactory);

using var tableClient = new TableClient(driver);

var executor = new Executor(tableClient);
await using var client = await Client.CreateAsync(config.Endpoint, config.Db, config.TableName);

var table = new Table(config.TableName, executor);
const int maxCreateAttempts = 10;
for (var i = 0; i < maxCreateAttempts; i++)
{
try
{
await table.Init(config.InitialDataCount,
await client.Init(config.InitialDataCount,
config.PartitionSize,
config.MinPartitionsCount,
config.MaxPartitionsCount,
Expand All @@ -60,47 +34,20 @@ await table.Init(config.InitialDataCount,
internal static async Task CleanUp(CleanUpConfig config)
{
Console.WriteLine(config);
var driverConfig = new DriverConfig(
config.Endpoint,
config.Db
);

await using var serviceProvider = GetServiceProvider();
var loggerFactory = serviceProvider.GetService<ILoggerFactory>();
await using var client = await Client.CreateAsync(config.Endpoint, config.Db, config.TableName);

loggerFactory ??= NullLoggerFactory.Instance;
await using var driver = await Driver.CreateInitialized(driverConfig, loggerFactory);

using var tableClient = new TableClient(driver);

var executor = new Executor(tableClient);

var table = new Table(config.TableName, executor);
await table.CleanUp(TimeSpan.FromMilliseconds(config.WriteTimeout));
await client.CleanUp(TimeSpan.FromMilliseconds(config.WriteTimeout));
}

internal static async Task Run(RunConfig config)
{
var promPgwEndpoint = $"{config.PromPgw}/metrics";
const string job = "workload-dotnet";

var driverConfig = new DriverConfig(
config.Endpoint,
config.Db
);

await using var serviceProvider = GetServiceProvider();
var loggerFactory = serviceProvider.GetService<ILoggerFactory>();

loggerFactory ??= NullLoggerFactory.Instance;
await using var driver = await Driver.CreateInitialized(driverConfig, loggerFactory);

using var tableClient = new TableClient(driver);

var executor = new Executor(tableClient);
await using var client = await Client.CreateAsync(config.Endpoint, config.Db, config.TableName);

var table = new Table(config.TableName, executor);
await table.Init(config.InitialDataCount, 1, 6, 1000, TimeSpan.FromMilliseconds(config.WriteTimeout));
await client.Init(config.InitialDataCount, 1, 6, 1000, TimeSpan.FromMilliseconds(config.WriteTimeout));

Console.WriteLine(config.PromPgw);

Expand All @@ -112,15 +59,15 @@ internal static async Task Run(RunConfig config)
var duration = TimeSpan.FromSeconds(config.Time);

var readJob = new ReadJob(
table,
client,
new RateLimitedCaller(
config.ReadRps,
duration
),
TimeSpan.FromMilliseconds(config.ReadTimeout));

var writeJob = new WriteJob(
table,
client,
new RateLimitedCaller(
config.WriteRps,
duration
Expand Down
89 changes: 89 additions & 0 deletions slo/src/Client.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using Ydb.Sdk;
using Ydb.Sdk.Table;

namespace slo;

public class Client : IAsyncDisposable
{
public readonly Executor Executor;
public readonly string TableName;

private readonly ServiceProvider _serviceProvider;
private readonly Driver _driver;
private readonly TableClient _tableClient;

private Client(string tableName, Executor executor, ServiceProvider serviceProvider, Driver driver,
TableClient tableClient)
{
TableName = tableName;
Executor = executor;
_serviceProvider = serviceProvider;
_driver = driver;
_tableClient = tableClient;
}

public async Task Init(int initialDataCount, int partitionSize, int minPartitionsCount, int maxPartitionsCount,
TimeSpan timeout)
{
await Executor.ExecuteSchemeQuery(
Queries.GetCreateQuery(TableName, partitionSize, minPartitionsCount, maxPartitionsCount),
timeout);

await DataGenerator.LoadMaxId(TableName, Executor);

var tasks = new List<Task> { Capacity = initialDataCount };

for (var i = 0; i < initialDataCount; i++)
tasks.Add(
Executor.ExecuteDataQuery(Queries.GetWriteQuery(TableName),
DataGenerator.GetUpsertData(),
timeout: timeout));

await Task.WhenAll(tasks);
}

public async Task CleanUp(TimeSpan timeout)
{
await Executor.ExecuteSchemeQuery(Queries.GetDropQuery(TableName), timeout);
}

private static ServiceProvider GetServiceProvider()
{
return new ServiceCollection()
.AddLogging(configure => configure.AddConsole().SetMinimumLevel(LogLevel.Information))
.BuildServiceProvider();
}

public static async Task<Client> CreateAsync(string endpoint, string db, string tableName)
{
var driverConfig = new DriverConfig(
endpoint,
db
);

var serviceProvider = GetServiceProvider();
var loggerFactory = serviceProvider.GetService<ILoggerFactory>();

loggerFactory ??= NullLoggerFactory.Instance;
var driver = await Driver.CreateInitialized(driverConfig, loggerFactory);

var tableClient = new TableClient(driver);

var executor = new Executor(tableClient);

var table = new Client(tableName, executor, serviceProvider, driver, tableClient);

return table;
}


public async ValueTask DisposeAsync()
{
_tableClient.Dispose();
await _driver.DisposeAsync();
await _serviceProvider.DisposeAsync();
}
}
6 changes: 3 additions & 3 deletions slo/src/Jobs/Job.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@ public abstract class Job
protected readonly Histogram AttemptsHistogram;
protected readonly Random Random = new();

protected readonly Table Table;
protected readonly Client Client;

protected Job(Table table, RateLimitedCaller rateLimitedCaller, string jobName, TimeSpan timeout)
protected Job(Client client, RateLimitedCaller rateLimitedCaller, string jobName, TimeSpan timeout)
{
Table = table;
Client = client;
_rateLimitedCaller = rateLimitedCaller;
Timeout = timeout;

Expand Down
6 changes: 3 additions & 3 deletions slo/src/Jobs/ReadJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ namespace slo.Jobs;

internal class ReadJob : Job
{
public ReadJob(Table table, RateLimitedCaller rateLimitedCaller, TimeSpan timeout) : base(table, rateLimitedCaller, "read", timeout)
public ReadJob(Client client, RateLimitedCaller rateLimitedCaller, TimeSpan timeout) : base(client, rateLimitedCaller, "read", timeout)
{
}

Expand All @@ -16,8 +16,8 @@ protected override async Task PerformQuery()
{ "$id", YdbValue.MakeUint64((ulong)Random.Next(DataGenerator.MaxId)) }
};

await Table.Executor.ExecuteDataQuery(
Queries.GetReadQuery(Table.TableName),
await Client.Executor.ExecuteDataQuery(
Queries.GetReadQuery(Client.TableName),
parameters,
AttemptsHistogram,
Timeout
Expand Down
6 changes: 3 additions & 3 deletions slo/src/Jobs/WriteJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ namespace slo.Jobs;

internal class WriteJob : Job
{
public WriteJob(Table table, RateLimitedCaller rateLimitedCaller, TimeSpan timeout) : base(table, rateLimitedCaller,
public WriteJob(Client client, RateLimitedCaller rateLimitedCaller, TimeSpan timeout) : base(client, rateLimitedCaller,
"write", timeout)
{
}
Expand All @@ -12,8 +12,8 @@ protected override async Task PerformQuery()
{
var parameters = DataGenerator.GetUpsertData();

await Table.Executor.ExecuteDataQuery(
Queries.GetWriteQuery(Table.TableName),
await Client.Executor.ExecuteDataQuery(
Queries.GetWriteQuery(Client.TableName),
parameters,
AttemptsHistogram,
Timeout
Expand Down
38 changes: 0 additions & 38 deletions slo/src/Table.cs

This file was deleted.

0 comments on commit cf8e51e

Please sign in to comment.