From 8c545199577f3d5f947cb38d37d728b7cc8df195 Mon Sep 17 00:00:00 2001 From: stanislav_shchetinin Date: Fri, 2 Aug 2024 22:43:40 +0300 Subject: [PATCH] Bulk upsert integration test (#280) --- tests/integration/CMakeLists.txt | 1 + .../bulk_upsert_simple_it/CMakeLists.txt | 13 ++ .../bulk_upsert_simple_it/bulk_upsert.cpp | 163 ++++++++++++++++++ .../bulk_upsert_simple_it/bulk_upsert.h | 48 ++++++ .../bulk_upsert_simple_it/main.cpp | 52 ++++++ 5 files changed, 277 insertions(+) create mode 100644 tests/integration/bulk_upsert_simple_it/CMakeLists.txt create mode 100644 tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp create mode 100644 tests/integration/bulk_upsert_simple_it/bulk_upsert.h create mode 100644 tests/integration/bulk_upsert_simple_it/main.cpp diff --git a/tests/integration/CMakeLists.txt b/tests/integration/CMakeLists.txt index a7a4e7c971..510ce5332f 100644 --- a/tests/integration/CMakeLists.txt +++ b/tests/integration/CMakeLists.txt @@ -1 +1,2 @@ add_subdirectory(basic_example_it) +add_subdirectory(bulk_upsert_simple_it) diff --git a/tests/integration/bulk_upsert_simple_it/CMakeLists.txt b/tests/integration/bulk_upsert_simple_it/CMakeLists.txt new file mode 100644 index 0000000000..027cf05071 --- /dev/null +++ b/tests/integration/bulk_upsert_simple_it/CMakeLists.txt @@ -0,0 +1,13 @@ +add_ydb_test(NAME bulk_upsert_simple_it + SOURCES + main.cpp + bulk_upsert.cpp + bulk_upsert.h + LINK_LIBRARIES + yutil + YDB-CPP-SDK::Table + library-getopt + GTest::gtest_main + LABELS + integration +) diff --git a/tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp b/tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp new file mode 100644 index 0000000000..90bf26207a --- /dev/null +++ b/tests/integration/bulk_upsert_simple_it/bulk_upsert.cpp @@ -0,0 +1,163 @@ +#include "bulk_upsert.h" + +#include + +static constexpr size_t BATCH_SIZE = 1000; + +static void ThrowOnError(const TStatus& status) { + if (!status.IsSuccess()) { + throw TYdbErrorException(status) << status; + } +} + +static std::string JoinPath(const std::string& basePath, const std::string& path) { + if (basePath.empty()) { + return path; + } + + std::filesystem::path prefixPathSplit(basePath); + prefixPathSplit /= path; + + return prefixPathSplit; +} + +TRunArgs GetRunArgs() { + + std::string database = std::getenv("YDB_DATABASE"); + std::string endpoint = std::getenv("YDB_ENDPOINT"); + + auto driverConfig = TDriverConfig() + .SetEndpoint(endpoint) + .SetDatabase(database) + .SetAuthToken(std::getenv("YDB_TOKEN") ? std::getenv("YDB_TOKEN") : ""); + + TDriver driver(driverConfig); + return {driver, JoinPath(database, "bulk")}; +} + +TStatus CreateTable(TTableClient& client, const std::string& table) { + TRetryOperationSettings settings; + auto status = client.RetryOperationSync([&table](TSession session) { + auto tableDesc = TTableBuilder() + .AddNullableColumn("App", EPrimitiveType::Utf8) + .AddNullableColumn("Timestamp", EPrimitiveType::Timestamp) + .AddNullableColumn("Host", EPrimitiveType::Utf8) + .AddNonNullableColumn("Id", EPrimitiveType::Uint64) + .AddNullableColumn("HttpCode", EPrimitiveType::Uint32) + .AddNullableColumn("Message", EPrimitiveType::Utf8) + .SetPrimaryKeyColumns({"App", "Timestamp", "Host", "Id"}) + .Build(); + + return session.CreateTable(table, std::move(tableDesc)).GetValueSync(); + }, settings); + + return status; +} + +TStatistic GetLogBatch(uint64_t logOffset, std::vector& logBatch, uint32_t lastNumber) { + logBatch.clear(); + uint32_t correctSumApp = 0; + uint32_t correctSumHost = 0; + uint32_t correctRowCount = 0; + + for (size_t i = 0; i < BATCH_SIZE; ++i) { + TLogMessage message; + message.Pk.Id = correctRowCount + lastNumber; + message.Pk.App = "App_" + std::to_string(logOffset % 10); + message.Pk.Host = "192.168.0." + std::to_string(logOffset % 11); + message.Pk.Timestamp = TInstant::Now() + TDuration::MilliSeconds(i % 1000); + message.HttpCode = 200; + message.Message = i % 2 ? "GET / HTTP/1.1" : "GET /images/logo.png HTTP/1.1"; + logBatch.emplace_back(message); + + correctSumApp += logOffset % 10; + correctSumHost += logOffset % 11; + ++correctRowCount; + + } + return {correctSumApp, correctSumHost, correctRowCount}; +} + +TStatus WriteLogBatch(TTableClient& tableClient, const std::string& table, const std::vector& logBatch, + const TRetryOperationSettings& retrySettings) { + TValueBuilder rows; + rows.BeginList(); + for (const auto& message : logBatch) { + rows.AddListItem() + .BeginStruct() + .AddMember("Id").Uint64(message.Pk.Id) + .AddMember("App").Utf8(message.Pk.App) + .AddMember("Host").Utf8(message.Pk.Host) + .AddMember("Timestamp").Timestamp(message.Pk.Timestamp) + .AddMember("HttpCode").Uint32(message.HttpCode) + .AddMember("Message").Utf8(message.Message) + .EndStruct(); + } + rows.EndList(); + auto bulkUpsertOperation = [table, rowsValue = rows.Build()](TTableClient& tableClient) { + TValue r = rowsValue; + auto status = tableClient.BulkUpsert(table, std::move(r)); + return status.GetValueSync(); + }; + + auto status = tableClient.RetryOperationSync(bulkUpsertOperation, retrySettings); + return status; +} + +static TStatus SelectTransaction(TSession session, const std::string& path, + std::optional& resultSet) { + std::filesystem::path filesystemPath(path); + auto query = std::format(R"( + PRAGMA TablePathPrefix("{}"); + + SELECT + SUM(CAST(SUBSTRING(CAST(App as string), 4) as Int32)), + SUM(CAST(SUBSTRING(CAST(Host as string), 10) as Int32)), + COUNT(*) + FROM {} + )", filesystemPath.parent_path().string(), filesystemPath.filename().string()); + + auto txControl = + TTxControl::BeginTx(TTxSettings::SerializableRW()) + .CommitTx(); + + auto result = session.ExecuteDataQuery(query, txControl).GetValueSync(); + + if (result.IsSuccess()) { + resultSet = result.GetResultSet(0); + } + + return result; +} + +TStatistic Select(TTableClient& client, const std::string& path) { + std::optional resultSet; + ThrowOnError(client.RetryOperationSync([path, &resultSet](TSession session) { + return SelectTransaction(session, path, resultSet); + })); + + TResultSetParser parser(*resultSet); + + uint64_t sumApp = 0; + uint64_t sumHost = 0; + uint64_t rowCount = 0; + + if (parser.ColumnsCount() != 3 || parser.RowsCount() != 1) { + throw TYdbErrorException(TStatus(EStatus::GENERIC_ERROR, + {NYql::TIssue("The number of columns should be: 3.\nThe number of rows should be: 1")})); + } + + if (parser.TryNextRow()) { + sumApp = *parser.ColumnParser("column0").GetOptionalInt64(); + sumHost = *parser.ColumnParser("column1").GetOptionalInt64(); + rowCount = parser.ColumnParser("column2").GetUint64(); + } + + return {sumApp, sumHost, rowCount}; +} + +void DropTable(TTableClient& client, const std::string& path) { + ThrowOnError(client.RetryOperationSync([path](TSession session) { + return session.DropTable(path).ExtractValueSync(); + })); +} diff --git a/tests/integration/bulk_upsert_simple_it/bulk_upsert.h b/tests/integration/bulk_upsert_simple_it/bulk_upsert.h new file mode 100644 index 0000000000..3fcaf7d634 --- /dev/null +++ b/tests/integration/bulk_upsert_simple_it/bulk_upsert.h @@ -0,0 +1,48 @@ +#pragma once + +#include +#include +#include + +using namespace NYdb; +using namespace NYdb::NTable; + +struct TRunArgs { + TDriver Driver; + std::string Path; +}; + +struct TLogMessage { + struct TPrimaryKeyLogMessage { + std::string App; + std::string Host; + TInstant Timestamp; + uint64_t Id; + }; + + TPrimaryKeyLogMessage Pk; + uint32_t HttpCode; + std::string Message; +}; + +class TYdbErrorException : public yexception { +public: + TYdbErrorException(const NYdb::TStatus& status) + : Status(status) {} + + NYdb::TStatus Status; +}; + +struct TStatistic { + uint64_t SumApp; + uint64_t SumHost; + uint64_t RowCount; +}; + +TRunArgs GetRunArgs(); +TStatus CreateTable(TTableClient& client, const std::string& table); +TStatistic GetLogBatch(uint64_t logOffset, std::vector& logBatch, uint32_t lastNumber); +TStatus WriteLogBatch(TTableClient& tableClient, const std::string& table, const std::vector& logBatch, + const TRetryOperationSettings& retrySettings); +TStatistic Select(TTableClient& client, const std::string& path); +void DropTable(TTableClient& client, const std::string& path); diff --git a/tests/integration/bulk_upsert_simple_it/main.cpp b/tests/integration/bulk_upsert_simple_it/main.cpp new file mode 100644 index 0000000000..3fee4a942a --- /dev/null +++ b/tests/integration/bulk_upsert_simple_it/main.cpp @@ -0,0 +1,52 @@ +#include "bulk_upsert.h" + +#include + +TEST(Integration, BulkUpsert) { + + uint32_t correctSumApp = 0; + uint32_t correctSumHost = 0; + uint32_t correctRowCount = 0; + + auto [driver, path] = GetRunArgs(); + + TTableClient client(driver); + uint32_t count = 1000; + TStatus statusCreate = CreateTable(client, path); + if (!statusCreate.IsSuccess()) { + FAIL() << "Create table failed with status: " << statusCreate << std::endl; + } + + TRetryOperationSettings writeRetrySettings; + writeRetrySettings + .Idempotent(true) + .MaxRetries(20); + + std::vector logBatch; + for (uint32_t offset = 0; offset < count; ++offset) { + + auto [batchSumApp, batchSumHost, batchRowCount] = GetLogBatch(offset, logBatch, correctRowCount); + correctSumApp += batchSumApp; + correctSumHost += batchSumHost; + correctRowCount += batchRowCount; + + TStatus statusWrite = WriteLogBatch(client, path, logBatch, writeRetrySettings); + if (!statusWrite.IsSuccess()) { + FAIL() << "Write failed with status: " << statusWrite << std::endl; + } + } + + try { + auto [sumApp, sumHost, rowCount] = Select(client, path); + EXPECT_EQ(rowCount, correctRowCount); + EXPECT_EQ(sumApp, correctSumApp); + EXPECT_EQ(sumHost, correctSumHost); + } catch (const TYdbErrorException& e) { + driver.Stop(true); + FAIL() << "Execution failed due to fatal error:\nStatus: " << ToString(e.Status.GetStatus()) << std::endl << e.Status.GetIssues().ToString(); + } + + DropTable(client, path); + driver.Stop(true); + +}