diff --git a/cloud/blockstore/vhost-server/backend.h b/cloud/blockstore/vhost-server/backend.h index 165c95801bc..2b339738d4a 100644 --- a/cloud/blockstore/vhost-server/backend.h +++ b/cloud/blockstore/vhost-server/backend.h @@ -16,6 +16,7 @@ struct IBackend: public IStartable virtual vhd_bdev_info Init(const TOptions& options) = 0; virtual void ProcessQueue( + ui32 queueIndex, vhd_request_queue* queue, TSimpleStats& queueStats) = 0; virtual std::optional GetCompletionStats( diff --git a/cloud/blockstore/vhost-server/backend_aio.cpp b/cloud/blockstore/vhost-server/backend_aio.cpp index b5174a244f2..07ff6f0317f 100644 --- a/cloud/blockstore/vhost-server/backend_aio.cpp +++ b/cloud/blockstore/vhost-server/backend_aio.cpp @@ -146,7 +146,7 @@ class TAioBackend final: public IBackend io_context_t Io = {}; ui32 BatchSize = 0; - TVector Batch; + TVector> Batches; std::thread CompletionThread; @@ -158,7 +158,10 @@ class TAioBackend final: public IBackend vhd_bdev_info Init(const TOptions& options) override; void Start() override; void Stop() override; - void ProcessQueue(vhd_request_queue* queue, TSimpleStats& queueStats) override; + void ProcessQueue( + ui32 queueIndex, + vhd_request_queue* queue, + TSimpleStats& queueStats) override; std::optional GetCompletionStats(TDuration timeout) override; private: @@ -186,7 +189,10 @@ vhd_bdev_info TAioBackend::Init(const TOptions& options) Y_ABORT_UNLESS(io_setup(BatchSize, &Io) >= 0, "io_setup"); - Batch.reserve(BatchSize); + for (ui32 i = 0; i < options.QueueCount; i++) { + Batches.emplace_back(); + Batches.back().reserve(BatchSize); + } EOpenMode flags = EOpenModeFlag::OpenExisting | EOpenModeFlag::DirectAligned | @@ -283,22 +289,27 @@ void TAioBackend::Stop() Devices.clear(); } -void TAioBackend::ProcessQueue(vhd_request_queue* queue, TSimpleStats& queueStats) +void TAioBackend::ProcessQueue( + ui32 queueIndex, + vhd_request_queue* queue, + TSimpleStats& queueStats) { int ret; + auto& batch = Batches[queueIndex]; + for (;;) { const TCpuCycles now = GetCycleCount(); // append new requests to the tail of the batch - queueStats.Dequeued += PrepareBatch(queue, Batch, now); + queueStats.Dequeued += PrepareBatch(queue, batch, now); - if (Batch.empty()) { + if (batch.empty()) { break; } do { - ret = io_submit(Io, Batch.size(), Batch.data()); + ret = io_submit(Io, batch.size(), batch.data()); } while (ret == -EINTR); // kernel queue full, punt the re-submission to later event loop @@ -314,16 +325,16 @@ void TAioBackend::ProcessQueue(vhd_request_queue* queue, TSimpleStats& queueStat ++queueStats.SubFailed; - if (Batch[0]->data) { + if (batch[0]->data) { CompleteRequest( - Batch[0], - static_cast(Batch[0]->data), + batch[0], + static_cast(batch[0]->data), VHD_BDEV_IOERR, queueStats, now); } else { CompleteRequest( - static_cast(Batch[0]), + static_cast(batch[0]), VHD_BDEV_IOERR, queueStats, now); @@ -337,10 +348,10 @@ void TAioBackend::ProcessQueue(vhd_request_queue* queue, TSimpleStats& queueStat #ifndef NDEBUG STORAGE_DEBUG( "submitted " << ret << ": " - << ToString(std::span(Batch.data(), ret))); + << ToString(std::span(batch.data(), ret))); #endif // remove submitted items from the batch - Batch.erase(Batch.begin(), Batch.begin() + ret); + batch.erase(batch.begin(), batch.begin() + ret); } } diff --git a/cloud/blockstore/vhost-server/backend_null.cpp b/cloud/blockstore/vhost-server/backend_null.cpp index 6134113e4ba..2ba21e17907 100644 --- a/cloud/blockstore/vhost-server/backend_null.cpp +++ b/cloud/blockstore/vhost-server/backend_null.cpp @@ -23,7 +23,10 @@ class TNullBackend final: public IBackend vhd_bdev_info Init(const TOptions& options) override; void Start() override; void Stop() override; - void ProcessQueue(vhd_request_queue* queue, TSimpleStats& queueStats) override; + void ProcessQueue( + ui32 queueIndex, + vhd_request_queue* queue, + TSimpleStats& queueStats) override; std::optional GetCompletionStats(TDuration timeout) override; }; @@ -60,8 +63,12 @@ void TNullBackend::Start() void TNullBackend::Stop() {} -void TNullBackend::ProcessQueue(vhd_request_queue* queue, TSimpleStats& queueStats) +void TNullBackend::ProcessQueue( + ui32 queueIndex, + vhd_request_queue* queue, + TSimpleStats& queueStats) { + Y_UNUSED(queueIndex); Y_UNUSED(queueStats); vhd_request req; diff --git a/cloud/blockstore/vhost-server/backend_rdma.cpp b/cloud/blockstore/vhost-server/backend_rdma.cpp index 63b18559a3b..dbb8d082738 100644 --- a/cloud/blockstore/vhost-server/backend_rdma.cpp +++ b/cloud/blockstore/vhost-server/backend_rdma.cpp @@ -49,8 +49,10 @@ class TRdmaBackend final: public IBackend vhd_bdev_info Init(const TOptions& options) override; void Start() override; void Stop() override; - void ProcessQueue(vhd_request_queue* queue, TSimpleStats& queueStats) - override; + void ProcessQueue( + ui32 queueIndex, + vhd_request_queue* queue, + TSimpleStats& queueStats) override; std::optional GetCompletionStats(TDuration timeout) override; private: @@ -179,9 +181,12 @@ void TRdmaBackend::Stop() } void TRdmaBackend::ProcessQueue( + ui32 queueIndex, vhd_request_queue* queue, TSimpleStats& queueStats) { + Y_UNUSED(queueIndex); + vhd_request req; while (vhd_dequeue_request(queue, &req)) { ++queueStats.Dequeued; diff --git a/cloud/blockstore/vhost-server/server.cpp b/cloud/blockstore/vhost-server/server.cpp index a233a90f8e7..100eec40dc6 100644 --- a/cloud/blockstore/vhost-server/server.cpp +++ b/cloud/blockstore/vhost-server/server.cpp @@ -251,7 +251,7 @@ void TServer::QueueThreadFunc(ui32 queueIndex) break; } - Backend->ProcessQueue(queue, queueStats); + Backend->ProcessQueue(queueIndex, queue, queueStats); SyncQueueStats(queueIndex, queueStats); } diff --git a/cloud/blockstore/vhost-server/server_ut.cpp b/cloud/blockstore/vhost-server/server_ut.cpp index cb0a8525964..9d472a8a49a 100644 --- a/cloud/blockstore/vhost-server/server_ut.cpp +++ b/cloud/blockstore/vhost-server/server_ut.cpp @@ -9,6 +9,7 @@ #include #include +#include #include #include @@ -386,4 +387,56 @@ Y_UNIT_TEST_SUITE(TServerTest) } } } + + Y_UNIT_TEST_F(ShouldHandleMutlipleQueues, TFixture) + { + StartServer(); + + const ui32 requestCount = 10; + const ui64 sectorsPerChunk = ChunkByteCount / SectorSize; + const ui64 sectorCount = sectorsPerChunk * ChunkCount; + + TVector> statuses; + TVector> futures; + + for (ui64 i = 0; i != requestCount; ++i) { + std::span hdr = Hdr(Memory, { + .type = VIRTIO_BLK_T_OUT, + .sector = i % sectorCount + }); + std::span sector = Memory.Allocate(SectorSize); + std::span status = Memory.Allocate(1); + + UNIT_ASSERT_VALUES_EQUAL(SectorSize, sector.size()); + UNIT_ASSERT_VALUES_EQUAL(1, status.size()); + + memset(sector.data(), 'A' + i % 26, sector.size_bytes()); + + statuses.push_back(status); + futures.push_back(Client.WriteAsync( + i % QueueCount, + { hdr, sector }, + { status } + )); + } + + WaitAll(futures).Wait(); + + TSimpleStats prevStats; + auto stats = Server->GetStats(prevStats); + + UNIT_ASSERT_VALUES_EQUAL(requestCount, stats.Submitted); + UNIT_ASSERT_VALUES_EQUAL(requestCount, stats.Completed); + UNIT_ASSERT_VALUES_EQUAL(0, stats.CompFailed); + UNIT_ASSERT_VALUES_EQUAL(0, stats.SubFailed); + + for (ui32 i = 0; i != requestCount; ++i) { + const ui32 len = futures[i].GetValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C( + statuses[i].size(), len, + sectorsPerChunk << " | " << i); + UNIT_ASSERT_VALUES_EQUAL(0, statuses[i][0]); + } + } }