Spill to a mock heap-memory region before falling back to a real file.

Adds HeapMemoryMock::read()/reset() and HeapMemoryReadFile, renames
HeapMemoryManagerMock to HeapMemoryMockManager, and makes SpillFile own the
HeapMemoryMock so that it can be read back and is freed in ~SpillFile().

diff --git a/velox/common/file/File.h b/velox/common/file/File.h
index 603ed28ba20e..32f8540eb3ac 100644
--- a/velox/common/file/File.h
+++ b/velox/common/file/File.h
@@ -235,16 +235,28 @@ struct HeapMemoryMock {
   explicit HeapMemoryMock(void* memory, size_t capacity)
       : memory_(memory), capacity_(capacity) {}
 
+  void reset() {
+    memory_ = nullptr;
+    size_ = 0;
+    capacity_ = 0;
+  }
+
   bool isValid() const {
     return memory_ != nullptr;
   }
 
-  void write(const void* data, size_t len) {
+  void write(const void* src, size_t len) {
     assert(len <= freeSize());
-    memcpy(end(), data, len);
+    memcpy(end(), src, len);
     size_ += len;
   }
 
+  // Copies 'len' bytes of the written data, starting at 'offset', into 'dst'.
+  void read(void* dst, size_t len, size_t offset) {
+    assert(offset + len <= size_);
+    memcpy(dst, (char*)memory_ + offset, len);
+  }
+
   auto size() const {
     return size_;
   }
@@ -268,28 +280,31 @@ struct HeapMemoryMock {
 
 const size_t kHeapMemoryCapacity = 64 * 1024;
 
-class HeapMemoryManagerMock {
+class HeapMemoryMockManager {
  public:
-  static HeapMemoryManagerMock& instance() {
-    static HeapMemoryManagerMock hmmm;
+  static HeapMemoryMockManager& instance() {
+    static HeapMemoryMockManager hmmm;
     return hmmm;
   }
 
   HeapMemoryMock alloc(size_t size) {
-    HeapMemoryMock handle;
+    HeapMemoryMock heapMemory;
     if (size_ + size <= kHeapMemoryCapacity) {
-      handle.memory_ = malloc(size);
-      handle.size_ = 0;
-      handle.capacity_ = size;
+      heapMemory.memory_ = malloc(size);
+      heapMemory.size_ = 0;
+      heapMemory.capacity_ = size;
       size_ += size;
     }
-    return handle;
+    return heapMemory;
   }
 
-  void free(HeapMemoryMock handle) {
-    if (handle.isValid()) {
-      size_ -= handle.size_;
-      ::free(handle.memory_);
+  void free(HeapMemoryMock& heapMemory) {
+    if (heapMemory.isValid()) {
+      // alloc() charged the full capacity against the budget, so release the
+      // capacity here rather than the number of bytes written.
+      size_ -= heapMemory.capacity_;
+      ::free(heapMemory.memory_);
+      heapMemory.reset();
     }
   }
 
@@ -298,23 +313,69 @@ class HeapMemoryManagerMock {
 };
 
 inline HeapMemoryMock allocHeapMemory(size_t size) {
-  return HeapMemoryManagerMock::instance().alloc(size);
+  return HeapMemoryMockManager::instance().alloc(size);
 }
 
-inline void freeHeapMemory(HeapMemoryMock handle) {
-  HeapMemoryManagerMock::instance().free(handle);
+inline void freeHeapMemory(HeapMemoryMock& heapMemory) {
+  HeapMemoryMockManager::instance().free(heapMemory);
 }
 
-class HeapMemoryWriteFile final : public WriteFile {
+class HeapMemoryReadFile : public ReadFile {
  public:
-  explicit HeapMemoryWriteFile(HeapMemoryMock handle) : handle_(handle) {}
+  explicit HeapMemoryReadFile(HeapMemoryMock& heapMemory)
+      : heapMemory_(heapMemory) {}
+
+  std::string_view pread(
+      uint64_t offset,
+      uint64_t length,
+      void* FOLLY_NONNULL buf) const override {
+    bytesRead_ += length;
+    heapMemory_.read(buf, length, offset);
+    return {static_cast<char*>(buf), length};
+  }
 
-  ~HeapMemoryWriteFile() {
-    freeHeapMemory(handle_);
+  std::string pread(uint64_t offset, uint64_t length) const override {
+    bytesRead_ += length;
+    assert(offset + length <= heapMemory_.size());
+    return std::string((char*)heapMemory_.begin() + offset, length);
   }
 
+  uint64_t size() const final {
+    return heapMemory_.size();
+  }
+
+  uint64_t memoryUsage() const final {
+    return size();
+  }
+
+  // Mainly for testing. Coalescing isn't helpful for in memory data.
+  void setShouldCoalesce(bool shouldCoalesce) {
+    shouldCoalesce_ = shouldCoalesce;
+  }
+  bool shouldCoalesce() const final {
+    return shouldCoalesce_;
+  }
+
+  std::string getName() const override {
+    return "";
+  }
+
+  uint64_t getNaturalReadSize() const override {
+    return 1024;
+  }
+
+ private:
+  HeapMemoryMock& heapMemory_;
+  bool shouldCoalesce_ = false;
+};
+
+class HeapMemoryWriteFile final : public WriteFile {
+ public:
+  explicit HeapMemoryWriteFile(HeapMemoryMock& heapMemory)
+      : heapMemory_(heapMemory) {}
+
   void append(std::string_view data) final {
-    handle_.write(data.data(), data.length());
+    heapMemory_.write(data.data(), data.length());
   }
 
   void flush() final {}
@@ -322,11 +383,11 @@ class HeapMemoryWriteFile final : public WriteFile {
   void close() final {}
 
   uint64_t size() const final {
-    return handle_.size_;
+    return heapMemory_.size_;
   }
 
  private:
-  HeapMemoryMock handle_;
+  HeapMemoryMock& heapMemory_;
 };
 
 // Current implementation for the local version is quite simple (e.g. no
diff --git a/velox/exec/Spill.cpp b/velox/exec/Spill.cpp
index 0e6a02057e61..926cac340cf3 100644
--- a/velox/exec/Spill.cpp
+++ b/velox/exec/Spill.cpp
@@ -46,12 +46,14 @@ void SpillMergeStream::pop() {
 }
 
 void SpillFile::newOutput() {
-  auto handle = allocHeapMemory(targetFileSize_);
-  if (handle.isValid()) {
-    output_ = std::make_unique<HeapMemoryWriteFile>(handle);
+  heapMemoryMock_ = allocHeapMemory(targetFileSize_);
+  if (heapMemoryMock_.isValid()) {
+    output_ = std::make_unique<HeapMemoryWriteFile>(heapMemoryMock_);
+    toWhere_ = TO_HEAP;
   } else {
     auto fs = filesystems::getFileSystem(path_, nullptr);
     output_ = fs->openFileForWrite(path_);
+    toWhere_ = TO_FILE;
   }
 }
 
@@ -65,13 +67,23 @@ WriteFile& SpillFile::output() {
 void SpillFile::startRead() {
   constexpr uint64_t kMaxReadBufferSize =
       (1 << 20) - AlignedBuffer::kPaddedSize; // 1MB - padding.
+
   VELOX_CHECK(!output_);
   VELOX_CHECK(!input_);
-  auto fs = filesystems::getFileSystem(path_, nullptr);
-  auto file = fs->openFileForRead(path_);
-  auto buffer = AlignedBuffer::allocate<char>(
-      std::min<uint64_t>(fileSize_, kMaxReadBufferSize), &pool_);
-  input_ = std::make_unique<SpillInput>(std::move(file), std::move(buffer));
+
+  if (toWhere_ == TO_FILE) {
+    auto fs = filesystems::getFileSystem(path_, nullptr);
+    auto file = fs->openFileForRead(path_);
+    auto buffer = AlignedBuffer::allocate<char>(
+        std::min<uint64_t>(fileSize_, kMaxReadBufferSize), &pool_);
+    input_ = std::make_unique<SpillInput>(std::move(file), std::move(buffer));
+  } else {
+    std::unique_ptr<ReadFile> file =
+        std::make_unique<HeapMemoryReadFile>(heapMemoryMock_);
+    auto buffer = AlignedBuffer::allocate<char>(
+        std::min<uint64_t>(fileSize_, kMaxReadBufferSize), &pool_);
+    input_ = std::make_unique<SpillInput>(std::move(file), std::move(buffer));
+  }
 }
 
 bool SpillFile::nextBatch(RowVectorPtr& rowVector) {
diff --git a/velox/exec/Spill.h b/velox/exec/Spill.h
index eabe302cfd8c..fad81116d32b 100644
--- a/velox/exec/Spill.h
+++ b/velox/exec/Spill.h
@@ -84,6 +84,12 @@ class SpillFile {
         sortCompareFlags_.size() == numSortingKeys_);
   }
 
+  ~SpillFile() {
+    if (heapMemoryMock_.isValid()) {
+      freeHeapMemory(heapMemoryMock_);
+    }
+  }
+
   int32_t numSortingKeys() const {
     return numSortingKeys_;
   }
@@ -149,6 +155,13 @@ class SpillFile {
   const int32_t ordinal_;
   const std::string path_;
 
+  enum {
+    TO_FILE,
+    TO_HEAP,
+  } toWhere_ = TO_FILE;
+
+  HeapMemoryMock heapMemoryMock_;
+
   // Byte size of the backing file. Set when finishing writing.
   uint64_t fileSize_ = 0;
   uint64_t targetFileSize_ = 0;