From cd838f87416a1c78b929be11cb22a19e64cd3f0c Mon Sep 17 00:00:00 2001 From: Andrei Strelkovskii Date: Thu, 19 Sep 2024 19:38:23 +0300 Subject: [PATCH] issue-1541: implemented a ring buffer over a file to use it in HandleOpsQueue (#2047) * issue-1541: ring buffer over a file * issue-1541: TFileRingBufferTest::Should{Restore,Validate} + fixes * issue-1541: randomized test for TFileRingBuffer + fixes * issue-1541: moved TFileRingBuffer implementation to .cpp * issue-1541: minor rename --- .../libs/vfs_fuse/file_ring_buffer.cpp | 312 ++++++++++++++++ .../libs/vfs_fuse/file_ring_buffer.h | 36 ++ .../libs/vfs_fuse/file_ring_buffer_ut.cpp | 338 ++++++++++++++++++ .../libs/vfs_fuse/handle_ops_queue.cpp | 16 +- .../libs/vfs_fuse/handle_ops_queue.h | 14 +- cloud/filestore/libs/vfs_fuse/ut/ya.make | 1 + cloud/filestore/libs/vfs_fuse/ya.make.inc | 1 + 7 files changed, 704 insertions(+), 14 deletions(-) create mode 100644 cloud/filestore/libs/vfs_fuse/file_ring_buffer.cpp create mode 100644 cloud/filestore/libs/vfs_fuse/file_ring_buffer.h create mode 100644 cloud/filestore/libs/vfs_fuse/file_ring_buffer_ut.cpp diff --git a/cloud/filestore/libs/vfs_fuse/file_ring_buffer.cpp b/cloud/filestore/libs/vfs_fuse/file_ring_buffer.cpp new file mode 100644 index 00000000000..25282617f9b --- /dev/null +++ b/cloud/filestore/libs/vfs_fuse/file_ring_buffer.cpp @@ -0,0 +1,312 @@ +#include "file_ring_buffer.h" + +#include + +#include +#include +#include + +#include + +namespace NCloud::NFileStore { + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +constexpr ui32 VERSION = 1; +constexpr ui32 INVALID_POS = Max(); +constexpr TStringBuf INVALID_MARKER = "invalid_entry_marker"; + +//////////////////////////////////////////////////////////////////////////////// + +struct THeader +{ + ui32 Version = 0; + ui32 Capacity = 0; + ui32 ReadPos = 0; + ui32 WritePos = 0; +}; + +struct TEntryHeader +{ + ui32 Size = 0; + ui32 Checksum = 0; +}; + +void WriteEntry(IOutputStream& os, TStringBuf data) +{ + TEntryHeader eh; + eh.Size = data.Size(); + eh.Checksum = Crc32c(data.Data(), data.Size()); + os.Write(&eh, sizeof(eh)); + os.Write(data); +} + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + +class TFileRingBuffer::TImpl +{ +private: + TFileMap Map; + const ui32 MaxEntrySize; + + char* Data = nullptr; + ui32 Count = 0; + const char* End = nullptr; + +private: + THeader* Header() + { + return reinterpret_cast(Map.Ptr()); + } + + const THeader* Header() const + { + return reinterpret_cast(Map.Ptr()); + } + + void SkipSlackSpace() + { + if (Header()->ReadPos == Header()->WritePos) { + Header()->ReadPos = 0; + Header()->WritePos = 0; + return; + } + + const auto* b = Data + Header()->ReadPos; + const auto* eh = reinterpret_cast(b); + if (eh->Size == 0) { + Header()->ReadPos = 0; + } + } + + using TVisitor = std::function; + + ui32 VisitEntry(const TVisitor& visitor, ui32 pos) const + { + const auto* b = Data + pos; + if (b + sizeof(TEntryHeader) > End) { + visitor(0, INVALID_MARKER); + return INVALID_POS; + } + + const auto* eh = reinterpret_cast(b); + if (eh->Size == 0) { + return 0; + } + + TStringBuf entry(b + sizeof(TEntryHeader), eh->Size); + if (entry.Data() + entry.Size() > End) { + visitor(eh->Checksum, INVALID_MARKER); + return INVALID_POS; + } + visitor(eh->Checksum, {b + sizeof(TEntryHeader), eh->Size}); + return pos + sizeof(TEntryHeader) + eh->Size; + } + + void Visit(const TVisitor& visitor) const + { + ui32 pos = Header()->ReadPos; + while (pos > Header()->WritePos && pos != INVALID_POS) { + pos = VisitEntry(visitor, pos); + } + + while (pos < Header()->WritePos && pos != INVALID_POS) { + pos = VisitEntry(visitor, pos); + if (!pos) { + // can happen if the buffer is corrupted + break; + } + } + } + +public: + TImpl(const TString& filePath, ui32 capacity, ui32 maxEntrySize) + : Map(filePath, TMemoryMapCommon::oRdWr) + , MaxEntrySize(maxEntrySize) + { + Y_ABORT_UNLESS(MaxEntrySize + sizeof(TEntryHeader) <= capacity); + + const ui32 realSize = sizeof(THeader) + capacity; + if (Map.Length() < realSize) { + Map.ResizeAndRemap(0, realSize); + } else { + Map.Map(0, realSize); + } + + if (Header()->Version) { + Y_ABORT_UNLESS(Header()->Version == VERSION); + Y_ABORT_UNLESS(Header()->Capacity == capacity); + } else { + Header()->Capacity = capacity; + Header()->Version = VERSION; + } + + Data = static_cast(Map.Ptr()) + sizeof(THeader); + End = Data + capacity; + + SkipSlackSpace(); + Visit([this] (ui32 checksum, TStringBuf entry) { + Y_UNUSED(checksum); + Y_UNUSED(entry); + ++Count; + }); + } + +public: + bool Push(TStringBuf data) + { + if (data.Empty() || data.Size() > MaxEntrySize) { + return false; + } + + const auto sz = data.Size() + sizeof(TEntryHeader); + auto* ptr = Data + Header()->WritePos; + + if (!Empty()) { + // checking that we have a contiguous chunk of sz + 1 bytes + // 1 extra byte is needed to distinguish between an empty buffer + // and a buffer which is completely full + if (Header()->ReadPos < Header()->WritePos) { + // we have a single contiguous occupied region + ui32 freeSpace = Header()->Capacity - Header()->WritePos; + if (freeSpace <= sz) { + if (Header()->ReadPos <= sz) { + // out of space + return false; + } + + memset(ptr, 0, freeSpace); + ptr = Data; + } + } else { + // we have two occupied regions + ui32 freeSpace = Header()->ReadPos - Header()->WritePos; + if (freeSpace <= sz) { + // out of space + return false; + } + } + } + + TMemoryOutput mo(ptr, sz); + WriteEntry(mo, data); + + Header()->WritePos = ptr - Data + sz; + ++Count; + + return true; + } + + TStringBuf Front() const + { + if (Empty()) { + return {}; + } + + const auto* b = Data + Header()->ReadPos; + if (b + sizeof(TEntryHeader) > End) { + // corruption + // TODO: report? + return {}; + } + + const auto* eh = reinterpret_cast(b); + TStringBuf result{b + sizeof(TEntryHeader), eh->Size}; + if (result.Data() + result.Size() > End) { + // corruption + // TODO: report? + return {}; + } + + return result; + } + + void Pop() + { + auto data = Front(); + if (!data) { + return; + } + + Header()->ReadPos += sizeof(TEntryHeader) + data.Size(); + --Count; + + SkipSlackSpace(); + } + + ui32 Size() const + { + return Count; + } + + bool Empty() const + { + const bool result = Header()->ReadPos == Header()->WritePos; + Y_DEBUG_ABORT_UNLESS(result == (Count == 0)); + return result; + } + + auto Validate() const + { + TVector entries; + + Visit([&] (ui32 checksum, TStringBuf entry) { + const ui32 actualChecksum = Crc32c(entry.Data(), entry.Size()); + if (actualChecksum != checksum) { + entries.push_back({ + TString(entry), + checksum, + actualChecksum}); + } + }); + + return entries; + } +}; + +//////////////////////////////////////////////////////////////////////////////// + +TFileRingBuffer::TFileRingBuffer( + const TString& filePath, + ui32 capacity, + ui32 maxEntrySize) + : Impl(new TImpl(filePath, capacity, maxEntrySize)) +{} + +TFileRingBuffer::~TFileRingBuffer() = default; + +bool TFileRingBuffer::Push(TStringBuf data) +{ + return Impl->Push(data); +} + +TStringBuf TFileRingBuffer::Front() const +{ + return Impl->Front(); +} + +void TFileRingBuffer::Pop() +{ + Impl->Pop(); +} + +ui32 TFileRingBuffer::Size() const +{ + return Impl->Size(); +} + +bool TFileRingBuffer::Empty() const +{ + return Impl->Empty(); +} + +TVector TFileRingBuffer::Validate() const +{ + return Impl->Validate(); +} + +} // namespace NCloud::NFileStore diff --git a/cloud/filestore/libs/vfs_fuse/file_ring_buffer.h b/cloud/filestore/libs/vfs_fuse/file_ring_buffer.h new file mode 100644 index 00000000000..8da9fa935ae --- /dev/null +++ b/cloud/filestore/libs/vfs_fuse/file_ring_buffer.h @@ -0,0 +1,36 @@ +#pragma once + +#include +#include + +namespace NCloud::NFileStore { + +//////////////////////////////////////////////////////////////////////////////// + +struct TBrokenFileRingBufferEntry +{ + TString Data; + ui32 ExpectedChecksum = 0; + ui32 ActualChecksum = 0; +}; + +class TFileRingBuffer +{ +private: + class TImpl; + std::unique_ptr Impl; + +public: + TFileRingBuffer(const TString& filePath, ui32 capacity, ui32 maxEntrySize); + ~TFileRingBuffer(); + +public: + bool Push(TStringBuf data); + TStringBuf Front() const; + void Pop(); + ui32 Size() const; + bool Empty() const; + TVector Validate() const; +}; + +} // namespace NCloud::NFileStore diff --git a/cloud/filestore/libs/vfs_fuse/file_ring_buffer_ut.cpp b/cloud/filestore/libs/vfs_fuse/file_ring_buffer_ut.cpp new file mode 100644 index 00000000000..7e789467bc8 --- /dev/null +++ b/cloud/filestore/libs/vfs_fuse/file_ring_buffer_ut.cpp @@ -0,0 +1,338 @@ +#include "file_ring_buffer.h" + +#include + +#include +#include +#include +#include +#include + +namespace NCloud::NFileStore { + +namespace { + +//////////////////////////////////////////////////////////////////////////////// + +TString Dump(const TVector& entries) +{ + TStringBuilder sb; + + for (ui32 i = 0; i < entries.size(); ++i) { + if (i) { + sb << ", "; + } + + sb << "data=" << entries[i].Data + << " ecsum=" << entries[i].ExpectedChecksum + << " csum=" << entries[i].ActualChecksum; + } + + return sb; +} + +TString PopAll(TFileRingBuffer& rb) +{ + TStringBuilder sb; + + while (!rb.Empty()) { + if (sb.Size()) { + sb << ", "; + } + + sb << rb.Front(); + rb.Pop(); + } + + return sb; +} + +//////////////////////////////////////////////////////////////////////////////// + +struct TReferenceImplementation +{ + static constexpr ui32 EntryOverhead = 8; + + const ui32 MaxWeight; + const ui32 MaxEntrySize; + + TDeque Q; + ui32 ReadPos = 0; + ui32 WritePos = 0; + ui32 SlackSpace = 0; + + explicit TReferenceImplementation(ui32 maxWeight, ui32 maxEntrySize) + : MaxWeight(maxWeight) + , MaxEntrySize(maxEntrySize) + {} + + bool Push(TStringBuf data) + { + if (data.Empty() || data.Size() > MaxEntrySize) { + return false; + } + + const ui32 sz = EntryOverhead + data.Size(); + + if (!Empty()) { + if (ReadPos < WritePos) { + const auto avail = MaxWeight - WritePos; + if (avail <= sz) { + if (ReadPos <= sz) { + // out of space + return false; + } + + SlackSpace = avail; + WritePos = 0; + } + } else { + const auto avail = ReadPos - WritePos; + if (avail <= sz) { + // out of space + return false; + } + } + } + + WritePos += sz; + Q.emplace_back(data); + return true; + } + + TStringBuf Front() const + { + if (!Q) { + return {}; + } + + return Q.front(); + } + + void Pop() + { + if (!Q) { + return; + } + + const ui32 sz = Q.front().Size() + EntryOverhead; + ReadPos += sz; + if (MaxWeight - ReadPos <= SlackSpace) { + UNIT_ASSERT_VALUES_EQUAL(SlackSpace, MaxWeight - ReadPos); + if (ReadPos == WritePos) { + WritePos = 0; + } + ReadPos = 0; + SlackSpace = 0; + } + + Q.pop_front(); + } + + bool Empty() const + { + return Q.empty(); + } + + ui32 Size() const + { + return Q.size(); + } + + auto Validate() const + { + return TVector(); + } +}; + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + +Y_UNIT_TEST_SUITE(TFileRingBufferTest) +{ + template + void DoTestShouldPushPop(TRingBuffer& rb) + { + UNIT_ASSERT_VALUES_EQUAL(0, rb.Size()); + UNIT_ASSERT(rb.Empty()); + + UNIT_ASSERT(!rb.Push("longvasya11")); // too long + UNIT_ASSERT(!rb.Push("")); // empty + UNIT_ASSERT(rb.Push("vasya")); + UNIT_ASSERT(rb.Push("petya")); + UNIT_ASSERT(rb.Push("vasya2")); + UNIT_ASSERT(rb.Push("petya2")); + UNIT_ASSERT(!rb.Push("vasya3")); // out of space + + UNIT_ASSERT_VALUES_EQUAL("", Dump(rb.Validate())); + UNIT_ASSERT_VALUES_EQUAL(4, rb.Size()); + UNIT_ASSERT_VALUES_EQUAL("vasya", rb.Front()); + rb.Pop(); + + UNIT_ASSERT_VALUES_EQUAL("", Dump(rb.Validate())); + UNIT_ASSERT_VALUES_EQUAL(3, rb.Size()); + UNIT_ASSERT(!rb.Push("vasya3")); + + UNIT_ASSERT_VALUES_EQUAL("petya", rb.Front()); + rb.Pop(); + + UNIT_ASSERT_VALUES_EQUAL("", Dump(rb.Validate())); + UNIT_ASSERT_VALUES_EQUAL(2, rb.Size()); + UNIT_ASSERT(rb.Push("vasya3")); + + UNIT_ASSERT_VALUES_EQUAL("", Dump(rb.Validate())); + UNIT_ASSERT_VALUES_EQUAL(3, rb.Size()); + UNIT_ASSERT_VALUES_EQUAL("vasya2", rb.Front()); + rb.Pop(); + + UNIT_ASSERT_VALUES_EQUAL("", Dump(rb.Validate())); + UNIT_ASSERT_VALUES_EQUAL(2, rb.Size()); + UNIT_ASSERT_VALUES_EQUAL("petya2", rb.Front()); + rb.Pop(); + + UNIT_ASSERT_VALUES_EQUAL("", Dump(rb.Validate())); + UNIT_ASSERT_VALUES_EQUAL(1, rb.Size()); + UNIT_ASSERT_VALUES_EQUAL("vasya3", rb.Front()); + rb.Pop(); + + UNIT_ASSERT_VALUES_EQUAL("", Dump(rb.Validate())); + UNIT_ASSERT_VALUES_EQUAL(0, rb.Size()); + UNIT_ASSERT(rb.Empty()); + } + + Y_UNIT_TEST(ShouldPushPop) + { + const auto f = TTempFileHandle(); + const ui32 len = 64; + const ui32 maxEntrySize = 10; + TFileRingBuffer rb(f.GetName(), len, maxEntrySize); + + DoTestShouldPushPop(rb); + } + + Y_UNIT_TEST(ShouldPushPopReferenceImplementation) + { + const ui32 len = 64; + const ui32 maxEntrySize = 10; + TReferenceImplementation rb(len, maxEntrySize); + + DoTestShouldPushPop(rb); + } + + Y_UNIT_TEST(ShouldRestore) + { + const auto f = TTempFileHandle(); + const ui32 len = 64; + const ui32 maxEntrySize = 10; + auto rb = std::make_unique( + f.GetName(), + len, + maxEntrySize); + + UNIT_ASSERT(rb->Push("vasya")); + UNIT_ASSERT(rb->Push("petya")); + UNIT_ASSERT(rb->Push("vasya2")); + UNIT_ASSERT(rb->Push("petya2")); + rb->Pop(); + rb->Pop(); + UNIT_ASSERT(rb->Push("vasya3")); + UNIT_ASSERT(rb->Push("xxx")); + + rb = std::make_unique( + f.GetName(), + len, + maxEntrySize); + + UNIT_ASSERT_VALUES_EQUAL("", Dump(rb->Validate())); + UNIT_ASSERT_VALUES_EQUAL(4, rb->Size()); + + UNIT_ASSERT_VALUES_EQUAL("vasya2, petya2, vasya3, xxx", PopAll(*rb)); + } + + Y_UNIT_TEST(ShouldValidate) + { + const auto f = TTempFileHandle(); + const ui32 len = 64; + const ui32 maxEntrySize = 10; + TFileRingBuffer rb(f.GetName(), len, maxEntrySize); + + UNIT_ASSERT(rb.Push("vasya")); + UNIT_ASSERT(rb.Push("petya")); + UNIT_ASSERT(rb.Push("vasya2")); + UNIT_ASSERT(rb.Push("petya2")); + + UNIT_ASSERT_VALUES_EQUAL("", Dump(rb.Validate())); + TFileMap m(f.GetName(), TMemoryMapCommon::oRdWr); + m.Map(0, len); + char* data = static_cast(m.Ptr()); + data[10] = 'A'; + + UNIT_ASSERT_VALUES_EQUAL( + "data=invalid_entry_marker ecsum=0 csum=11034342", + Dump(rb.Validate())); + } + + TString GenerateData(ui32 sz) + { + TString s(sz, 0); + for (ui32 i = 0; i < sz; ++i) { + s[i] = 'a' + RandomNumber('z' - 'a' + 1); + } + return s; + } + + Y_UNIT_TEST(RandomizedPushPopRestore) + { + const auto f = TTempFileHandle(); + const ui32 len = 1_MB; + const ui32 testBytes = 16_MB; + const ui32 maxEntrySize = 4_KB; + const ui32 testUpToEntrySize = 5_KB; + const double restoreProbability = 0.05; + std::unique_ptr rb; + TReferenceImplementation ri(len, maxEntrySize); + + auto restore = [&] () { + rb = std::make_unique( + f.GetName(), + len, + maxEntrySize); + }; + + restore(); + + ui32 remainingBytes = testBytes; + while (remainingBytes || !ri.Empty()) { + const bool shouldPush = remainingBytes && RandomNumber(); + if (shouldPush) { + const ui32 entrySize = + RandomNumber(Min(remainingBytes + 1, testUpToEntrySize)); + const auto data = GenerateData(entrySize); + const bool pushed = ri.Push(data); + UNIT_ASSERT_VALUES_EQUAL(pushed, rb->Push(data)); + if (pushed) { + remainingBytes -= entrySize; + // Cerr << "PUSH\t" << data << Endl; + } + } else { + UNIT_ASSERT_VALUES_EQUAL(ri.Front(), rb->Front()); + // Cerr << "POP\t" << ri.Front() << Endl; + ri.Pop(); + rb->Pop(); + } + + // Cerr << ri.Size() << " " << remainingBytes << Endl; + + if (RandomNumber() < restoreProbability) { + restore(); + } + + UNIT_ASSERT_VALUES_EQUAL(ri.Size(), rb->Size()); + UNIT_ASSERT_VALUES_EQUAL(ri.Empty(), rb->Empty()); + UNIT_ASSERT_VALUES_EQUAL("", Dump(rb->Validate())); + } + } +} + +} // namespace NCloud::NFileStore diff --git a/cloud/filestore/libs/vfs_fuse/handle_ops_queue.cpp b/cloud/filestore/libs/vfs_fuse/handle_ops_queue.cpp index 9ec05a7338d..a4ac62ba5db 100644 --- a/cloud/filestore/libs/vfs_fuse/handle_ops_queue.cpp +++ b/cloud/filestore/libs/vfs_fuse/handle_ops_queue.cpp @@ -6,30 +6,30 @@ namespace NCloud::NFileStore::NFuse { void THandleOpsQueue::AddDestroyRequest(ui64 nodeId, ui64 handle) { - NProto::TQueueEntry request; - request.MutableDestroyHandleRequest()->SetHandle(handle); - request.MutableDestroyHandleRequest()->SetNodeId(nodeId); - Requests.push(request); + NProto::TQueueEntry request; + request.MutableDestroyHandleRequest()->SetHandle(handle); + request.MutableDestroyHandleRequest()->SetNodeId(nodeId); + Requests.push(request); } const NProto::TQueueEntry& THandleOpsQueue::Front() { - return Requests.front(); + return Requests.front(); } bool THandleOpsQueue::Empty() const { - return Requests.empty(); + return Requests.empty(); } void THandleOpsQueue::Pop() { - Requests.pop(); + Requests.pop(); } ui64 THandleOpsQueue::Size() const { - return Requests.size(); + return Requests.size(); } } // namespace NCloud::NFileStore::NFuse diff --git a/cloud/filestore/libs/vfs_fuse/handle_ops_queue.h b/cloud/filestore/libs/vfs_fuse/handle_ops_queue.h index 849e16bd9bd..2b3a8af430f 100644 --- a/cloud/filestore/libs/vfs_fuse/handle_ops_queue.h +++ b/cloud/filestore/libs/vfs_fuse/handle_ops_queue.h @@ -1,3 +1,5 @@ +#pragma once + #include #include @@ -9,14 +11,14 @@ namespace NCloud::NFileStore::NFuse { class THandleOpsQueue { private: - TQueue Requests; + TQueue Requests; public: - void AddDestroyRequest(ui64 nodeId, ui64 handle); - const NProto::TQueueEntry& Front(); - void Pop(); - ui64 Size() const; - bool Empty() const; + void AddDestroyRequest(ui64 nodeId, ui64 handle); + const NProto::TQueueEntry& Front(); + void Pop(); + ui64 Size() const; + bool Empty() const; }; } // namespace NCloud::NFileStore::NFuse diff --git a/cloud/filestore/libs/vfs_fuse/ut/ya.make b/cloud/filestore/libs/vfs_fuse/ut/ya.make index d92df2d4e6c..37b10750f8b 100644 --- a/cloud/filestore/libs/vfs_fuse/ut/ya.make +++ b/cloud/filestore/libs/vfs_fuse/ut/ya.make @@ -5,6 +5,7 @@ INCLUDE(${ARCADIA_ROOT}/cloud/filestore/tests/recipes/medium.inc) SRCDIR(cloud/filestore/libs/vfs_fuse) SRCS( + file_ring_buffer_ut.cpp fs_ut.cpp ) diff --git a/cloud/filestore/libs/vfs_fuse/ya.make.inc b/cloud/filestore/libs/vfs_fuse/ya.make.inc index 0d9604a29c5..e01bfd59275 100644 --- a/cloud/filestore/libs/vfs_fuse/ya.make.inc +++ b/cloud/filestore/libs/vfs_fuse/ya.make.inc @@ -1,6 +1,7 @@ SRCS( cache.cpp config.cpp + file_ring_buffer.cpp fs.cpp fs_impl.cpp fs_impl_attr.cpp