Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: vhost-server aio backend was incorrectly batching ios #248

Merged
merged 1 commit into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cloud/blockstore/vhost-server/backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ struct IBackend: public IStartable

virtual vhd_bdev_info Init(const TOptions& options) = 0;
virtual void ProcessQueue(
ui32 queueIndex,
vhd_request_queue* queue,
TSimpleStats& queueStats) = 0;
virtual std::optional<TSimpleStats> GetCompletionStats(
Expand Down
37 changes: 24 additions & 13 deletions cloud/blockstore/vhost-server/backend_aio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class TAioBackend final: public IBackend
io_context_t Io = {};

ui32 BatchSize = 0;
TVector<iocb*> Batch;
TVector<TVector<iocb*>> Batches;

std::thread CompletionThread;

Expand All @@ -158,7 +158,10 @@ class TAioBackend final: public IBackend
vhd_bdev_info Init(const TOptions& options) override;
void Start() override;
void Stop() override;
void ProcessQueue(vhd_request_queue* queue, TSimpleStats& queueStats) override;
void ProcessQueue(
ui32 queueIndex,
vhd_request_queue* queue,
TSimpleStats& queueStats) override;
std::optional<TSimpleStats> GetCompletionStats(TDuration timeout) override;

private:
Expand Down Expand Up @@ -186,7 +189,10 @@ vhd_bdev_info TAioBackend::Init(const TOptions& options)

Y_ABORT_UNLESS(io_setup(BatchSize, &Io) >= 0, "io_setup");

Batch.reserve(BatchSize);
for (ui32 i = 0; i < options.QueueCount; i++) {
Batches.emplace_back();
Batches.back().reserve(BatchSize);
}

EOpenMode flags =
EOpenModeFlag::OpenExisting | EOpenModeFlag::DirectAligned |
Expand Down Expand Up @@ -283,22 +289,27 @@ void TAioBackend::Stop()
Devices.clear();
}

void TAioBackend::ProcessQueue(vhd_request_queue* queue, TSimpleStats& queueStats)
void TAioBackend::ProcessQueue(
ui32 queueIndex,
vhd_request_queue* queue,
TSimpleStats& queueStats)
{
int ret;

auto& batch = Batches[queueIndex];

for (;;) {
const TCpuCycles now = GetCycleCount();

// append new requests to the tail of the batch
queueStats.Dequeued += PrepareBatch(queue, Batch, now);
queueStats.Dequeued += PrepareBatch(queue, batch, now);

if (Batch.empty()) {
if (batch.empty()) {
break;
}

do {
ret = io_submit(Io, Batch.size(), Batch.data());
ret = io_submit(Io, batch.size(), batch.data());
} while (ret == -EINTR);

// kernel queue full, punt the re-submission to later event loop
Expand All @@ -314,16 +325,16 @@ void TAioBackend::ProcessQueue(vhd_request_queue* queue, TSimpleStats& queueStat

++queueStats.SubFailed;

if (Batch[0]->data) {
if (batch[0]->data) {
CompleteRequest(
Batch[0],
static_cast<TAioCompoundRequest*>(Batch[0]->data),
batch[0],
static_cast<TAioCompoundRequest*>(batch[0]->data),
VHD_BDEV_IOERR,
queueStats,
now);
} else {
CompleteRequest(
static_cast<TAioRequest*>(Batch[0]),
static_cast<TAioRequest*>(batch[0]),
VHD_BDEV_IOERR,
queueStats,
now);
Expand All @@ -337,10 +348,10 @@ void TAioBackend::ProcessQueue(vhd_request_queue* queue, TSimpleStats& queueStat
#ifndef NDEBUG
STORAGE_DEBUG(
"submitted " << ret << ": "
<< ToString(std::span(Batch.data(), ret)));
<< ToString(std::span(batch.data(), ret)));
#endif
// remove submitted items from the batch
Batch.erase(Batch.begin(), Batch.begin() + ret);
batch.erase(batch.begin(), batch.begin() + ret);
}
}

Expand Down
11 changes: 9 additions & 2 deletions cloud/blockstore/vhost-server/backend_null.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ class TNullBackend final: public IBackend
vhd_bdev_info Init(const TOptions& options) override;
void Start() override;
void Stop() override;
void ProcessQueue(vhd_request_queue* queue, TSimpleStats& queueStats) override;
void ProcessQueue(
ui32 queueIndex,
vhd_request_queue* queue,
TSimpleStats& queueStats) override;
std::optional<TSimpleStats> GetCompletionStats(TDuration timeout) override;
};

Expand Down Expand Up @@ -60,8 +63,12 @@ void TNullBackend::Start()
void TNullBackend::Stop()
{}

void TNullBackend::ProcessQueue(vhd_request_queue* queue, TSimpleStats& queueStats)
void TNullBackend::ProcessQueue(
ui32 queueIndex,
vhd_request_queue* queue,
TSimpleStats& queueStats)
{
Y_UNUSED(queueIndex);
Y_UNUSED(queueStats);

vhd_request req;
Expand Down
9 changes: 7 additions & 2 deletions cloud/blockstore/vhost-server/backend_rdma.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ class TRdmaBackend final: public IBackend
vhd_bdev_info Init(const TOptions& options) override;
void Start() override;
void Stop() override;
void ProcessQueue(vhd_request_queue* queue, TSimpleStats& queueStats)
override;
void ProcessQueue(
ui32 queueIndex,
vhd_request_queue* queue,
TSimpleStats& queueStats) override;
std::optional<TSimpleStats> GetCompletionStats(TDuration timeout) override;

private:
Expand Down Expand Up @@ -179,9 +181,12 @@ void TRdmaBackend::Stop()
}

void TRdmaBackend::ProcessQueue(
ui32 queueIndex,
vhd_request_queue* queue,
TSimpleStats& queueStats)
{
Y_UNUSED(queueIndex);

vhd_request req;
while (vhd_dequeue_request(queue, &req)) {
++queueStats.Dequeued;
Expand Down
2 changes: 1 addition & 1 deletion cloud/blockstore/vhost-server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ void TServer::QueueThreadFunc(ui32 queueIndex)
break;
}

Backend->ProcessQueue(queue, queueStats);
Backend->ProcessQueue(queueIndex, queue, queueStats);

SyncQueueStats(queueIndex, queueStats);
}
Expand Down
53 changes: 53 additions & 0 deletions cloud/blockstore/vhost-server/server_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <cloud/contrib/vhost/virtio/virtio_blk_spec.h>

#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/threading/future/subscription/wait_all.h>

#include <util/generic/size_literals.h>
#include <util/system/file.h>
Expand Down Expand Up @@ -386,4 +387,56 @@ Y_UNIT_TEST_SUITE(TServerTest)
}
}
}

// Submits requestCount one-sector VIRTIO_BLK_T_OUT requests, varying the first
// argument of Client.WriteAsync as i % QueueCount (presumably the index of the
// virtqueue to use -- confirm against the client helper), then checks the
// server stats and each request's status byte.
Y_UNIT_TEST_F(ShouldHandleMutlipleQueues, TFixture)
{
StartServer();

const ui32 requestCount = 10;
const ui64 sectorsPerChunk = ChunkByteCount / SectorSize;
const ui64 sectorCount = sectorsPerChunk * ChunkCount;

// Status buffers and completion futures, stored in request order so that
// statuses[i] and futures[i] refer to the same request.
TVector<std::span<char>> statuses;
TVector<NThreading::TFuture<ui32>> futures;

for (ui64 i = 0; i != requestCount; ++i) {
// Each request gets its own header, one-sector payload and one-byte
// status buffer; the target sector is wrapped to stay below sectorCount.
std::span hdr = Hdr(Memory, {
.type = VIRTIO_BLK_T_OUT,
.sector = i % sectorCount
});
std::span sector = Memory.Allocate(SectorSize);
std::span status = Memory.Allocate(1);

// Verify that both allocations returned buffers of the requested size.
UNIT_ASSERT_VALUES_EQUAL(SectorSize, sector.size());
UNIT_ASSERT_VALUES_EQUAL(1, status.size());

// Fill the payload with a per-request letter ('A' + i % 26).
memset(sector.data(), 'A' + i % 26, sector.size_bytes());

statuses.push_back(status);
futures.push_back(Client.WriteAsync(
i % QueueCount,
{ hdr, sector },
{ status }
));
}

// Block until every request's future is ready before reading the stats.
WaitAll(futures).Wait();

// prevStats is default-constructed here.
// NOTE(review): GetStats presumably reports counters relative to prevStats --
// confirm against TServer::GetStats.
TSimpleStats prevStats;
auto stats = Server->GetStats(prevStats);

// Every request must have been submitted and completed exactly once, with no
// submission or completion failures.
UNIT_ASSERT_VALUES_EQUAL(requestCount, stats.Submitted);
UNIT_ASSERT_VALUES_EQUAL(requestCount, stats.Completed);
UNIT_ASSERT_VALUES_EQUAL(0, stats.CompFailed);
UNIT_ASSERT_VALUES_EQUAL(0, stats.SubFailed);

for (ui32 i = 0; i != requestCount; ++i) {
// The future's value is compared with the size of the status buffer (1);
// presumably it is the number of bytes the server wrote back -- confirm.
const ui32 len = futures[i].GetValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(
statuses[i].size(), len,
sectorsPerChunk << " | " << i);
// A zero status byte is expected (presumably VIRTIO_BLK_S_OK -- confirm).
UNIT_ASSERT_VALUES_EQUAL(0, statuses[i][0]);
}
}
}
Loading