diff --git a/cloud/filestore/config/filesystem.proto b/cloud/filestore/config/filesystem.proto index a8402f05061..970ddea0b35 100644 --- a/cloud/filestore/config/filesystem.proto +++ b/cloud/filestore/config/filesystem.proto @@ -42,4 +42,10 @@ message TFileSystemConfig // Period of processing create/destroy handle requests. optional uint32 AsyncHandleOperationPeriod = 12; + + // Enable O_DIRECT when working with files + optional bool DirectIoEnabled = 13; + + // Alignment needed for buffers when using direct io + optional uint32 DirectIoAlign = 14; } diff --git a/cloud/filestore/config/server.proto b/cloud/filestore/config/server.proto index 43cb47572da..c341fa0b507 100644 --- a/cloud/filestore/config/server.proto +++ b/cloud/filestore/config/server.proto @@ -83,6 +83,12 @@ message TLocalServiceConfig // Maximum number of file handles which can be opened by single session // for single local file system optional uint32 MaxHandlePerSessionCount = 8; + + // Enable O_DIRECT when working with files + optional bool DirectIoEnabled = 9; + + // Alignment needed for buffers when using direct io + optional uint32 DirectIoAlign = 10; } //////////////////////////////////////////////////////////////////////////////// diff --git a/cloud/filestore/libs/service/filestore.cpp b/cloud/filestore/libs/service/filestore.cpp index 46e036ee9ca..719fa425bd5 100644 --- a/cloud/filestore/libs/service/filestore.cpp +++ b/cloud/filestore/libs/service/filestore.cpp @@ -33,6 +33,7 @@ constexpr std::array SUPPORTED_HANDLE_FLAGS = { TFlag2Proto{O_NOFOLLOW, TCreateHandleRequest::E_NOFOLLOW}, TFlag2Proto{O_NONBLOCK, TCreateHandleRequest::E_NONBLOCK}, TFlag2Proto{O_PATH, TCreateHandleRequest::E_PATH}, + TFlag2Proto{O_DIRECT, TCreateHandleRequest::E_DIRECT}, }; constexpr std::array SUPPORTED_RENAME_FLAGS = { diff --git a/cloud/filestore/libs/service_local/config.cpp b/cloud/filestore/libs/service_local/config.cpp index bd85861d68e..0aa5e8d8d3a 100644 --- a/cloud/filestore/libs/service_local/config.cpp 
+++ b/cloud/filestore/libs/service_local/config.cpp @@ -2,6 +2,8 @@ #include +#include + namespace NCloud::NFileStore { namespace { @@ -17,6 +19,8 @@ namespace { xxx(StatePath, TString, "./" )\ xxx(MaxNodeCount, ui32, 1000000 )\ xxx(MaxHandlePerSessionCount, ui32, 10000 )\ + xxx(DirectIoEnabled, bool, false )\ + xxx(DirectIoAlign, ui32, 4_KB )\ // FILESTORE_SERVICE_CONFIG #define FILESTORE_SERVICE_DECLARE_CONFIG(name, type, value) \ diff --git a/cloud/filestore/libs/service_local/config.h b/cloud/filestore/libs/service_local/config.h index a2af2fc7241..24478167b30 100644 --- a/cloud/filestore/libs/service_local/config.h +++ b/cloud/filestore/libs/service_local/config.h @@ -29,6 +29,8 @@ class TLocalFileStoreConfig TString GetStatePath() const; ui32 GetMaxNodeCount() const; ui32 GetMaxHandlePerSessionCount() const; + bool GetDirectIoEnabled() const; + ui32 GetDirectIoAlign() const; void Dump(IOutputStream& out) const; void DumpHtml(IOutputStream& out) const; diff --git a/cloud/filestore/libs/service_local/fs.cpp b/cloud/filestore/libs/service_local/fs.cpp index c35a8ec71ca..376ca930af3 100644 --- a/cloud/filestore/libs/service_local/fs.cpp +++ b/cloud/filestore/libs/service_local/fs.cpp @@ -24,6 +24,11 @@ TLocalFileSystem::TLocalFileSystem( { Log = Logging->CreateLog(Store.GetFileSystemId()); + STORAGE_INFO( + "LocalFileSystemId=" << Store.GetFileSystemId() << + ", DirectIoEnabled=" << Config->GetDirectIoEnabled() << + ", DirectIoAlign=" << Config->GetDirectIoAlign()); + Index = std::make_shared( Root, StatePath, diff --git a/cloud/filestore/libs/service_local/fs_data.cpp b/cloud/filestore/libs/service_local/fs_data.cpp index d866a5a5db4..2a07e36d61a 100644 --- a/cloud/filestore/libs/service_local/fs_data.cpp +++ b/cloud/filestore/libs/service_local/fs_data.cpp @@ -3,6 +3,8 @@ #include "lowlevel.h" #include + +#include #include #include @@ -30,6 +32,9 @@ NProto::TCreateHandleResponse TLocalFileSystem::CreateHandle( } int flags = 
HandleFlagsToSystem(request.GetFlags()); + if (!Config->GetDirectIoEnabled()) { + flags &= ~O_DIRECT; + } const int mode = request.GetMode() ? request.GetMode() : Config->GetDefaultPermissions(); @@ -97,18 +102,23 @@ TFuture TLocalFileSystem::ReadDataAsync( TErrorResponse(ErrorInvalidHandle(request.GetHandle()))); } - auto b = TString::Uninitialized(request.GetLength()); - NSan::Unpoison(b.data(), b.size()); + auto align = Config->GetDirectIoEnabled() ? Config->GetDirectIoAlign() : 0; + auto b = std::make_shared(request.GetLength(), align); + NSan::Unpoison(b->Begin(), b->Size()); - TArrayRef data(b.begin(), b.vend()); + TArrayRef data(b->Begin(), b->End()); auto promise = NewPromise(); FileIOService->AsyncRead(*handle, request.GetOffset(), data).Subscribe( [b = std::move(b), promise] (const TFuture& f) mutable { NProto::TReadDataResponse response; try { auto bytesRead = f.GetValue(); - b.resize(bytesRead); - response.SetBuffer(std::move(b)); + b->TrimSize(bytesRead); + response.SetBufferOffset(b->AlignedDataOffset()); + response.SetBuffer(std::move(b->AccessBuffer())); + } catch (const TServiceError& e) { + *response.MutableError() = MakeError(MAKE_FILESTORE_ERROR( + ErrnoToFileStoreError(STATUS_FROM_CODE(e.GetCode())))); } catch (...) { *response.MutableError() = MakeError(E_IO, CurrentExceptionMessage()); @@ -131,13 +141,24 @@ TFuture TLocalFileSystem::WriteDataAsync( } auto b = std::move(*request.MutableBuffer()); - TArrayRef data(b.begin(), b.vend()); + auto offset = request.GetBufferOffset(); + // BufferOffset is supplied by the client: reject values pointing past the + // end of the buffer instead of handing an out-of-range span to AsyncWrite. + if (offset > b.size()) { + return MakeFuture<NProto::TWriteDataResponse>(TErrorResponse(MakeError( + E_ARGUMENT, + "BufferOffset exceeds buffer size"))); + } + TArrayRef data(b.begin() + offset, b.vend()); auto promise = NewPromise(); FileIOService->AsyncWrite(*handle, request.GetOffset(), data).Subscribe( [b = std::move(b), promise] (const TFuture& f) mutable { NProto::TWriteDataResponse response; try { f.GetValue(); + } catch (const TServiceError& e) { + *response.MutableError() = MakeError(MAKE_FILESTORE_ERROR( + ErrnoToFileStoreError(STATUS_FROM_CODE(e.GetCode())))); } catch (...) 
{ *response.MutableError() = MakeError(E_IO, CurrentExceptionMessage()); diff --git a/cloud/filestore/libs/service_local/fs_session.cpp b/cloud/filestore/libs/service_local/fs_session.cpp index ce6eb891424..01416d4c9da 100644 --- a/cloud/filestore/libs/service_local/fs_session.cpp +++ b/cloud/filestore/libs/service_local/fs_session.cpp @@ -75,6 +75,10 @@ NProto::TCreateSessionResponse TLocalFileSystem::CreateSession( NProto::TCreateSessionResponse response; session->GetInfo(*response.MutableSession(), sessionSeqNo); + auto* features = response.MutableFileStore()->MutableFeatures(); + features->SetDirectIoEnabled(Config->GetDirectIoEnabled()); + features->SetDirectIoAlign(Config->GetDirectIoAlign()); + return response; } diff --git a/cloud/filestore/libs/service_local/service_ut.cpp b/cloud/filestore/libs/service_local/service_ut.cpp index 5181814a81e..2e6627e9550 100644 --- a/cloud/filestore/libs/service_local/service_ut.cpp +++ b/cloud/filestore/libs/service_local/service_ut.cpp @@ -6,6 +6,7 @@ #include #include +#include #include #include #include @@ -229,6 +230,9 @@ struct TCreateHandleArgs = ProtoFlag(NProto::TCreateHandleRequest::E_CREATE) | ProtoFlag(NProto::TCreateHandleRequest::E_READ) | ProtoFlag(NProto::TCreateHandleRequest::E_WRITE); + + static constexpr ui32 DIRECT + = ProtoFlag(NProto::TCreateHandleRequest::E_DIRECT); }; //////////////////////////////////////////////////////////////////////////////// @@ -289,12 +293,18 @@ struct TTestBootstrap TTestBootstrap( const TTempDirectoryPtr& cwd = std::make_shared(), ui32 maxNodeCount = 1000, - ui32 maxHandlePerSessionCount = 100) + ui32 maxHandlePerSessionCount = 100, + bool directIoEnabled = false, + ui32 directIoAlign = 4096) : Cwd(cwd) { AIOService->Start(); Store = CreateLocalFileStore( - CreateConfig(maxNodeCount, maxHandlePerSessionCount), + CreateConfig( + maxNodeCount, + maxHandlePerSessionCount, + directIoEnabled, + directIoAlign), Timer, Scheduler, Logging, @@ -308,12 +318,18 @@ struct 
TTestBootstrap const TString& client = "client", const TString& session = {}, ui32 maxNodeCount = 1000, - ui32 maxHandlePerSessionCount = 100) + ui32 maxHandlePerSessionCount = 100, + bool directIoEnabled = false, + ui32 directIoAlign = 4096) : Cwd(std::make_shared()) { AIOService->Start(); Store = CreateLocalFileStore( - CreateConfig(maxNodeCount, maxHandlePerSessionCount), + CreateConfig( + maxNodeCount, + maxHandlePerSessionCount, + directIoEnabled, + directIoAlign), Timer, Scheduler, Logging, @@ -373,13 +389,17 @@ struct TTestBootstrap TLocalFileStoreConfigPtr CreateConfig( ui32 maxNodeCount, - ui32 maxHandlePerSessionCount) + ui32 maxHandlePerSessionCount, + bool directIoEnabled, + ui32 directIoAlign) { NProto::TLocalServiceConfig config; config.SetRootPath(Cwd->GetName()); config.SetStatePath(Cwd->GetName()); config.SetMaxNodeCount(maxNodeCount); config.SetMaxHandlePerSessionCount(maxHandlePerSessionCount); + config.SetDirectIoEnabled(directIoEnabled); + config.SetDirectIoAlign(directIoAlign); return std::make_shared(config); } @@ -746,6 +766,64 @@ struct TTestBootstrap FILESTORE_SERVICE(FILESTORE_DECLARE_METHOD) #undef FILESTORE_DECLARE_METHOD + + NProto::TWriteDataResponse WriteDataAligned( + ui64 handle, + ui64 offset, + const TString& buffer, + ui32 align) + { + auto request = CreateRequest(); + request->SetHandle(handle); + request->SetOffset(offset); + + TAlignedBuffer alignedBuffer(buffer.size(), align); + memcpy( + (void*)(alignedBuffer.Begin()), + (void*)buffer.data(), + buffer.size()); + request->SetBufferOffset(alignedBuffer.AlignedDataOffset()); + request->SetBuffer(std::move(alignedBuffer.AccessBuffer())); + + auto dbg = request->ShortDebugString(); + auto response = + Store->WriteData(Ctx, std::move(request)).GetValueSync(); + + UNIT_ASSERT_C( + SUCCEEDED(response.GetError().GetCode()), + DumpMessage(response.GetError()) + "@" + dbg); + + return response; + } + + NProto::TWriteDataResponse AssertWriteDataAlignedFailed( + ui64 handle, + ui64 
offset, + const TString& buffer, + ui32 align) + { + auto request = CreateRequest(); + request->SetHandle(handle); + request->SetOffset(offset); + + TAlignedBuffer alignedBuffer(buffer.size(), align); + memcpy( + (void*)(alignedBuffer.Begin()), + (void*)buffer.data(), + buffer.size()); + request->SetBufferOffset(alignedBuffer.AlignedDataOffset()); + request->SetBuffer(std::move(alignedBuffer.AccessBuffer())); + + auto dbg = request->ShortDebugString(); + auto response = + Store->WriteData(Ctx, std::move(request)).GetValueSync(); + + UNIT_ASSERT_C( + FAILED(response.GetError().GetCode()), + "WriteDataAligned has not failed as expected " + dbg); + + return response; + } }; //////////////////////////////////////////////////////////////////////////////// @@ -1939,6 +2017,76 @@ Y_UNIT_TEST_SUITE(LocalFileStore) bootstrap.FsyncDir(dirNodeId, dataSync); } } + + void CheckReadAndWriteDataWithDirectIo(ui32 directIoAlign) + { + TTestBootstrap bootstrap( + "fs", + "client", + {}, + 1000, + 1000, + true /* direct io enabled */, + directIoAlign); + + ui64 handle = + bootstrap + .CreateHandle( + RootNodeId, + "file", + TCreateHandleArgs::CREATE | TCreateHandleArgs::DIRECT) + .GetHandle(); + auto readRsp = bootstrap.ReadData(handle, 0, 100); + auto data = readRsp.GetBuffer().substr(readRsp.GetBufferOffset()); + UNIT_ASSERT_VALUES_EQUAL(data.size(), 0); + + data = "aaaabbbbcccccdddddeeee"; + const auto response = bootstrap.AssertWriteDataAlignedFailed( + handle, + 0, + data, + directIoAlign); + const auto& error = response.GetError(); + UNIT_ASSERT_VALUES_EQUAL( + static_cast(NProto::E_FS_INVAL), + STATUS_FROM_CODE(error.GetCode())); + + data.append(directIoAlign-data.size(), 'x'); + data.append(directIoAlign, 'y'); + bootstrap.WriteDataAligned(handle, 0, data, directIoAlign); + + auto readDataWithOffset = + [&bootstrap](ui64 handle, ui64 offset, ui32 len) + { + auto rsp = bootstrap.ReadData(handle, offset, len); + return rsp.GetBuffer().substr(rsp.GetBufferOffset()); + }; + + // 
read [0, 2*align] + auto buffer = readDataWithOffset(handle, 0, data.size()); + UNIT_ASSERT_VALUES_EQUAL(buffer, data); + + // read [0, align] + buffer = readDataWithOffset(handle, 0, directIoAlign); + UNIT_ASSERT_VALUES_EQUAL(buffer, data.substr(0, directIoAlign)); + + // read [align, align] + buffer = readDataWithOffset(handle, directIoAlign, directIoAlign); + UNIT_ASSERT_VALUES_EQUAL( + buffer, + data.substr(directIoAlign, directIoAlign)); + } + + Y_UNIT_TEST(ShouldReadAndWriteDataWithDirectIoAlignedTo512) + { + CheckReadAndWriteDataWithDirectIo(512); + } + + Y_UNIT_TEST(ShouldReadAndWriteDataWithDirectIoAlignedTo4096) + { + CheckReadAndWriteDataWithDirectIo(4096); + } + }; } // namespace NCloud::NFileStore diff --git a/cloud/filestore/libs/vfs_fuse/config.cpp b/cloud/filestore/libs/vfs_fuse/config.cpp index da7a256c547..16f22fe1a10 100644 --- a/cloud/filestore/libs/vfs_fuse/config.cpp +++ b/cloud/filestore/libs/vfs_fuse/config.cpp @@ -29,6 +29,9 @@ namespace { \ xxx(AsyncDestroyHandleEnabled, bool, false )\ xxx(AsyncHandleOperationPeriod, TDuration, TDuration::MilliSeconds(50) )\ + \ + xxx(DirectIoEnabled, bool, false )\ + xxx(DirectIoAlign, ui32, 4_KB )\ // FILESTORE_FUSE_CONFIG #define FILESTORE_FILESYSTEM_DECLARE_CONFIG(name, type, value) \ diff --git a/cloud/filestore/libs/vfs_fuse/config.h b/cloud/filestore/libs/vfs_fuse/config.h index a1312c49b6e..95b11590563 100644 --- a/cloud/filestore/libs/vfs_fuse/config.h +++ b/cloud/filestore/libs/vfs_fuse/config.h @@ -39,6 +39,9 @@ struct TFileSystemConfig bool GetAsyncDestroyHandleEnabled() const; TDuration GetAsyncHandleOperationPeriod() const; + bool GetDirectIoEnabled() const; + ui32 GetDirectIoAlign() const; + void Dump(IOutputStream& out) const; void DumpHtml(IOutputStream& out) const; }; diff --git a/cloud/filestore/libs/vfs_fuse/fs_impl_data.cpp b/cloud/filestore/libs/vfs_fuse/fs_impl_data.cpp index 35df85099e3..64ee8c3dadb 100644 --- a/cloud/filestore/libs/vfs_fuse/fs_impl_data.cpp +++ 
b/cloud/filestore/libs/vfs_fuse/fs_impl_data.cpp @@ -5,6 +5,8 @@ #include #include +#include + namespace NCloud::NFileStore::NFuse { using namespace NCloud::NFileStore::NVFS; @@ -219,12 +221,13 @@ void TFileSystem::Read( const auto& response = future.GetValue(); if (auto self = ptr.lock(); CheckResponse(self, *callContext, req, response)) { const auto& buffer = response.GetBuffer(); + ui32 bufferOffset = response.GetBufferOffset(); self->ReplyBuf( *callContext, response.GetError(), req, - buffer.data(), - buffer.size()); + buffer.Data() + bufferOffset, + buffer.Size() - bufferOffset); } }); } @@ -248,10 +251,18 @@ void TFileSystem::Write( callContext->Unaligned = !IsAligned(offset, Config->GetBlockSize()) || !IsAligned(buffer.size(), Config->GetBlockSize()); + auto align = Config->GetDirectIoEnabled() ? Config->GetDirectIoAlign() : 0; + TAlignedBuffer alignedBuffer(buffer.size(), align); + memcpy( + (void*)(alignedBuffer.Begin()), + (void*)buffer.data(), + buffer.size()); + auto request = StartRequest(ino); request->SetHandle(fi->fh); request->SetOffset(offset); - request->SetBuffer(buffer.data(), buffer.size()); + request->SetBufferOffset(alignedBuffer.AlignedDataOffset()); + request->SetBuffer(std::move(alignedBuffer.AccessBuffer())); const auto handle = fi->fh; const auto reqId = callContext->RequestId; @@ -291,10 +302,11 @@ void TFileSystem::WriteBuf( return; } - auto buffer = TString::Uninitialized(size); + auto align = Config->GetDirectIoEnabled() ? 
Config->GetDirectIoAlign() : 0; + TAlignedBuffer alignedBuffer(size, align); fuse_bufvec dst = FUSE_BUFVEC_INIT(size); - dst.buf[0].mem = (void*)buffer.data(); + dst.buf[0].mem = (void*)(alignedBuffer.Begin()); ssize_t res = fuse_buf_copy( &dst, bufv @@ -309,12 +321,13 @@ void TFileSystem::WriteBuf( Y_ABORT_UNLESS((size_t)res == size); callContext->Unaligned = !IsAligned(offset, Config->GetBlockSize()) - || !IsAligned(buffer.size(), Config->GetBlockSize()); + || !IsAligned(size, Config->GetBlockSize()); auto request = StartRequest(ino); request->SetHandle(fi->fh); request->SetOffset(offset); - request->SetBuffer(std::move(buffer)); + request->SetBufferOffset(alignedBuffer.AlignedDataOffset()); + request->SetBuffer(std::move(alignedBuffer.AccessBuffer())); const auto handle = fi->fh; const auto reqId = callContext->RequestId; diff --git a/cloud/filestore/libs/vfs_fuse/loop.cpp b/cloud/filestore/libs/vfs_fuse/loop.cpp index 3abb29a4ba6..58bb5c16bc1 100644 --- a/cloud/filestore/libs/vfs_fuse/loop.cpp +++ b/cloud/filestore/libs/vfs_fuse/loop.cpp @@ -896,6 +896,10 @@ class TFileSystemLoop final features.GetAsyncDestroyHandleEnabled()); config.SetAsyncHandleOperationPeriod( features.GetAsyncHandleOperationPeriod()); + + config.SetDirectIoEnabled(features.GetDirectIoEnabled()); + config.SetDirectIoAlign(features.GetDirectIoAlign()); + return std::make_shared(config); } diff --git a/cloud/filestore/public/api/protos/data.proto b/cloud/filestore/public/api/protos/data.proto index 5b85bd607fc..7bec0789669 100644 --- a/cloud/filestore/public/api/protos/data.proto +++ b/cloud/filestore/public/api/protos/data.proto @@ -27,11 +27,11 @@ message TCreateHandleRequest E_NOFOLLOW = 9; E_NONBLOCK = 10; E_PATH = 11; + E_DIRECT = 12; // TODO: these flags have no meaning or only useful for the local svc // E_ASYNC // E_CLOEXEC - // E_DIRECT // E_DSYNC // E_LARGEFILE // E_NOCTTY @@ -145,6 +145,9 @@ message TReadDataResponse // Buffer with bytes read. 
bytes Buffer = 2; + // Bytes read start at buffer offset. + uint32 BufferOffset = 3; + // Optional response headers. TResponseHeaders Headers = 1000; } @@ -171,6 +174,9 @@ message TWriteDataRequest // Buffer with bytes to write. bytes Buffer = 6; + + // Bytes to write start at buffer offset. + uint32 BufferOffset = 7; } message TWriteDataResponse diff --git a/cloud/filestore/public/api/protos/fs.proto b/cloud/filestore/public/api/protos/fs.proto index a9161f78f7b..903793e1343 100644 --- a/cloud/filestore/public/api/protos/fs.proto +++ b/cloud/filestore/public/api/protos/fs.proto @@ -22,6 +22,8 @@ message TFileStoreFeatures uint32 PreferredBlockSize = 7; bool AsyncDestroyHandleEnabled = 8; uint32 AsyncHandleOperationPeriod = 9; + bool DirectIoEnabled = 10; + uint32 DirectIoAlign = 11; } message TFileStore diff --git a/cloud/filestore/tests/fio/qemu-local-noserver-direct-io-test/test.py b/cloud/filestore/tests/fio/qemu-local-noserver-direct-io-test/test.py new file mode 100644 index 00000000000..46e0702642b --- /dev/null +++ b/cloud/filestore/tests/fio/qemu-local-noserver-direct-io-test/test.py @@ -0,0 +1,16 @@ +import pytest + +import cloud.storage.core.tools.testing.fio.lib as fio + +from cloud.filestore.tests.python.lib.common import get_nfs_mount_path + + +TESTS = fio.generate_tests() + + +@pytest.mark.parametrize("name", TESTS.keys()) +def test_fio(name): + mount_dir = get_nfs_mount_path() + file_name = fio.get_file_name(mount_dir, name) + + fio.run_test(file_name, TESTS[name], fail_on_errors=True) diff --git a/cloud/filestore/tests/fio/qemu-local-noserver-direct-io-test/ya.make b/cloud/filestore/tests/fio/qemu-local-noserver-direct-io-test/ya.make new file mode 100644 index 00000000000..2e3d44e31b4 --- /dev/null +++ b/cloud/filestore/tests/fio/qemu-local-noserver-direct-io-test/ya.make @@ -0,0 +1,25 @@ +PY3TEST() + +INCLUDE(${ARCADIA_ROOT}/cloud/filestore/tests/recipes/medium.inc) + +DEPENDS( + cloud/storage/core/tools/testing/fio/bin +) + +PEERDIR( + 
cloud/filestore/tests/python/lib + cloud/storage/core/tools/testing/fio/lib +) + +TEST_SRCS( + test.py +) + +SET(QEMU_VIRTIO fs) +SET(VHOST_DIRECT_IO 1) + +INCLUDE(${ARCADIA_ROOT}/cloud/filestore/tests/recipes/vhost-local-noserver.inc) +INCLUDE(${ARCADIA_ROOT}/cloud/filestore/tests/recipes/vhost-endpoint.inc) +INCLUDE(${ARCADIA_ROOT}/cloud/storage/core/tests/recipes/qemu.inc) + +END() diff --git a/cloud/filestore/tests/fio/ya.make b/cloud/filestore/tests/fio/ya.make index 4e22856f615..26f4d6dadc2 100644 --- a/cloud/filestore/tests/fio/ya.make +++ b/cloud/filestore/tests/fio/ya.make @@ -4,6 +4,7 @@ RECURSE_FOR_TESTS( qemu-kikimr-nemesis-test qemu-kikimr-newfeatures-test qemu-kikimr-test + qemu-local-noserver-direct-io-test qemu-local-noserver-test qemu-local-test ) diff --git a/cloud/filestore/tests/recipes/vhost-local-noserver.inc b/cloud/filestore/tests/recipes/vhost-local-noserver.inc index cefa1a34e1c..24de3e5bc0e 100644 --- a/cloud/filestore/tests/recipes/vhost-local-noserver.inc +++ b/cloud/filestore/tests/recipes/vhost-local-noserver.inc @@ -15,6 +15,10 @@ SET(RECIPE_ARGS --restart-flag $VHOST_RESTART_FLAG ) +IF (VHOST_DIRECT_IO) + SET_APPEND(RECIPE_ARGS --direct-io) +ENDIF() + IF (NOT OPENSOURCE) SET_APPEND(RECIPE_ARGS --verbose) ENDIF() diff --git a/cloud/filestore/tests/recipes/vhost/__main__.py b/cloud/filestore/tests/recipes/vhost/__main__.py index d2b1d987cc9..3c15b3c00f6 100644 --- a/cloud/filestore/tests/recipes/vhost/__main__.py +++ b/cloud/filestore/tests/recipes/vhost/__main__.py @@ -32,6 +32,8 @@ def start(argv): parser.add_argument("--restart-interval", action="store", default=None) parser.add_argument("--restart-flag", action="store", default=None) parser.add_argument("--storage-config-patch", action="store", default=None) + parser.add_argument("--direct-io", action="store_true", default=False) + args = parser.parse_args(argv) vhost_binary_path = common.binary_path( @@ -69,6 +71,7 @@ def start(argv): fs_root_path = common.ram_drive_path() if 
fs_root_path: config.VhostServiceConfig.LocalServiceConfig.RootPath = fs_root_path + config.VhostServiceConfig.LocalServiceConfig.DirectIoEnabled = args.direct_io elif service_type == "kikimr": kikimr_port = os.getenv("KIKIMR_SERVER_PORT") domain = os.getenv("NFS_DOMAIN") diff --git a/cloud/storage/core/libs/common/aligned_buffer.cpp b/cloud/storage/core/libs/common/aligned_buffer.cpp new file mode 100644 index 00000000000..a02ab1ee477 --- /dev/null +++ b/cloud/storage/core/libs/common/aligned_buffer.cpp @@ -0,0 +1,124 @@ +#include "aligned_buffer.h" + +#include + +#include + +namespace NCloud { + +//////////////////////////////////////////////////////////////////////////////// + +TStringBuf TAlignedBuffer::ExtractAlignedData( + const TString& buffer, + ui32 align) +{ + auto alignedData = buffer.begin(); + if (align) { + alignedData = AlignUp(buffer.data(), align); + if (alignedData > buffer.end()) { + ythrow TServiceError(E_ARGUMENT) + << "Extracting unaligned buffer " << (void*)buffer.begin() + << " with alignment " << align + << " with size " << buffer.size(); + } + } + + return TStringBuf(alignedData, buffer.end() - alignedData); +} + +TAlignedBuffer::TAlignedBuffer() + : AlignedData(Buffer.begin()) +{} + +TAlignedBuffer::TAlignedBuffer(TAlignedBuffer&& other) + : Buffer(std::move(other.Buffer)) + , AlignedData(std::move(other.AlignedData)) +{ + other.Buffer.clear(); + other.AlignedData = other.Buffer.begin(); +} + +TAlignedBuffer& TAlignedBuffer::operator=(TAlignedBuffer&& other) +{ + Buffer = std::move(other.Buffer); + AlignedData = std::move(other.AlignedData); + other.Buffer.clear(); + other.AlignedData = other.Buffer.begin(); + return *this; +} + +TAlignedBuffer::TAlignedBuffer(ui32 size, ui32 align) + : Buffer(TString::Uninitialized(size + align)) + , AlignedData(Buffer.begin()) +{ + if (align) { + Y_DEBUG_ABORT_UNLESS(IsPowerOf2(align)); // align should be power of 2 + AlignedData = AlignUp(Buffer.data(), align); + auto* bufferMem = Buffer.data(); + 
Buffer.resize(AlignedData + size - Buffer.begin()); + Y_ABORT_UNLESS(Buffer.data() == bufferMem); + } +} + +TAlignedBuffer::TAlignedBuffer(TString&& buffer, ui32 align) + : Buffer(std::move(buffer)) + , AlignedData(Buffer.begin()) +{ + if (align) { + Y_DEBUG_ABORT_UNLESS(IsPowerOf2(align)); // align should be power of 2 + AlignedData = AlignUp(Buffer.data(), align); + if (AlignedData > Buffer.end()) { + ythrow TServiceError(E_ARGUMENT) + << "Initializing from unaligned buffer " + << (void*)Buffer.begin() + << " with alignment " << align + << " with size " << Buffer.size(); + } + } +} + +size_t TAlignedBuffer::AlignedDataOffset() const +{ + return AlignedData - Buffer.begin(); +} + +char* TAlignedBuffer::Begin() +{ + return const_cast(AlignedData); +} + +const char* TAlignedBuffer::Begin() const +{ + return AlignedData; +} + +char* TAlignedBuffer::End() +{ + return const_cast(Buffer.end()); +} + +const char* TAlignedBuffer::End() const +{ + return Buffer.end(); +} + +size_t TAlignedBuffer::Size() const +{ + return End() - Begin(); +} + +void TAlignedBuffer::TrimSize(size_t size) +{ + if (size > Size()) { + ythrow TServiceError(E_ARGUMENT) + << "Tried to trim to size " << size << " > " << Size(); + } + Buffer.resize(AlignedDataOffset() + size); +} + +TString& TAlignedBuffer::AccessBuffer() +{ + return Buffer; +} + +} // namespace NCloud diff --git a/cloud/storage/core/libs/common/aligned_buffer.h b/cloud/storage/core/libs/common/aligned_buffer.h new file mode 100644 index 00000000000..4d8f004c87a --- /dev/null +++ b/cloud/storage/core/libs/common/aligned_buffer.h @@ -0,0 +1,43 @@ +#pragma once + +#include + +namespace NCloud { + +//////////////////////////////////////////////////////////////////////////////// + +class TAlignedBuffer +{ +private: + TString Buffer; + const char* AlignedData; + +public: + static TStringBuf ExtractAlignedData( + const TString& buffer, + ui32 align); + + TAlignedBuffer(); + TAlignedBuffer(const TAlignedBuffer&) = delete; + TAlignedBuffer& 
operator=(const TAlignedBuffer&) = delete; + + TAlignedBuffer(TAlignedBuffer&& other); + TAlignedBuffer& operator=(TAlignedBuffer&& other); + + TAlignedBuffer(ui32 size, ui32 align); + TAlignedBuffer(TString&& buffer, ui32 align); + + size_t AlignedDataOffset() const; + + char* Begin(); + const char* Begin() const; + + char* End(); + const char* End() const; + + size_t Size() const; + void TrimSize(size_t size); + + TString& AccessBuffer(); +}; +} // namespace NCloud diff --git a/cloud/storage/core/libs/common/aligned_buffer_ut.cpp b/cloud/storage/core/libs/common/aligned_buffer_ut.cpp new file mode 100644 index 00000000000..f771efd7ea5 --- /dev/null +++ b/cloud/storage/core/libs/common/aligned_buffer_ut.cpp @@ -0,0 +1,148 @@ +#include "aligned_buffer.h" + +#include + +#include + +#include + +namespace NCloud { + +namespace { + +#define UNIT_ASSERT_PTR_EQUAL(A, B) \ + UNIT_ASSERT_VALUES_EQUAL((void*)(A), (void*)(B)) + +} // namespace + +//////////////////////////////////////////////////////////////////////////////// + +Y_UNIT_TEST_SUITE(TAlignedBufferTest) +{ + Y_UNIT_TEST(ShouldAlignToPowerOf2) + { + TVector buffers; + for (ui32 size: {123, 1234, 12345, 123456, 1234567}) { + for (ui32 alignShift = 0; alignShift < 17; alignShift++) { + auto align = 1 << alignShift; + TAlignedBuffer buffer(size, align); + + Cerr << "size=" << size << ", align=" << align + << ", offset=" << buffer.AlignedDataOffset() + << ", begin=" << (void*)buffer.Begin() << Endl; + UNIT_ASSERT_EQUAL(size, buffer.Size()); + UNIT_ASSERT_VALUES_EQUAL_C( + reinterpret_cast(buffer.Begin()) % align, + 0, + "size=" << size << " ,align=" << align + << " ,buffer=" << (void*)buffer.Begin()); + UNIT_ASSERT_VALUES_EQUAL_C( + buffer.AlignedDataOffset(), + buffer.Begin() - buffer.AccessBuffer().begin(), + "size=" << size << " ,align=" << align + << " ,buffer=" << (void*)buffer.Begin()); + buffers.push_back(std::move(buffer)); + } + } + } + + Y_UNIT_TEST(ShouldAlignTo0) + { + TAlignedBuffer buffer(5678, 0); + 
UNIT_ASSERT_VALUES_EQUAL(buffer.Size(), 5678); + } + + Y_UNIT_TEST(ShouldReconstructAlignedBuffer) + { + ui32 align = 1 << 21; + ui32 size = 5678; + + TAlignedBuffer buffer1(size, align); + UNIT_ASSERT_VALUES_EQUAL(buffer1.Size(), size); + + auto* buffer1Mem = buffer1.Begin(); + + TAlignedBuffer buffer2(std::move(buffer1.AccessBuffer()), align); + UNIT_ASSERT_VALUES_EQUAL(0, buffer1.AccessBuffer().size()); + UNIT_ASSERT_VALUES_EQUAL(size, buffer2.Size()); + + auto* buffer2Mem = buffer2.Begin(); + UNIT_ASSERT_PTR_EQUAL(buffer1Mem, buffer2Mem); + } + + Y_UNIT_TEST(ShouldMoveAlignedBuffer) + { + ui32 align = 1 << 21; + ui32 size = 5678; + + TAlignedBuffer buffer0(size, align); + UNIT_ASSERT_VALUES_EQUAL(buffer0.Size(), size); + + auto* bufferMem = buffer0.Begin(); + + TAlignedBuffer buffer; + UNIT_ASSERT_VALUES_EQUAL(buffer.Size(), 0); + + buffer = std::move(buffer0); + UNIT_ASSERT_PTR_EQUAL(bufferMem, buffer.Begin()); + UNIT_ASSERT_VALUES_EQUAL(buffer.Size(), size); + UNIT_ASSERT_VALUES_EQUAL(0, buffer0.Size()); + + TAlignedBuffer buffer2(std::move(buffer)); + UNIT_ASSERT_PTR_EQUAL(bufferMem, buffer2.Begin()); + UNIT_ASSERT_VALUES_EQUAL(buffer2.Size(), size); + UNIT_ASSERT_VALUES_EQUAL(0, buffer.Size()); + + TAlignedBuffer buffer3 = std::move(buffer2); + UNIT_ASSERT_PTR_EQUAL(bufferMem, buffer3.Begin()); + UNIT_ASSERT_VALUES_EQUAL(buffer3.Size(), size); + UNIT_ASSERT_VALUES_EQUAL(0, buffer2.Size()); + } + + Y_UNIT_TEST(ShouldResizeAlignedBuffer) + { + ui32 align = 1 << 21; + ui32 size = 5678; + + TAlignedBuffer buffer(size, align); + UNIT_ASSERT_VALUES_EQUAL(buffer.Size(), size); + + auto* bufferBegin = buffer.Begin(); + buffer.TrimSize(5555); + UNIT_ASSERT_VALUES_EQUAL(5555, buffer.Size()); + UNIT_ASSERT_PTR_EQUAL(bufferBegin, buffer.Begin()); + + UNIT_ASSERT_EXCEPTION_CONTAINS( + buffer.TrimSize(5556), + TServiceError, + "Tried to trim to size 5556 > 5555"); + } + + Y_UNIT_TEST(ShouldExtractAlignedData) + { + ui32 align = 1 << 21; + ui32 size = 5678; + + 
TAlignedBuffer buffer(size, align); + UNIT_ASSERT_VALUES_EQUAL(buffer.Size(), size); + + auto extractedBuf = + TAlignedBuffer::ExtractAlignedData(buffer.AccessBuffer(), align); + UNIT_ASSERT_PTR_EQUAL(buffer.Begin(), extractedBuf.Data()); + UNIT_ASSERT_VALUES_EQUAL(buffer.Size(), extractedBuf.Size()); + + TString buffer1 = "abcdefg"; + + UNIT_ASSERT_EXCEPTION_CONTAINS( + TAlignedBuffer::ExtractAlignedData(buffer1, align), + TServiceError, + "Extracting unaligned buffer"); + + UNIT_ASSERT_EXCEPTION_CONTAINS( + TAlignedBuffer(std::move(buffer1), align), + TServiceError, + "Initializing from unaligned buffer"); + } +} + +} // namespace NCloud diff --git a/cloud/storage/core/libs/common/ut/ya.make b/cloud/storage/core/libs/common/ut/ya.make index d1c3104f77b..1c47576f093 100644 --- a/cloud/storage/core/libs/common/ut/ya.make +++ b/cloud/storage/core/libs/common/ut/ya.make @@ -15,6 +15,7 @@ IF (SANITIZER_TYPE != "thread") ENDIF() SRCS( + aligned_buffer_ut.cpp backoff_delay_provider_ut.cpp block_buffer_ut.cpp block_data_ref_ut.cpp diff --git a/cloud/storage/core/libs/common/ya.make b/cloud/storage/core/libs/common/ya.make index 17efe71b686..8bbfdef79a0 100644 --- a/cloud/storage/core/libs/common/ya.make +++ b/cloud/storage/core/libs/common/ya.make @@ -4,6 +4,7 @@ GENERATE_ENUM_SERIALIZATION(error.h) SRCS( affinity.cpp + aligned_buffer.cpp alloc.cpp backoff_delay_provider.cpp block_buffer.cpp