Skip to content

Commit

Permalink
IkpuUnitLoader
Browse files Browse the repository at this point in the history
  • Loading branch information
o.nadymov committed Apr 29, 2024
1 parent 96c193f commit 46d503c
Show file tree
Hide file tree
Showing 11 changed files with 466 additions and 32 deletions.
6 changes: 6 additions & 0 deletions Spoleto.VirtualKassa.MultiBank.sln
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "WorkFlow", "WorkFlow", "{48
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "Spoleto.VirtualKassa.MultiBank.Extensions", "src\Spoleto.VirtualKassa.MultiBank.Extensions\Spoleto.VirtualKassa.MultiBank.Extensions.csproj", "{DC77A1B8-14E1-4B81-AD45-FEE66C62FA37}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Spoleto.VirtualKassa.MultiBank.IkpuUnitLoader", "src\Spoleto.VirtualKassa.MultiBank.IkpuUnitLoader\Spoleto.VirtualKassa.MultiBank.IkpuUnitLoader.csproj", "{A0F6747D-89CA-4B1E-B77F-EFC69819C164}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand All @@ -48,6 +50,10 @@ Global
{DC77A1B8-14E1-4B81-AD45-FEE66C62FA37}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DC77A1B8-14E1-4B81-AD45-FEE66C62FA37}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DC77A1B8-14E1-4B81-AD45-FEE66C62FA37}.Release|Any CPU.Build.0 = Release|Any CPU
{A0F6747D-89CA-4B1E-B77F-EFC69819C164}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A0F6747D-89CA-4B1E-B77F-EFC69819C164}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A0F6747D-89CA-4B1E-B77F-EFC69819C164}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A0F6747D-89CA-4B1E-B77F-EFC69819C164}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using Microsoft.Extensions.Configuration;
using Spoleto.VirtualKassa.MultiBank.WpfTester.IkpuUnitLoader;

namespace Spoleto.VirtualKassa.MultiBank.IkpuUnitLoader
{
public static class ConfigurationHelper
{
private static readonly IConfigurationRoot _config;

static ConfigurationHelper()
{
_config = new ConfigurationBuilder()
.AddJsonFile("appsettings.json", optional: true)
.AddUserSecrets("ac9436b5-16e3-4fdb-acce-226bb4ad441f")
.Build();
}

public static IkpuUnitLoaderOptions GetLoaderOptions()
{
var options = _config.GetSection(nameof(IkpuUnitLoaderOptions)).Get<IkpuUnitLoaderOptions>();

return options;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
using System.Text.Json.Serialization;
using Spoleto.VirtualKassa.MultiBank.Models;

namespace Spoleto.VirtualKassa.MultiBank.WpfTester.IkpuUnitLoader
{
internal class IkpuResponse
{
[JsonPropertyName("message")]
public string Message { get; set; }

[JsonPropertyName("success")]
public bool Success { get; set; }

[JsonPropertyName("code")]
public int Code { get; set; }

[JsonPropertyName("timestamp")]
public DateTime Timestamp { get; set; }

[JsonPropertyName("data")]
public List<PackageName> Data { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,211 @@
using System.Data;
using System.Data.Common;
using System.Net.Http.Headers;
using System.Runtime.CompilerServices;
using System.Text.Json;
using Microsoft.Data.SqlClient;
using Spoleto.Common.Helpers;
using Spoleto.VirtualKassa.MultiBank.Models;

namespace Spoleto.VirtualKassa.MultiBank.WpfTester.IkpuUnitLoader
{
internal class IkpuUnitLoader
{
private const int BulkInsertTimeout = 300;
private const int BulkInsertBatchSize = 10000;

private readonly IkpuUnitLoaderOptions _options;
private readonly HttpClient _httpClient;

private record IdCode(Guid Id, string Code);

public IkpuUnitLoader(IkpuUnitLoaderOptions options, HttpClient httpClient)
{
ArgumentNullException.ThrowIfNull(options);
ArgumentNullException.ThrowIfNull(httpClient);

_options = options;

_httpClient = httpClient;
_httpClient.DefaultRequestHeaders.Authorization = new AuthenticationHeaderValue("Bearer", _options.BearerToken);
_httpClient.Timeout = TimeSpan.FromMinutes(60);
}

public async Task LoadIkpuUnitsAsync(CancellationToken cancellationToken = default)
{
var options = new ParallelOptions { MaxDegreeOfParallelism = _options.MaxDegreeOfParallelism };
var ikpuIdCodes = GetIkpuIdCodesAsync(cancellationToken);

var unitsToInsert = new List<(IdCode idCode, IkpuResponse ikpuResponse)>();
var semaphore = new SemaphoreSlim(1, 1);

await Parallel.ForEachAsync(ikpuIdCodes, options, async (ikpuIdCode, token) =>
{
Console.WriteLine($"Loading data for {ikpuIdCode.Code}...");
try
{
var ikpuResponse = await LoadIkpuUnitAsync(ikpuIdCode, cancellationToken).ConfigureAwait(false);
if (ikpuResponse?.Data.Count > 0)
{
await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
unitsToInsert.Add((ikpuIdCode, ikpuResponse));
if (unitsToInsert.Count >= 100)
{
// Write data into the database
await SaveIkpuUnitAsync(unitsToInsert, cancellationToken).ConfigureAwait(false);
unitsToInsert.Clear();
}
}
finally
{
semaphore.Release();
}
}
}
catch (Exception ex)
{
//
}
});

if (unitsToInsert.Count > 0)
{
// Write data into the database
await SaveIkpuUnitAsync(unitsToInsert, cancellationToken).ConfigureAwait(false);
unitsToInsert.Clear();
}

unitsToInsert = null;
}

private async IAsyncEnumerable<IdCode> GetIkpuIdCodesAsync([EnumeratorCancellation] CancellationToken cancellationToken = default)
{
using var connection = new SqlConnection(_options.ConnectrionString);
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);

var sql = $@"SELECT parent.{_options.IdColumnName}, parent.{_options.CodeColumnName} FROM {_options.OriginIkpuTableName} parent
LEFT JOIN {_options.TargetIkpuUnitTableName} unit ON unit._id_classifier_code = parent.{_options.IdColumnName}
WHERE
NOT EXISTS (
SELECT child.{_options.IdColumnName}
FROM {_options.OriginIkpuTableName} AS child
WHERE child._id_parent = parent.{_options.IdColumnName}
) AND unit.{_options.IdColumnName} IS NULL";

using var command = new SqlCommand(sql, connection);
using var reader = await command.ExecuteReaderAsync(cancellationToken).ConfigureAwait(false);

while (await reader.ReadAsync(cancellationToken).ConfigureAwait(false))
{
var id = reader.GetGuid(0);
var code = reader.GetString(1);

yield return new IdCode(id, code);
}
}

private async Task<IkpuResponse> LoadIkpuUnitAsync(IdCode ikpuIdCode, CancellationToken cancellationToken = default)
{
var url = _options.ServiceUrl;
if (url.EndsWith('/'))
url += ikpuIdCode.Code;
else
url += $"/{ikpuIdCode.Code}";

var uri = new Uri(url);

var response = await _httpClient.GetAsync(url, cancellationToken).ConfigureAwait(false);
response.EnsureSuccessStatusCode();

string jsonResponse = await response.Content.ReadAsStringAsync(cancellationToken).ConfigureAwait(false);

try
{
var ikpuResponse = JsonHelper.FromJson<IkpuResponse>(jsonResponse);

if (!ikpuResponse.Success)
{
throw new Exception($"{nameof(IkpuResponse)} has {nameof(IkpuResponse.Success)} = false.{Environment.NewLine}Original Json:{Environment.NewLine}{jsonResponse}");
}

return ikpuResponse;
}
catch (JsonException ex)
{
throw;
}
}

private async Task SaveIkpuUnitAsync(IEnumerable<(IdCode idCode, IkpuResponse ikpuResponse)> unitsToInsert, CancellationToken cancellationToken = default)
{
using var connection = new SqlConnection(_options.ConnectrionString);
await connection.OpenAsync(cancellationToken).ConfigureAwait(false);

using var trans = await connection.BeginTransactionAsync(cancellationToken).ConfigureAwait(false);
try
{
await InsertUnitsAsync(unitsToInsert, connection, trans, cancellationToken).ConfigureAwait(false);

await trans.CommitAsync(cancellationToken).ConfigureAwait(false);
}
catch
{
if (trans.Connection != null)
{
await trans.RollbackAsync(cancellationToken).ConfigureAwait(false);
}

throw;
}
}

private async Task InsertUnitsAsync(IEnumerable<(IdCode idCode, IkpuResponse ikpuResponse)> unitsToInsert, SqlConnection connection, DbTransaction trans, CancellationToken cancellationToken = default)
{
// Create a DataTable to hold the data
var goodDataTable = new DataTable();
goodDataTable.Columns.Add("IdClassifierCode", typeof(Guid));
goodDataTable.Columns.Add(nameof(PackageName.Code), typeof(string));
goodDataTable.Columns.Add(nameof(PackageName.PackageType), typeof(string));
goodDataTable.Columns.Add(nameof(PackageName.NameUz), typeof(string));
goodDataTable.Columns.Add(nameof(PackageName.NameRu), typeof(string));
goodDataTable.Columns.Add(nameof(PackageName.NameLat), typeof(string));

// Populate the DataTable with the data
foreach (var (idCode, ikpuResponse) in unitsToInsert)
{
foreach (var package in ikpuResponse.Data)
{
var row = goodDataTable.NewRow();
row["IdClassifierCode"] = idCode.Id;
row[nameof(PackageName.Code)] = package.Code;
row[nameof(PackageName.PackageType)] = package.PackageType;
row[nameof(PackageName.NameUz)] = package.NameUz;
row[nameof(PackageName.NameRu)] = package.NameRu;
row[nameof(PackageName.NameLat)] = package.NameLat;

goodDataTable.Rows.Add(row);
}
}

var copyOptions = SqlBulkCopyOptions.CheckConstraints;

using var sqlBulkCopy = new SqlBulkCopy(connection, copyOptions, (SqlTransaction)trans);
sqlBulkCopy.BulkCopyTimeout = BulkInsertTimeout;
sqlBulkCopy.BatchSize = BulkInsertBatchSize;
sqlBulkCopy.DestinationTableName = _options.TargetIkpuUnitTableName;

sqlBulkCopy.ColumnMappings.Add("IdClassifierCode", "_id_classifier_code");
sqlBulkCopy.ColumnMappings.Add(nameof(PackageName.Code), "_code");
sqlBulkCopy.ColumnMappings.Add(nameof(PackageName.PackageType), "_package_type");
sqlBulkCopy.ColumnMappings.Add(nameof(PackageName.NameUz), "_name_uz");
sqlBulkCopy.ColumnMappings.Add(nameof(PackageName.NameRu), "_name_ru");
sqlBulkCopy.ColumnMappings.Add(nameof(PackageName.NameLat), "_name_lat");

await sqlBulkCopy.WriteToServerAsync(goodDataTable, cancellationToken).ConfigureAwait(false);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
using Spoleto.VirtualKassa.MultiBank.IkpuUnitLoader.IkpuUnitLoader;

namespace Spoleto.VirtualKassa.MultiBank.WpfTester.IkpuUnitLoader
{
public record IkpuUnitLoaderOptions
{
public int MaxDegreeOfParallelism { get; set; }

public string IdColumnName { get; set; }

public string CodeColumnName { get; set; }

public string OriginIkpuTableName { get; set; }

public string TargetIkpuUnitTableName { get; set; }

public string ConnectrionString { get; set; }

public string ServiceUrl { get; set; }

public string BearerToken { get; set; }

public ProxyInfo? ProxyInfo { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
namespace Spoleto.VirtualKassa.MultiBank.IkpuUnitLoader.IkpuUnitLoader
{
public class ProxyCredentials
{
public string UserName { get; set; }

public string Password { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace Spoleto.VirtualKassa.MultiBank.IkpuUnitLoader.IkpuUnitLoader
{
public class ProxyInfo
{
public string Url { get; set; }

public ProxyCredentials? Credentials { get; set; }

public override string ToString() => Url;
}
}
68 changes: 68 additions & 0 deletions src/Spoleto.VirtualKassa.MultiBank.IkpuUnitLoader/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
using System.Net.Security;
using System.Net;
using Microsoft.Extensions.DependencyInjection;
using Spoleto.VirtualKassa.MultiBank.IkpuUnitLoader;
using Spoleto.VirtualKassa.MultiBank.WpfTester.IkpuUnitLoader;
using Polly.Extensions.Http;
using Polly;

Console.WriteLine("Loading IKPU units...");

var options = ConfigurationHelper.GetLoaderOptions();

var services = new ServiceCollection();

services.AddHttpClient<IkpuUnitLoader>();
services.AddHttpClient<IkpuUnitLoader>()
.SetHandlerLifetime(TimeSpan.FromMinutes(5)) //Set lifetime to five minutes
.ConfigurePrimaryHttpMessageHandler(x=>
{
var socketsHttpHandler = new SocketsHttpHandler()
{
MaxConnectionsPerServer = 1000,
SslOptions = new SslClientAuthenticationOptions
{
// Leave certs unvalidated for debugging
RemoteCertificateValidationCallback = delegate { return true; }
}
};
var proxyInfo = options.ProxyInfo;
if (proxyInfo?.Url != null)
{
var proxy = new WebProxy(proxyInfo.Url);
if (proxyInfo.Credentials != null)
{
proxy.Credentials = new NetworkCredential(proxyInfo.Credentials.UserName, proxyInfo.Credentials.Password);
}
socketsHttpHandler.UseProxy = true;
socketsHttpHandler.Proxy = proxy;
}
return socketsHttpHandler;
})
.AddPolicyHandler(GetRetryPolicy());

services.AddSingleton(s => options);

var serviceProvider = services.BuildServiceProvider();

var loader = serviceProvider.GetRequiredService<IkpuUnitLoader>();
var cancellationToken = new CancellationToken();

await loader.LoadIkpuUnitsAsync(cancellationToken);

Console.WriteLine("Loading IKPU units is completed!");


static IAsyncPolicy<HttpResponseMessage> GetRetryPolicy()
{
return HttpPolicyExtensions
.HandleTransientHttpError()
.OrResult(msg => msg.StatusCode == System.Net.HttpStatusCode.NotFound)
.OrResult(msg => msg.StatusCode == System.Net.HttpStatusCode.TooManyRequests)
.WaitAndRetryAsync(10, GetTimeout());
}

static Func<int, TimeSpan> GetTimeout() => retryAttempt => TimeSpan.FromMinutes(retryAttempt);

Loading

0 comments on commit 46d503c

Please sign in to comment.