Skip to content

Commit

Permalink
xx
Browse files Browse the repository at this point in the history
  • Loading branch information
zuochunwei committed Jul 7, 2023
1 parent d219a03 commit fc1f166
Show file tree
Hide file tree
Showing 3 changed files with 115 additions and 32 deletions.
106 changes: 82 additions & 24 deletions velox/common/file/File.h
Original file line number Diff line number Diff line change
Expand Up @@ -235,16 +235,27 @@ struct HeapMemoryMock {
// Wraps an existing buffer: stores `memory` as the backing pointer and
// `capacity` as its capacity. Nothing is allocated or copied here.
explicit HeapMemoryMock(void* memory, size_t capacity)
: memory_(memory), capacity_(capacity) {}

// Puts this object back into the empty state: zero capacity, nothing written,
// no backing buffer. The buffer itself is not released here.
void reset() {
  capacity_ = 0;
  size_ = 0;
  memory_ = nullptr;
}

// True when a backing buffer is attached, i.e. memory_ is non-null.
bool isValid() const {
return memory_ != nullptr;
}

void write(const void* data, size_t len) {
void write(const void* src, size_t len) {
assert(len <= freeSize());
memcpy(end(), data, len);
memcpy(end(), src, len);
size_ += len;
}

void read(void* dst, size_ len, size_t offset) {
assert(offset + len <= size_);
memcpy(dst, (char*)memory_ + offset, len);
}

// Number of bytes written so far (the value write() advances).
auto size() const {
return size_;
}
Expand All @@ -268,28 +279,29 @@ struct HeapMemoryMock {

// Upper limit, in bytes, on the total amount HeapMemoryMockManager::alloc()
// hands out. A request that would exceed it is not served from the heap.
const size_t kHeapMemoryCapacity = 64 * 1024;

class HeapMemoryManagerMock {
class HeapMemoryMockManager {
public:
static HeapMemoryManagerMock& instance() {
static HeapMemoryManagerMock hmmm;
static HeapMemoryMockManager& instance() {
static HeapMemoryMockManager hmmm;
return hmmm;
}

// Hands out a heap buffer of `size` bytes, charged against
// kHeapMemoryCapacity.
//
// Returns the untouched default-constructed HeapMemoryMock when the request
// does not fit in the remaining budget or when malloc fails; callers are
// expected to test the result with isValid().
HeapMemoryMock alloc(size_t size) {
  HeapMemoryMock heapMemory;
  // Compare against the remaining budget rather than computing
  // `size_ + size`, which could wrap around for a huge request.
  if (size_ > kHeapMemoryCapacity || size > kHeapMemoryCapacity - size_) {
    return heapMemory;
  }
  void* memory = malloc(size);
  if (memory == nullptr) {
    // Do not charge the budget for a buffer that was never obtained.
    return heapMemory;
  }
  heapMemory.memory_ = memory;
  heapMemory.size_ = 0;
  heapMemory.capacity_ = size;
  size_ += size;
  return heapMemory;
}

void free(HeapMemoryMock handle) {
if (handle.isValid()) {
size_ -= handle.size_;
::free(handle.memory_);
void free(HeapMemoryMock& heapMemory) {
if (heapMemory.isValid()) {
size_ -= heapMemory.size_;
::free(heapMemory.memory_);
heapMemory.reset();
}
}

Expand All @@ -298,35 +310,81 @@ class HeapMemoryManagerMock {
};

// Convenience wrapper: allocates `size` bytes through the shared
// HeapMemoryMockManager. The result must be checked with isValid().
inline HeapMemoryMock allocHeapMemory(size_t size) {
  return HeapMemoryMockManager::instance().alloc(size);
}

inline void freeHeapMemory(HeapMemoryMock handle) {
HeapMemoryManagerMock::instance().free(handle);
inline void freeHeapMemory(HeapMemoryMock& heapMemory) {
HeapMemoryMockManager::instance().free(heapMemory);
}

class HeapMemoryWriteFile final : public WriteFile {
class HeapMemoryReadFile : public ReadFile {
public:
explicit HeapMemoryWriteFile(HeapMemoryMock handle) : handle_(handle) {}
explicit HeapMemoryReadFile(HeapMemoryMock& heapMemory)
: heapMemory_(heapMemory) {}

std::string_view pread(
uint64_t offset,
uint64_t length,
void* FOLLY_NONNULL buf) const override {
bytesRead_ += length;
heapMemory_.read(buf, length, offset);
return {static_cast<char*>(buf), length};
}

~HeapMemoryWriteFile() {
freeHeapMemory(handle_);
std::string pread(uint64_t offset, uint64_t length) const override {
bytesRead_ += length;
assert(offset + lenght <= heapMemory_.size());
return std::string((char*)heapMemory_.begin() + offset, length);
}

uint64_t size() const final {
return heapMemory_.size();
}

uint64_t memoryUsage() const final {
return size();
}

// Mainly for testing. Coalescing isn't helpful for in memory data.
void setShouldCoalesce(bool shouldCoalesce) {
shouldCoalesce_ = shouldCoalesce;
}
bool shouldCoalesce() const final {
return shouldCoalesce_;
}

std::string getName() const override {
return "<HeapMemoryReadFile>";
}

uint64_t getNaturalReadSize() const override {
return 1024;
}

private:
HeapMemoryMock& heapMemory_;
bool shouldCoalesce_ = false;
};

// WriteFile that appends into a HeapMemoryMock buffer instead of a file.
//
// Only a reference to the buffer is kept: the HeapMemoryMock must outlive
// this object, and this class never frees it.
class HeapMemoryWriteFile final : public WriteFile {
 public:
  explicit HeapMemoryWriteFile(HeapMemoryMock& heapMemory)
      : heapMemory_(heapMemory) {}

  // Appends `data` to the buffer. The buffer does not grow; fitting within
  // its free space is only assert-checked by HeapMemoryMock::write().
  void append(std::string_view data) final {
    heapMemory_.write(data.data(), data.length());
  }

  // Nothing is buffered outside the HeapMemoryMock, so there is nothing to
  // flush.
  void flush() final {}

  // No resource is held by this object, so closing does nothing.
  void close() final {}

  // Number of bytes appended so far.
  uint64_t size() const final {
    return heapMemory_.size();
  }

 private:
  HeapMemoryMock& heapMemory_;
};

// Current implementation for the local version is quite simple (e.g. no
Expand Down
28 changes: 20 additions & 8 deletions velox/exec/Spill.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,14 @@ void SpillMergeStream::pop() {
}

// Opens the destination for spilled data and records which kind it is in
// toWhere_: a heap buffer of targetFileSize_ bytes when the heap allocation
// succeeds, otherwise a file at path_.
//
// NOTE(review): heapMemoryMock_ is overwritten without being freed first; if
// this can run more than once per SpillFile, the earlier buffer would leak --
// confirm against callers.
void SpillFile::newOutput() {
  heapMemoryMock_ = allocHeapMemory(targetFileSize_);
  if (heapMemoryMock_.isValid()) {
    output_ = std::make_unique<HeapMemoryWriteFile>(heapMemoryMock_);
    toWhere_ = TO_HEAP;
    return;
  }
  auto fs = filesystems::getFileSystem(path_, nullptr);
  output_ = fs->openFileForWrite(path_);
  toWhere_ = TO_FILE;
}

Expand All @@ -65,13 +67,23 @@ WriteFile& SpillFile::output() {
void SpillFile::startRead() {
constexpr uint64_t kMaxReadBufferSize =
(1 << 20) - AlignedBuffer::kPaddedSize; // 1MB - padding.

VELOX_CHECK(!output_);
VELOX_CHECK(!input_);
auto fs = filesystems::getFileSystem(path_, nullptr);
auto file = fs->openFileForRead(path_);
auto buffer = AlignedBuffer::allocate<char>(
std::min<uint64_t>(fileSize_, kMaxReadBufferSize), &pool_);
input_ = std::make_unique<SpillInput>(std::move(file), std::move(buffer));

if (toWhere_ == TO_FILE) {
auto fs = filesystems::getFileSystem(path_, nullptr);
auto file = fs->openFileForRead(path_);
auto buffer = AlignedBuffer::allocate<char>(
std::min<uint64_t>(fileSize_, kMaxReadBufferSize), &pool_);
input_ = std::make_unique<SpillInput>(std::move(file), std::move(buffer));
} else {
std::unique_ptr<ReadFile> =
std::make_unique<HeapMemoryReadFile>(heapMemoryMock_);
auto buffer = AlignedBuffer::allocate<char>(
std::min<uint64_t>(fileSize_, kMaxReadBufferSize), &pool_);
input_ = std::make_unique<SpillInput>(std::move(file), std::move(buffer));
}
}

bool SpillFile::nextBatch(RowVectorPtr& rowVector) {
Expand Down
13 changes: 13 additions & 0 deletions velox/exec/Spill.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ class SpillFile {
sortCompareFlags_.size() == numSortingKeys_);
}

// Hands the heap buffer back if this file holds one; otherwise there is
// nothing to release here.
~SpillFile() {
  if (!heapMemoryMock_.isValid()) {
    return;
  }
  freeHeapMemory(heapMemoryMock_);
}

// Returns the stored number of sorting keys.
int32_t numSortingKeys() const {
return numSortingKeys_;
}
Expand Down Expand Up @@ -149,6 +155,13 @@ class SpillFile {
const int32_t ordinal_;
const std::string path_;

// Where newOutput() directed the spilled data. startRead() uses this to pick
// the matching reader: a file opened from path_ or the heap buffer.
enum {
TO_FILE,
TO_HEAP,
} toWhere_ = TO_FILE;

// Heap buffer requested in newOutput(); used as the spill destination only
// when it is valid. Released in the destructor.
HeapMemoryMock heapMemoryMock_;

// Byte size of the backing file. Set when finishing writing.
uint64_t fileSize_ = 0;
uint64_t targetFileSize_ = 0;
Expand Down

0 comments on commit fc1f166

Please sign in to comment.