Skip to content

Commit

Permalink
cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
marin-ma committed Oct 31, 2023
1 parent fc3dbbd commit 1c4cef5
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 135 deletions.
59 changes: 1 addition & 58 deletions cpp/core/benchmarks/CompressionBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,62 +65,6 @@ const int32_t kQplGzip = 2;
const int32_t kLZ4 = 3;
const int32_t kZstd = 4;

class LimitedMemoryPool final : public arrow::MemoryPool {
public:
explicit LimitedMemoryPool() {}

Status Allocate(int64_t size, int64_t alignment, uint8_t** out) override {
RETURN_NOT_OK(pool_->Allocate(size, out));
stats_.UpdateAllocatedBytes(size);
// std::cout << "Allocate: size = " << size << " addr = " << std::hex <<
// (uint64_t)*out << std::dec << std::endl; print_trace();
return arrow::Status::OK();
}

Status Reallocate(int64_t oldSize, int64_t newSize, int64_t alignment, uint8_t** ptr) override {
// auto old_ptr = *ptr;
RETURN_NOT_OK(pool_->Reallocate(oldSize, newSize, ptr));
stats_.UpdateAllocatedBytes(newSize - oldSize);
// std::cout << "Reallocate: old_size = " << old_size << " old_ptr = " <<
// std::hex << (uint64_t)old_ptr << std::dec << " new_size = " << new_size
// << " addr = " << std::hex << (uint64_t)*ptr << std::dec << std::endl;
// print_trace();
return arrow::Status::OK();
}

void Free(uint8_t* buffer, int64_t size, int64_t alignment) override {
pool_->Free(buffer, size);
stats_.UpdateAllocatedBytes(-size);
// std::cout << "Free: size = " << size << " addr = " << std::hex <<
// (uint64_t)buffer
// << std::dec << std::endl; print_trace();
}

int64_t bytes_allocated() const override {
return stats_.bytes_allocated();
}

int64_t max_memory() const override {
return pool_->max_memory();
}

std::string backend_name() const override {
return pool_->backend_name();
}

int64_t total_bytes_allocated() const override {
return pool_->total_bytes_allocated();
}

int64_t num_allocations() const override {
throw pool_->num_allocations();
}

private:
arrow::MemoryPool* pool_ = arrow::default_memory_pool();
arrow::internal::MemoryPoolStats stats_;
};

class BenchmarkCompression {
public:
explicit BenchmarkCompression(const std::string& fileName, uint32_t compressBufferSize) {
Expand Down Expand Up @@ -195,8 +139,7 @@ class BenchmarkCompression {
default:
throw GlutenException("Codec not supported. Only support LZ4 or QATGzip");
}
std::shared_ptr<arrow::MemoryPool> pool = std::make_shared<LimitedMemoryPool>();
ipcWriteOptions.memory_pool = pool.get();
ipcWriteOptions.memory_pool = arrow::default_memory_pool();

int64_t elapseRead = 0;
int64_t numBatches = 0;
Expand Down
36 changes: 18 additions & 18 deletions cpp/core/shuffle/LocalPartitionWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class LocalPartitionWriter::LocalEvictHandle : public EvictHandle {
const std::shared_ptr<SpillInfo>& spillInfo,
bool flush);

bool finished() override {
bool finished() {
return finished_;
};

Expand Down Expand Up @@ -86,9 +86,9 @@ class CacheEvictHandle final : public LocalPartitionWriter::LocalEvictHandle {
if (written > 0) {
spillInfo_->empty = false;
}
}

finished_ = true;
finished_ = true;
}
return arrow::Status::OK();
}

Expand All @@ -102,9 +102,7 @@ class CacheEvictHandle final : public LocalPartitionWriter::LocalEvictHandle {
// Clear cached batches before creating the payloads, to avoid spilling this partition.
partitionCachedPayload_[partitionId].clear();
for (auto& payload : payloads) {
#ifndef SKIPWRITE
RETURN_NOT_OK(arrow::ipc::WriteIpcPayload(*payload, options_, os, &metadataLength));
#endif
}
return arrow::Status::OK();
}
Expand All @@ -128,9 +126,7 @@ class FlushOnSpillEvictHandle final : public LocalPartitionWriter::LocalEvictHan
int32_t metadataLength = 0; // unused.

ARROW_ASSIGN_OR_RAISE(auto start, os_->Tell());
#ifndef SKIPWRITE
RETURN_NOT_OK(arrow::ipc::WriteIpcPayload(*payload, options_, os_.get(), &metadataLength));
#endif
ARROW_ASSIGN_OR_RAISE(auto end, os_->Tell());
DEBUG_OUT << "Spilled partition " << partitionId << " file start: " << start << ", file end: " << end << std::endl;
spillInfo_->partitionSpillInfos.push_back({partitionId, end - start});
Expand Down Expand Up @@ -269,10 +265,8 @@ arrow::Status LocalPartitionWriter::stop() {
writeTimer.start();
if (lastPayload) {
int32_t metadataLength = 0; // unused
#ifndef SKIPWRITE
RETURN_NOT_OK(arrow::ipc::WriteIpcPayload(
*lastPayload, shuffleWriter_->options().ipc_write_options, dataFileOs_.get(), &metadataLength));
#endif
}
ARROW_ASSIGN_OR_RAISE(endInFinalFile, dataFileOs_->Tell());
if (endInFinalFile != startInFinalFile && shuffleWriter_->options().write_eos) {
Expand Down Expand Up @@ -312,15 +306,7 @@ arrow::Status LocalPartitionWriter::stop() {
}

arrow::Status LocalPartitionWriter::requestNextEvict(bool flush) {
if (auto handle = getEvictHandle()) {
RETURN_NOT_OK(handle->finish());
// Discard last SpillInfo if not data spilled.
auto lastSpillInfo = spills_.back();
if (lastSpillInfo->empty) {
RETURN_NOT_OK(fs_->DeleteFile(lastSpillInfo->spilledFile));
spills_.pop_back();
}
}
RETURN_NOT_OK(finishEvict());
ARROW_ASSIGN_OR_RAISE(auto spilledFile, createTempShuffleFile(nextSpilledFileDir()));
auto spillInfo = std::make_shared<SpillInfo>(spilledFile);
spills_.push_back(spillInfo);
Expand All @@ -336,6 +322,20 @@ EvictHandle* LocalPartitionWriter::getEvictHandle() {
return nullptr;
}

arrow::Status LocalPartitionWriter::finishEvict() {
if (auto handle = getEvictHandle()) {
RETURN_NOT_OK(handle->finish());
// The spilled file should not be empty. However, defensively
// discard the last SpillInfo to avoid iterating over invalid ones.
auto lastSpillInfo = spills_.back();
if (lastSpillInfo->empty) {
RETURN_NOT_OK(fs_->DeleteFile(lastSpillInfo->spilledFile));
spills_.pop_back();
}
}
return arrow::Status::OK();
}

LocalPartitionWriterCreator::LocalPartitionWriterCreator() : PartitionWriterCreator() {}

arrow::Result<std::shared_ptr<ShuffleWriter::PartitionWriter>> LocalPartitionWriterCreator::make(
Expand Down
2 changes: 2 additions & 0 deletions cpp/core/shuffle/LocalPartitionWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class LocalPartitionWriter : public ShuffleWriter::PartitionWriter {

EvictHandle* getEvictHandle() override;

arrow::Status finishEvict() override;

/// The stop function performs several tasks:
/// 1. Opens the final data file.
/// 2. Iterates over each partition ID (pid) to:
Expand Down
4 changes: 2 additions & 2 deletions cpp/core/shuffle/PartitionWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ class EvictHandle {
virtual arrow::Status evict(uint32_t partitionId, std::unique_ptr<arrow::ipc::IpcPayload> payload) = 0;

virtual arrow::Status finish() = 0;

virtual bool finished() = 0;
};

class ShuffleWriter::PartitionWriter {
Expand All @@ -55,6 +53,8 @@ class ShuffleWriter::PartitionWriter {
/// \return
virtual EvictHandle* getEvictHandle() = 0;

virtual arrow::Status finishEvict() = 0;

ShuffleWriter* shuffleWriter_;
};

Expand Down
10 changes: 4 additions & 6 deletions cpp/core/shuffle/rss/CelebornPartitionWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ class CelebornEvictHandle final : public EvictHandle {
// Copy payload to arrow buffered os.
ARROW_ASSIGN_OR_RAISE(auto celebornBufferOs, arrow::io::BufferOutputStream::Create(bufferSize_, pool_));
int32_t metadataLength = 0; // unused
#ifndef SKIPWRITE
RETURN_NOT_OK(arrow::ipc::WriteIpcPayload(*payload, options_, celebornBufferOs.get(), &metadataLength));
#endif
payload = nullptr; // Invalidate payload immediately.

// Push.
Expand All @@ -49,10 +47,6 @@ class CelebornEvictHandle final : public EvictHandle {
return arrow::Status::OK();
}

bool finished() override {
return true;
}

private:
int64_t bufferSize_;
arrow::ipc::IpcWriteOptions options_;
Expand Down Expand Up @@ -92,6 +86,10 @@ EvictHandle* CelebornPartitionWriter::getEvictHandle() {
return evictHandle_.get();
}

arrow::Status CelebornPartitionWriter::finishEvict() {
return evictHandle_->finish();
}

CelebornPartitionWriterCreator::CelebornPartitionWriterCreator(std::shared_ptr<RssClient> client)
: PartitionWriterCreator(), client_(client) {}

Expand Down
2 changes: 2 additions & 0 deletions cpp/core/shuffle/rss/CelebornPartitionWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class CelebornPartitionWriter final : public RemotePartitionWriter {

EvictHandle* getEvictHandle() override;

arrow::Status finishEvict() override;

arrow::Status init() override;

arrow::Status stop() override;
Expand Down
14 changes: 9 additions & 5 deletions cpp/velox/shuffle/VeloxShuffleWriter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@ namespace gluten {
#define PREFETCHT1(ptr) __builtin_prefetch(ptr, 0, 2)
#define PREFETCHT2(ptr) __builtin_prefetch(ptr, 0, 1)
#endif
// #define SKIPWRITE

namespace {

Expand Down Expand Up @@ -622,6 +621,10 @@ arrow::Status VeloxShuffleWriter::updateInputHasNull(const facebook::velox::RowV
return arrow::Status::OK();
}

void VeloxShuffleWriter::setSplitState(SplitState state) {
splitState_ = state;
}

arrow::Status VeloxShuffleWriter::doSplit(const facebook::velox::RowVector& rv, int64_t memLimit) {
auto rowNum = rv.size();
RETURN_NOT_OK(buildPartition2Row(rowNum));
Expand Down Expand Up @@ -1114,8 +1117,9 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel

memLimit += cachedPayloadSize();
// make sure split buffer uses 128M memory at least, let's hardcode it here for now
if (memLimit < kMinMemLimit)
if (memLimit < kMinMemLimit) {
memLimit = kMinMemLimit;
}

uint64_t preAllocRowCnt =
memLimit > 0 && bytesPerRow > 0 ? memLimit / bytesPerRow / numPartitions_ >> 2 : options_.buffer_size;
Expand Down Expand Up @@ -1447,9 +1451,9 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
}
auto evicted = beforeEvict;

if (auto evictHandle = partitionWriter_->getEvictHandle()) {
{
ScopedTimer evictTime(totalEvictTime_);
RETURN_NOT_OK(evictHandle->finish());
RETURN_NOT_OK(partitionWriter_->finishEvict());
}

if (auto afterEvict = cachedPayloadSize()) {
Expand Down Expand Up @@ -1694,7 +1698,7 @@ arrow::Status VeloxShuffleWriter::splitFixedWidthValueBuffer(const facebook::vel
}
}
spillTime.start();
RETURN_NOT_OK(evictHandle->finish());
RETURN_NOT_OK(partitionWriter_->finishEvict());
spillTime.stop();
totalEvictTime_ += spillTime.realTimeUsed();
}
Expand Down
12 changes: 2 additions & 10 deletions cpp/velox/shuffle/VeloxShuffleWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -183,16 +183,6 @@ class VeloxShuffleWriter final : public ShuffleWriter {
VS_PRINT_CONTAINER(input_has_null_);
}

// Public for test only.
void setSplitState(SplitState state) {
splitState_ = state;
}

// For test only.
SplitState getSplitState() {
return splitState_;
}

protected:
VeloxShuffleWriter(
uint32_t numPartitions,
Expand Down Expand Up @@ -221,6 +211,8 @@ class VeloxShuffleWriter final : public ShuffleWriter {

arrow::Status updateInputHasNull(const facebook::velox::RowVector& rv);

void setSplitState(SplitState state);

arrow::Status doSplit(const facebook::velox::RowVector& rv, int64_t memLimit);

bool beyondThreshold(uint32_t partitionId, uint64_t newSize);
Expand Down
40 changes: 4 additions & 36 deletions cpp/velox/tests/VeloxShuffleWriterTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -284,17 +284,7 @@ TEST_P(RoundRobinPartitioningShuffleWriter, preAllocForceRealloc) {
auto shuffleWriter = createShuffleWriter();

// First spilt no null.
std::vector<VectorPtr> noNull = {
makeFlatVector<int8_t>({0, 1}),
makeFlatVector<int8_t>({0, -1}),
makeFlatVector<int32_t>({0, 100}),
makeFlatVector<int64_t>({0, 1}),
makeFlatVector<float>({0, 0.142857}),
makeFlatVector<bool>({false, true}),
makeFlatVector<velox::StringView>({"", "alice"}),
makeFlatVector<velox::StringView>({"alice", ""}),
};
auto inputNoNull = makeRowVector(noNull);
auto inputNoNull = inputVectorNoNull_;

// Second split has null. Continue filling current partition buffers.
std::vector<VectorPtr> intHasNull = {
Expand Down Expand Up @@ -348,31 +338,9 @@ TEST_P(RoundRobinPartitioningShuffleWriter, preAllocForceReuse) {
auto shuffleWriter = createShuffleWriter();

// First spilt no null.
std::vector<VectorPtr> noNull = {
makeFlatVector<int8_t>({0, 1}),
makeFlatVector<int8_t>({0, -1}),
makeFlatVector<int32_t>({0, 100}),
makeFlatVector<int64_t>({0, 1}),
makeFlatVector<float>({0, 0.142857}),
makeFlatVector<bool>({false, true}),
makeFlatVector<velox::StringView>({"", "alice"}),
makeFlatVector<velox::StringView>({"alice", ""}),
};
auto inputNoNull = makeRowVector(noNull);

// Second split has null int.
std::vector<VectorPtr> fixedWithdHasNull = {
makeNullableFlatVector<int8_t>({0, 1, std::nullopt, std::nullopt}),
makeNullableFlatVector<int8_t>({0, -1, std::nullopt, std::nullopt}),
makeNullableFlatVector<int32_t>({0, 100, std::nullopt, std::nullopt}),
makeNullableFlatVector<int64_t>({0, 1, std::nullopt, std::nullopt}),
makeNullableFlatVector<float>({0, 0.142857, std::nullopt, std::nullopt}),
makeNullableFlatVector<bool>({false, true, std::nullopt, std::nullopt}),
makeNullableFlatVector<velox::StringView>({"", "alice", "", ""}),
makeNullableFlatVector<velox::StringView>({"alice", "", "", ""}),
};
auto inputFixedWidthHasNull = makeRowVector(fixedWithdHasNull);

auto inputNoNull = inputVectorNoNull_;
// Second split has null int, null string and non-null string,
auto inputFixedWidthHasNull = inputVector1_;
// Third split has null string.
std::vector<VectorPtr> stringHasNull = {
makeNullableFlatVector<int8_t>({0, 1}),
Expand Down
14 changes: 14 additions & 0 deletions cpp/velox/utils/tests/VeloxShuffleWriterTestBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,20 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase
makeNullableFlatVector<facebook::velox::StringView>({std::nullopt, std::nullopt}),
};

childrenNoNull_ = {
makeFlatVector<int8_t>({0, 1}),
makeFlatVector<int8_t>({0, -1}),
makeFlatVector<int32_t>({0, 100}),
makeFlatVector<int64_t>({0, 1}),
makeFlatVector<float>({0, 0.142857}),
makeFlatVector<bool>({false, true}),
makeFlatVector<facebook::velox::StringView>({"", "alice"}),
makeFlatVector<facebook::velox::StringView>({"alice", ""}),
};

inputVector1_ = makeRowVector(children1_);
inputVector2_ = makeRowVector(children2_);
inputVectorNoNull_ = makeRowVector(childrenNoNull_);
}

arrow::Status splitRowVector(VeloxShuffleWriter& shuffleWriter, facebook::velox::RowVectorPtr vector) {
Expand All @@ -126,9 +138,11 @@ class VeloxShuffleWriterTestBase : public facebook::velox::test::VectorTestBase

std::vector<facebook::velox::VectorPtr> children1_;
std::vector<facebook::velox::VectorPtr> children2_;
std::vector<facebook::velox::VectorPtr> childrenNoNull_;

facebook::velox::RowVectorPtr inputVector1_;
facebook::velox::RowVectorPtr inputVector2_;
facebook::velox::RowVectorPtr inputVectorNoNull_;
};

class VeloxShuffleWriterTest : public ::testing::TestWithParam<ShuffleTestParams>, public VeloxShuffleWriterTestBase {
Expand Down

0 comments on commit 1c4cef5

Please sign in to comment.