Skip to content

Commit

Permalink
fix: vhost-server aio backend was incorrectly batching ios (#248) (#251)
Browse files Browse the repository at this point in the history
After refactoring the `Batch` array of iocb was incorrectly shared between
different vhost queues. This lead to memory corruption which was not detected
since tests were running with only 1 vhost queue configured.

```
qemu ... -device vhost-user-blk-pci,chardev=vhost0,id=vhost-user-blk0,num-queues=1
```
  • Loading branch information
budevg committed Jan 26, 2024
1 parent 5a49fab commit 3ffd1e2
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 18 deletions.
1 change: 1 addition & 0 deletions cloud/blockstore/vhost-server/backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ struct IBackend: public IStartable

virtual vhd_bdev_info Init(const TOptions& options) = 0;
virtual void ProcessQueue(
ui32 queueIndex,
vhd_request_queue* queue,
TSimpleStats& queueStats) = 0;
virtual std::optional<TSimpleStats> GetCompletionStats(
Expand Down
37 changes: 24 additions & 13 deletions cloud/blockstore/vhost-server/backend_aio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class TAioBackend final: public IBackend
io_context_t Io = {};

ui32 BatchSize = 0;
TVector<iocb*> Batch;
TVector<TVector<iocb*>> Batches;

std::thread CompletionThread;

Expand All @@ -158,7 +158,10 @@ class TAioBackend final: public IBackend
vhd_bdev_info Init(const TOptions& options) override;
void Start() override;
void Stop() override;
void ProcessQueue(vhd_request_queue* queue, TSimpleStats& queueStats) override;
void ProcessQueue(
ui32 queueIndex,
vhd_request_queue* queue,
TSimpleStats& queueStats) override;
std::optional<TSimpleStats> GetCompletionStats(TDuration timeout) override;

private:
Expand Down Expand Up @@ -186,7 +189,10 @@ vhd_bdev_info TAioBackend::Init(const TOptions& options)

Y_ABORT_UNLESS(io_setup(BatchSize, &Io) >= 0, "io_setup");

Batch.reserve(BatchSize);
for (ui32 i = 0; i < options.QueueCount; i++) {
Batches.emplace_back();
Batches.back().reserve(BatchSize);
}

EOpenMode flags =
EOpenModeFlag::OpenExisting | EOpenModeFlag::DirectAligned |
Expand Down Expand Up @@ -283,22 +289,27 @@ void TAioBackend::Stop()
Devices.clear();
}

void TAioBackend::ProcessQueue(vhd_request_queue* queue, TSimpleStats& queueStats)
void TAioBackend::ProcessQueue(
ui32 queueIndex,
vhd_request_queue* queue,
TSimpleStats& queueStats)
{
int ret;

auto& batch = Batches[queueIndex];

for (;;) {
const TCpuCycles now = GetCycleCount();

// append new requests to the tail of the batch
queueStats.Dequeued += PrepareBatch(queue, Batch, now);
queueStats.Dequeued += PrepareBatch(queue, batch, now);

if (Batch.empty()) {
if (batch.empty()) {
break;
}

do {
ret = io_submit(Io, Batch.size(), Batch.data());
ret = io_submit(Io, batch.size(), batch.data());
} while (ret == -EINTR);

// kernel queue full, punt the re-submission to later event loop
Expand All @@ -314,16 +325,16 @@ void TAioBackend::ProcessQueue(vhd_request_queue* queue, TSimpleStats& queueStat

++queueStats.SubFailed;

if (Batch[0]->data) {
if (batch[0]->data) {
CompleteRequest(
Batch[0],
static_cast<TAioCompoundRequest*>(Batch[0]->data),
batch[0],
static_cast<TAioCompoundRequest*>(batch[0]->data),
VHD_BDEV_IOERR,
queueStats,
now);
} else {
CompleteRequest(
static_cast<TAioRequest*>(Batch[0]),
static_cast<TAioRequest*>(batch[0]),
VHD_BDEV_IOERR,
queueStats,
now);
Expand All @@ -337,10 +348,10 @@ void TAioBackend::ProcessQueue(vhd_request_queue* queue, TSimpleStats& queueStat
#ifndef NDEBUG
STORAGE_DEBUG(
"submitted " << ret << ": "
<< ToString(std::span(Batch.data(), ret)));
<< ToString(std::span(batch.data(), ret)));
#endif
// remove submitted items from the batch
Batch.erase(Batch.begin(), Batch.begin() + ret);
batch.erase(batch.begin(), batch.begin() + ret);
}
}

Expand Down
11 changes: 9 additions & 2 deletions cloud/blockstore/vhost-server/backend_null.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ class TNullBackend final: public IBackend
vhd_bdev_info Init(const TOptions& options) override;
void Start() override;
void Stop() override;
void ProcessQueue(vhd_request_queue* queue, TSimpleStats& queueStats) override;
void ProcessQueue(
ui32 queueIndex,
vhd_request_queue* queue,
TSimpleStats& queueStats) override;
std::optional<TSimpleStats> GetCompletionStats(TDuration timeout) override;
};

Expand Down Expand Up @@ -60,8 +63,12 @@ void TNullBackend::Start()
void TNullBackend::Stop()
{}

void TNullBackend::ProcessQueue(vhd_request_queue* queue, TSimpleStats& queueStats)
void TNullBackend::ProcessQueue(
ui32 queueIndex,
vhd_request_queue* queue,
TSimpleStats& queueStats)
{
Y_UNUSED(queueIndex);
Y_UNUSED(queueStats);

vhd_request req;
Expand Down
9 changes: 7 additions & 2 deletions cloud/blockstore/vhost-server/backend_rdma.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ class TRdmaBackend final: public IBackend
vhd_bdev_info Init(const TOptions& options) override;
void Start() override;
void Stop() override;
void ProcessQueue(vhd_request_queue* queue, TSimpleStats& queueStats)
override;
void ProcessQueue(
ui32 queueIndex,
vhd_request_queue* queue,
TSimpleStats& queueStats) override;
std::optional<TSimpleStats> GetCompletionStats(TDuration timeout) override;

private:
Expand Down Expand Up @@ -179,9 +181,12 @@ void TRdmaBackend::Stop()
}

void TRdmaBackend::ProcessQueue(
ui32 queueIndex,
vhd_request_queue* queue,
TSimpleStats& queueStats)
{
Y_UNUSED(queueIndex);

vhd_request req;
while (vhd_dequeue_request(queue, &req)) {
++queueStats.Dequeued;
Expand Down
2 changes: 1 addition & 1 deletion cloud/blockstore/vhost-server/server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ void TServer::QueueThreadFunc(ui32 queueIndex)
break;
}

Backend->ProcessQueue(queue, queueStats);
Backend->ProcessQueue(queueIndex, queue, queueStats);

SyncQueueStats(queueIndex, queueStats);
}
Expand Down
53 changes: 53 additions & 0 deletions cloud/blockstore/vhost-server/server_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <cloud/contrib/vhost/virtio/virtio_blk_spec.h>

#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/threading/future/subscription/wait_all.h>

#include <util/generic/size_literals.h>
#include <util/system/file.h>
Expand Down Expand Up @@ -386,4 +387,56 @@ Y_UNIT_TEST_SUITE(TServerTest)
}
}
}

Y_UNIT_TEST_F(ShouldHandleMutlipleQueues, TFixture)
{
StartServer();

const ui32 requestCount = 10;
const ui64 sectorsPerChunk = ChunkByteCount / SectorSize;
const ui64 sectorCount = sectorsPerChunk * ChunkCount;

TVector<std::span<char>> statuses;
TVector<NThreading::TFuture<ui32>> futures;

for (ui64 i = 0; i != requestCount; ++i) {
std::span hdr = Hdr(Memory, {
.type = VIRTIO_BLK_T_OUT,
.sector = i % sectorCount
});
std::span sector = Memory.Allocate(SectorSize);
std::span status = Memory.Allocate(1);

UNIT_ASSERT_VALUES_EQUAL(SectorSize, sector.size());
UNIT_ASSERT_VALUES_EQUAL(1, status.size());

memset(sector.data(), 'A' + i % 26, sector.size_bytes());

statuses.push_back(status);
futures.push_back(Client.WriteAsync(
i % QueueCount,
{ hdr, sector },
{ status }
));
}

WaitAll(futures).Wait();

TSimpleStats prevStats;
auto stats = Server->GetStats(prevStats);

UNIT_ASSERT_VALUES_EQUAL(requestCount, stats.Submitted);
UNIT_ASSERT_VALUES_EQUAL(requestCount, stats.Completed);
UNIT_ASSERT_VALUES_EQUAL(0, stats.CompFailed);
UNIT_ASSERT_VALUES_EQUAL(0, stats.SubFailed);

for (ui32 i = 0; i != requestCount; ++i) {
const ui32 len = futures[i].GetValueSync();

UNIT_ASSERT_VALUES_EQUAL_C(
statuses[i].size(), len,
sectorsPerChunk << " | " << i);
UNIT_ASSERT_VALUES_EQUAL(0, statuses[i][0]);
}
}
}

0 comments on commit 3ffd1e2

Please sign in to comment.