Skip to content

Commit

Permalink
Use Velox's Arrow directly and apply the Arrow patches to Velox's Arrow build
Browse files Browse the repository at this point in the history
  • Loading branch information
JkSelf committed Aug 1, 2023
1 parent d8bc7d5 commit c489e50
Show file tree
Hide file tree
Showing 3 changed files with 246 additions and 47 deletions.
49 changes: 2 additions & 47 deletions third_party/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -24,54 +24,7 @@ if(VELOX_ENABLE_ARROW)
else()
set(THRIFT_SOURCE "BUNDLED")
endif()
# Use external arrow & parquet only if <package>_DIR is defined.
if(DEFINED Arrow_HOME)
find_package(Arrow PATHS "${Arrow_HOME}/arrow_install" NO_DEFAULT_PATH)
find_package(Parquet PATHS "${Arrow_HOME}/arrow_install" NO_DEFAULT_PATH)
if(Arrow_FOUND AND Parquet_FOUND)
add_library(arrow INTERFACE)
add_library(parquet INTERFACE)

if(TARGET Arrow::arrow_static)
target_link_libraries(arrow INTERFACE Arrow::arrow_static)
else()
target_link_libraries(arrow INTERFACE Arrow::arrow_shared)
endif()

if(TARGET Parquet::parquet_static)
target_link_libraries(parquet INTERFACE Parquet::parquet_static)
else()
target_link_libraries(parquet INTERFACE Parquet::parquet_shared)
endif()

message(STATUS "Using pre-builded arrow")
endif()

if(Thrift_FOUND)
add_library(thrift INTERFACE)
target_link_libraries(thrift INTERFACE thrift::thrift)
message(STATUS "Using system thrift")
else()
add_library(thrift STATIC IMPORTED GLOBAL)
if(NOT Thrift_FOUND)
set(THRIFT_ROOT ${Arrow_HOME}/arrow_ep/cpp/build/thrift_ep-install)
if(CMAKE_BUILD_TYPE STREQUAL "Debug")
set(THRIFT_LIB ${THRIFT_ROOT}/lib/libthriftd.a)
else()
set(THRIFT_LIB ${THRIFT_ROOT}/lib/libthrift.a)
endif()

file(MAKE_DIRECTORY ${THRIFT_ROOT}/include)
set(THRIFT_INCLUDE_DIR ${THRIFT_ROOT}/include)
endif()

set_property(TARGET thrift PROPERTY INTERFACE_INCLUDE_DIRECTORIES
${THRIFT_INCLUDE_DIR})
set_property(TARGET thrift PROPERTY IMPORTED_LOCATION ${THRIFT_LIB})
message(STATUS "Using pre-builded thrift")
endif()
return()
endif()
set(ARROW_PREFIX "${CMAKE_CURRENT_BINARY_DIR}/arrow_ep")
set(ARROW_CMAKE_ARGS
-DARROW_PARQUET=ON
Expand Down Expand Up @@ -119,6 +72,8 @@ if(VELOX_ENABLE_ARROW)
arrow_ep
PREFIX ${ARROW_PREFIX}
URL ${VELOX_ARROW_SOURCE_URL}
PATCH_COMMAND patch -p1 < ${CMAKE_CURRENT_SOURCE_DIR}/arrow_patches/memorypool.patch &&
patch -p1 < ${CMAKE_CURRENT_SOURCE_DIR}/arrow_patches/custom-codec.patch
URL_HASH ${VELOX_ARROW_BUILD_SHA256_CHECKSUM}
SOURCE_SUBDIR cpp
CMAKE_ARGS ${ARROW_CMAKE_ARGS}
Expand Down
222 changes: 222 additions & 0 deletions third_party/arrow_patches/custom-codec.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
From 6d7aa045030a7714a28b96a48ceaba6ec49fe0d1 Mon Sep 17 00:00:00 2001
From: Rong Ma <[email protected]>
Date: Mon, 16 Jan 2023 19:05:02 +0800
Subject: [PATCH] Add custom codec

---
cpp/src/arrow/ipc/metadata_internal.cc | 2 ++
cpp/src/arrow/ipc/options.cc | 6 ++++--
cpp/src/arrow/ipc/options.h | 3 +++
cpp/src/arrow/ipc/reader.cc | 2 ++
cpp/src/arrow/util/compression.cc | 14 ++++++++++++++
cpp/src/arrow/util/compression.h | 11 +++++++++++
cpp/src/arrow/util/type_fwd.h | 3 ++-
cpp/src/generated/Message_generated.h | 13 ++++++++-----
format/Message.fbs | 5 ++++-
9 files changed, 50 insertions(+), 9 deletions(-)

diff --git a/cpp/src/arrow/ipc/metadata_internal.cc b/cpp/src/arrow/ipc/metadata_internal.cc
index d2f2b20d1..f9a67134d 100644
--- a/cpp/src/arrow/ipc/metadata_internal.cc
+++ b/cpp/src/arrow/ipc/metadata_internal.cc
@@ -936,6 +936,8 @@ static Status GetBodyCompression(FBB& fbb, const IpcWriteOptions& options,
codec = flatbuf::CompressionType::LZ4_FRAME;
} else if (options.codec->compression_type() == Compression::ZSTD) {
codec = flatbuf::CompressionType::ZSTD;
+ } else if (options.codec->compression_type() == Compression::CUSTOM) {
+ codec = flatbuf::CompressionType::CUSTOM;
} else {
return Status::Invalid("Unsupported IPC compression codec: ",
options.codec->name());
diff --git a/cpp/src/arrow/ipc/options.cc b/cpp/src/arrow/ipc/options.cc
index e5b14a47f..b038dd6e3 100644
--- a/cpp/src/arrow/ipc/options.cc
+++ b/cpp/src/arrow/ipc/options.cc
@@ -29,8 +29,10 @@ IpcReadOptions IpcReadOptions::Defaults() { return IpcReadOptions(); }
namespace internal {

Status CheckCompressionSupported(Compression::type codec) {
- if (!(codec == Compression::LZ4_FRAME || codec == Compression::ZSTD)) {
- return Status::Invalid("Only LZ4_FRAME and ZSTD compression allowed");
+ if (std::none_of(
+ kSupportedCodec.cbegin(), kSupportedCodec.cend(),
+ [&codec](const Compression::type& supported) { return codec == supported; })) {
+ return Status::Invalid("Only LZ4_FRAME, ZSTD and CUSTOM compression allowed");
}
return Status::OK();
}
diff --git a/cpp/src/arrow/ipc/options.h b/cpp/src/arrow/ipc/options.h
index 2af9d8e9c..037c2fe9c 100644
--- a/cpp/src/arrow/ipc/options.h
+++ b/cpp/src/arrow/ipc/options.h
@@ -159,6 +159,9 @@ struct ARROW_EXPORT IpcReadOptions {

namespace internal {

+static const std::vector<Compression::type> kSupportedCodec = {
+ Compression::LZ4_FRAME, Compression::ZSTD, Compression::CUSTOM};
+
Status CheckCompressionSupported(Compression::type codec);

} // namespace internal
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 7875cd3cd..74f4b2919 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -598,6 +598,8 @@ Status GetCompression(const flatbuf::RecordBatch* batch, Compression::type* out)
*out = Compression::LZ4_FRAME;
} else if (compression->codec() == flatbuf::CompressionType::ZSTD) {
*out = Compression::ZSTD;
+ } else if (compression->codec() == flatbuf::CompressionType::CUSTOM) {
+ *out = Compression::CUSTOM;
} else {
return Status::Invalid("Unsupported codec in RecordBatch::compression metadata");
}
diff --git a/cpp/src/arrow/util/compression.cc b/cpp/src/arrow/util/compression.cc
index c67cb4539..a0da548bf 100644
--- a/cpp/src/arrow/util/compression.cc
+++ b/cpp/src/arrow/util/compression.cc
@@ -98,6 +98,8 @@ Result<Compression::type> Codec::GetCompressionType(const std::string& name) {
return Compression::ZSTD;
} else if (name == "bz2") {
return Compression::BZ2;
+ } else if (name == "custom") {
+ return Compression::CUSTOM;
} else {
return Status::Invalid("Unrecognized compression type: ", name);
}
@@ -201,6 +203,12 @@ Result<std::unique_ptr<Codec>> Codec::Create(Compression::type codec_type,
codec = internal::MakeBZ2Codec(compression_level);
#endif
break;
+ case Compression::CUSTOM:
+ if (codec_factory == nullptr) {
+ return Status::Invalid("Custom codec is not registered.");
+ }
+ codec = codec_factory(compression_level);
+ break;
default:
break;
}
@@ -254,10 +262,16 @@ bool Codec::IsAvailable(Compression::type codec_type) {
#else
return false;
#endif
+ case Compression::CUSTOM:
+ return codec_factory != nullptr;
default:
return false;
}
}

+void RegisterCustomCodec(const CodecFactory& factory) {
+ std::call_once(custom_codec_registered, [&factory]() { codec_factory = factory; });
+}
+
} // namespace util
} // namespace arrow
diff --git a/cpp/src/arrow/util/compression.h b/cpp/src/arrow/util/compression.h
index 0832e82a6..b8767186b 100644
--- a/cpp/src/arrow/util/compression.h
+++ b/cpp/src/arrow/util/compression.h
@@ -18,8 +18,10 @@
#pragma once

#include <cstdint>
+#include <functional>
#include <limits>
#include <memory>
+#include <mutex>
#include <string>

#include "arrow/result.h"
@@ -198,5 +200,14 @@ class ARROW_EXPORT Codec {
virtual Status Init();
};

+typedef std::function<std::unique_ptr<Codec>(int compression_level)> CodecFactory;
+
+static CodecFactory codec_factory;
+static std::once_flag custom_codec_registered;
+
+/// Register the factory for user-defined codecs; only the first call takes effect.
+ARROW_EXPORT
+void RegisterCustomCodec(const CodecFactory& codec_factory);
+
} // namespace util
} // namespace arrow
diff --git a/cpp/src/arrow/util/type_fwd.h b/cpp/src/arrow/util/type_fwd.h
index ca107c2c6..d8f00c140 100644
--- a/cpp/src/arrow/util/type_fwd.h
+++ b/cpp/src/arrow/util/type_fwd.h
@@ -49,7 +49,8 @@ struct Compression {
LZ4_FRAME,
LZO,
BZ2,
- LZ4_HADOOP
+ LZ4_HADOOP,
+ CUSTOM
};
};

diff --git a/cpp/src/generated/Message_generated.h b/cpp/src/generated/Message_generated.h
index 1c51c6eaf..5308808c9 100644
--- a/cpp/src/generated/Message_generated.h
+++ b/cpp/src/generated/Message_generated.h
@@ -32,29 +32,32 @@ struct MessageBuilder;
enum class CompressionType : int8_t {
LZ4_FRAME = 0,
ZSTD = 1,
+ CUSTOM = 2,
MIN = LZ4_FRAME,
- MAX = ZSTD
+ MAX = CUSTOM
};

-inline const CompressionType (&EnumValuesCompressionType())[2] {
+inline const CompressionType (&EnumValuesCompressionType())[3] {
static const CompressionType values[] = {
CompressionType::LZ4_FRAME,
- CompressionType::ZSTD
+ CompressionType::ZSTD,
+ CompressionType::CUSTOM
};
return values;
}

inline const char * const *EnumNamesCompressionType() {
- static const char * const names[3] = {
+ static const char * const names[4] = {
"LZ4_FRAME",
"ZSTD",
+ "CUSTOM",
nullptr
};
return names;
}

inline const char *EnumNameCompressionType(CompressionType e) {
- if (flatbuffers::IsOutRange(e, CompressionType::LZ4_FRAME, CompressionType::ZSTD)) return "";
+ if (flatbuffers::IsOutRange(e, CompressionType::LZ4_FRAME, CompressionType::CUSTOM)) return "";
const size_t index = static_cast<size_t>(e);
return EnumNamesCompressionType()[index];
}
diff --git a/format/Message.fbs b/format/Message.fbs
index 170ea8fbc..98a7653d6 100644
--- a/format/Message.fbs
+++ b/format/Message.fbs
@@ -49,7 +49,10 @@ enum CompressionType:byte {
LZ4_FRAME,

// Zstandard
- ZSTD
+ ZSTD,
+
+ // Pluggable custom codec
+ CUSTOM
}

/// Provided for forward compatibility in case we need to support different
--
2.25.1

22 changes: 22 additions & 0 deletions third_party/arrow_patches/memorypool.patch
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
diff --git a/cpp/src/arrow/ipc/writer.cc b/cpp/src/arrow/ipc/writer.cc
index 4a7671e15..1c38e8f95 100644
--- a/cpp/src/arrow/ipc/writer.cc
+++ b/cpp/src/arrow/ipc/writer.cc
@@ -181,12 +181,16 @@ class RecordBatchSerializer {
std::shared_ptr<Buffer>* out) {
// Convert buffer to uncompressed-length-prefixed compressed buffer
int64_t maximum_length = codec->MaxCompressedLen(buffer.size(), buffer.data());
- ARROW_ASSIGN_OR_RAISE(auto result, AllocateBuffer(maximum_length + sizeof(int64_t)));
+ ARROW_ASSIGN_OR_RAISE(
+ auto result,
+ AllocateResizableBuffer(maximum_length + sizeof(int64_t), options_.memory_pool));

int64_t actual_length;
ARROW_ASSIGN_OR_RAISE(actual_length,
codec->Compress(buffer.size(), buffer.data(), maximum_length,
result->mutable_data() + sizeof(int64_t)));
+ RETURN_NOT_OK(
+ result->Resize(actual_length + sizeof(int64_t), /* shrink_to_fit= */ true));
*reinterpret_cast<int64_t*>(result->mutable_data()) =
bit_util::ToLittleEndian(buffer.size());
*out = SliceBuffer(std::move(result), /*offset=*/0, actual_length + sizeof(int64_t));

0 comments on commit c489e50

Please sign in to comment.