Skip to content

Commit

Permalink
issue-1541: implemented a ring buffer over a file to use it in Handle…
Browse files Browse the repository at this point in the history
…OpsQueue (#2047)

* issue-1541: ring buffer over a file

* issue-1541: TFileRingBufferTest::Should{Restore,Validate} + fixes

* issue-1541: randomized test for TFileRingBuffer + fixes

* issue-1541: moved TFileRingBuffer implementation to .cpp

* issue-1541: minor rename
  • Loading branch information
qkrorlqr authored Sep 19, 2024
1 parent fdf1cd3 commit cd838f8
Show file tree
Hide file tree
Showing 7 changed files with 704 additions and 14 deletions.
312 changes: 312 additions & 0 deletions cloud/filestore/libs/vfs_fuse/file_ring_buffer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
#include "file_ring_buffer.h"

#include <library/cpp/digest/crc32c/crc32c.h>

#include <util/generic/size_literals.h>
#include <util/stream/mem.h>
#include <util/system/filemap.h>

#include <functional>

namespace NCloud::NFileStore {

namespace {

////////////////////////////////////////////////////////////////////////////////

constexpr ui32 VERSION = 1;
constexpr ui32 INVALID_POS = Max<ui32>();
constexpr TStringBuf INVALID_MARKER = "invalid_entry_marker";

////////////////////////////////////////////////////////////////////////////////

struct THeader
{
ui32 Version = 0;
ui32 Capacity = 0;
ui32 ReadPos = 0;
ui32 WritePos = 0;
};

struct TEntryHeader
{
ui32 Size = 0;
ui32 Checksum = 0;
};

void WriteEntry(IOutputStream& os, TStringBuf data)
{
TEntryHeader eh;
eh.Size = data.Size();
eh.Checksum = Crc32c(data.Data(), data.Size());
os.Write(&eh, sizeof(eh));
os.Write(data);
}

} // namespace

////////////////////////////////////////////////////////////////////////////////

class TFileRingBuffer::TImpl
{
private:
TFileMap Map;
const ui32 MaxEntrySize;

char* Data = nullptr;
ui32 Count = 0;
const char* End = nullptr;

private:
THeader* Header()
{
return reinterpret_cast<THeader*>(Map.Ptr());
}

const THeader* Header() const
{
return reinterpret_cast<THeader*>(Map.Ptr());
}

void SkipSlackSpace()
{
if (Header()->ReadPos == Header()->WritePos) {
Header()->ReadPos = 0;
Header()->WritePos = 0;
return;
}

const auto* b = Data + Header()->ReadPos;
const auto* eh = reinterpret_cast<const TEntryHeader*>(b);
if (eh->Size == 0) {
Header()->ReadPos = 0;
}
}

using TVisitor = std::function<void(ui32 checksum, TStringBuf entry)>;

ui32 VisitEntry(const TVisitor& visitor, ui32 pos) const
{
const auto* b = Data + pos;
if (b + sizeof(TEntryHeader) > End) {
visitor(0, INVALID_MARKER);
return INVALID_POS;
}

const auto* eh = reinterpret_cast<const TEntryHeader*>(b);
if (eh->Size == 0) {
return 0;
}

TStringBuf entry(b + sizeof(TEntryHeader), eh->Size);
if (entry.Data() + entry.Size() > End) {
visitor(eh->Checksum, INVALID_MARKER);
return INVALID_POS;
}
visitor(eh->Checksum, {b + sizeof(TEntryHeader), eh->Size});
return pos + sizeof(TEntryHeader) + eh->Size;
}

void Visit(const TVisitor& visitor) const
{
ui32 pos = Header()->ReadPos;
while (pos > Header()->WritePos && pos != INVALID_POS) {
pos = VisitEntry(visitor, pos);
}

while (pos < Header()->WritePos && pos != INVALID_POS) {
pos = VisitEntry(visitor, pos);
if (!pos) {
// can happen if the buffer is corrupted
break;
}
}
}

public:
TImpl(const TString& filePath, ui32 capacity, ui32 maxEntrySize)
: Map(filePath, TMemoryMapCommon::oRdWr)
, MaxEntrySize(maxEntrySize)
{
Y_ABORT_UNLESS(MaxEntrySize + sizeof(TEntryHeader) <= capacity);

const ui32 realSize = sizeof(THeader) + capacity;
if (Map.Length() < realSize) {
Map.ResizeAndRemap(0, realSize);
} else {
Map.Map(0, realSize);
}

if (Header()->Version) {
Y_ABORT_UNLESS(Header()->Version == VERSION);
Y_ABORT_UNLESS(Header()->Capacity == capacity);
} else {
Header()->Capacity = capacity;
Header()->Version = VERSION;
}

Data = static_cast<char*>(Map.Ptr()) + sizeof(THeader);
End = Data + capacity;

SkipSlackSpace();
Visit([this] (ui32 checksum, TStringBuf entry) {
Y_UNUSED(checksum);
Y_UNUSED(entry);
++Count;
});
}

public:
bool Push(TStringBuf data)
{
if (data.Empty() || data.Size() > MaxEntrySize) {
return false;
}

const auto sz = data.Size() + sizeof(TEntryHeader);
auto* ptr = Data + Header()->WritePos;

if (!Empty()) {
// checking that we have a contiguous chunk of sz + 1 bytes
// 1 extra byte is needed to distinguish between an empty buffer
// and a buffer which is completely full
if (Header()->ReadPos < Header()->WritePos) {
// we have a single contiguous occupied region
ui32 freeSpace = Header()->Capacity - Header()->WritePos;
if (freeSpace <= sz) {
if (Header()->ReadPos <= sz) {
// out of space
return false;
}

memset(ptr, 0, freeSpace);
ptr = Data;
}
} else {
// we have two occupied regions
ui32 freeSpace = Header()->ReadPos - Header()->WritePos;
if (freeSpace <= sz) {
// out of space
return false;
}
}
}

TMemoryOutput mo(ptr, sz);
WriteEntry(mo, data);

Header()->WritePos = ptr - Data + sz;
++Count;

return true;
}

TStringBuf Front() const
{
if (Empty()) {
return {};
}

const auto* b = Data + Header()->ReadPos;
if (b + sizeof(TEntryHeader) > End) {
// corruption
// TODO: report?
return {};
}

const auto* eh = reinterpret_cast<const TEntryHeader*>(b);
TStringBuf result{b + sizeof(TEntryHeader), eh->Size};
if (result.Data() + result.Size() > End) {
// corruption
// TODO: report?
return {};
}

return result;
}

void Pop()
{
auto data = Front();
if (!data) {
return;
}

Header()->ReadPos += sizeof(TEntryHeader) + data.Size();
--Count;

SkipSlackSpace();
}

ui32 Size() const
{
return Count;
}

bool Empty() const
{
const bool result = Header()->ReadPos == Header()->WritePos;
Y_DEBUG_ABORT_UNLESS(result == (Count == 0));
return result;
}

auto Validate() const
{
TVector<TBrokenFileRingBufferEntry> entries;

Visit([&] (ui32 checksum, TStringBuf entry) {
const ui32 actualChecksum = Crc32c(entry.Data(), entry.Size());
if (actualChecksum != checksum) {
entries.push_back({
TString(entry),
checksum,
actualChecksum});
}
});

return entries;
}
};

////////////////////////////////////////////////////////////////////////////////

TFileRingBuffer::TFileRingBuffer(
const TString& filePath,
ui32 capacity,
ui32 maxEntrySize)
: Impl(new TImpl(filePath, capacity, maxEntrySize))
{}

TFileRingBuffer::~TFileRingBuffer() = default;

bool TFileRingBuffer::Push(TStringBuf data)
{
return Impl->Push(data);
}

TStringBuf TFileRingBuffer::Front() const
{
return Impl->Front();
}

void TFileRingBuffer::Pop()
{
Impl->Pop();
}

ui32 TFileRingBuffer::Size() const
{
return Impl->Size();
}

bool TFileRingBuffer::Empty() const
{
return Impl->Empty();
}

TVector<TBrokenFileRingBufferEntry> TFileRingBuffer::Validate() const
{
return Impl->Validate();
}

} // namespace NCloud::NFileStore
36 changes: 36 additions & 0 deletions cloud/filestore/libs/vfs_fuse/file_ring_buffer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#pragma once

#include <util/generic/string.h>
#include <util/generic/vector.h>

namespace NCloud::NFileStore {

////////////////////////////////////////////////////////////////////////////////

struct TBrokenFileRingBufferEntry
{
TString Data;
ui32 ExpectedChecksum = 0;
ui32 ActualChecksum = 0;
};

class TFileRingBuffer
{
private:
class TImpl;
std::unique_ptr<TImpl> Impl;

public:
TFileRingBuffer(const TString& filePath, ui32 capacity, ui32 maxEntrySize);
~TFileRingBuffer();

public:
bool Push(TStringBuf data);
TStringBuf Front() const;
void Pop();
ui32 Size() const;
bool Empty() const;
TVector<TBrokenFileRingBufferEntry> Validate() const;
};

} // namespace NCloud::NFileStore
Loading

0 comments on commit cd838f8

Please sign in to comment.