Skip to content

Commit

Permalink
[VL] enable 11-22 upstreaming velox rebase (#3773)
Browse files Browse the repository at this point in the history
enable 11-22 Velox rebase w/ Meta repo
  • Loading branch information
JkSelf authored Nov 27, 2023
1 parent 5c436eb commit 1e65165
Show file tree
Hide file tree
Showing 12 changed files with 61 additions and 18 deletions.
26 changes: 20 additions & 6 deletions cpp/velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,20 @@ macro(ADD_VELOX_OBJECTS)
target_link_libraries(velox PUBLIC velox_objects)
endmacro()

macro(add_duckdb)
add_velox_dependency(duckdb::static "${VELOX_BUILD_PATH}/_deps/duckdb-build/src/libduckdb_static.a")
add_velox_dependency(duckdb::jemalloc "${VELOX_BUILD_PATH}/_deps/duckdb-build/extension/jemalloc/libjemalloc_extension.a")
add_velox_dependency(duckdb::fmt "${VELOX_BUILD_PATH}/_deps/duckdb-build/third_party/fmt/libduckdb_fmt.a")
add_velox_dependency(duckdb::query "${VELOX_BUILD_PATH}/_deps/duckdb-build/third_party/libpg_query/libduckdb_pg_query.a")
add_velox_dependency(duckdb::re2 "${VELOX_BUILD_PATH}/_deps/duckdb-build/third_party/re2/libduckdb_re2.a")
add_velox_dependency(duckdb::miniz "${VELOX_BUILD_PATH}/_deps/duckdb-build/third_party/miniz/libduckdb_miniz.a")
add_velox_dependency(duckdb::utf8 "${VELOX_BUILD_PATH}/_deps/duckdb-build/third_party/utf8proc/libduckdb_utf8proc.a")
add_velox_dependency(duckdb::hyperloglog "${VELOX_BUILD_PATH}/_deps/duckdb-build/third_party/hyperloglog/libduckdb_hyperloglog.a")
add_velox_dependency(duckdb::fastpforlib "${VELOX_BUILD_PATH}/_deps/duckdb-build/third_party/fastpforlib/libduckdb_fastpforlib.a")
add_velox_dependency(duckdb::mbedtls "${VELOX_BUILD_PATH}/_deps/duckdb-build/third_party/mbedtls/libduckdb_mbedtls.a")
add_velox_dependency(duckdb::fsst "${VELOX_BUILD_PATH}/_deps/duckdb-build/third_party/fsst/libduckdb_fsst.a")
endmacro()

macro(ADD_VELOX_DEPENDENCIES)
add_velox_objects()
add_velox_dependency(functions::sparksql::lib "${VELOX_COMPONENTS_PATH}/functions/sparksql/libvelox_functions_spark.a")
Expand All @@ -114,12 +128,11 @@ macro(ADD_VELOX_DEPENDENCIES)
endif()
add_velox_dependency(exec "${VELOX_COMPONENTS_PATH}/exec/libvelox_exec.a")
add_velox_dependency(common::test_util "${VELOX_COMPONENTS_PATH}/common/testutil/libvelox_test_util.a")
add_velox_dependency(parse::parser "${VELOX_COMPONENTS_PATH}/parse/libvelox_parse_parser.a")

if(BUILD_TESTS)
add_velox_dependency(parse::parser "${VELOX_COMPONENTS_PATH}/parse/libvelox_parse_parser.a")
add_velox_dependency(duckdb::parser "${VELOX_COMPONENTS_PATH}/duckdb/conversion/libvelox_duckdb_parser.a")
endif()
add_velox_dependency(parse::expression "${VELOX_COMPONENTS_PATH}/parse/libvelox_parse_expression.a")
if(BUILD_TESTS)
add_velox_dependency(parse::expression "${VELOX_COMPONENTS_PATH}/parse/libvelox_parse_expression.a")
add_velox_dependency(parse::utils "${VELOX_COMPONENTS_PATH}/parse/libvelox_parse_utils.a")
add_velox_dependency(function::registry "${VELOX_COMPONENTS_PATH}/functions/libvelox_function_registry.a")
endif()
Expand All @@ -146,7 +159,9 @@ macro(ADD_VELOX_DEPENDENCIES)
add_velox_dependency(dwio::common::utils "${VELOX_COMPONENTS_PATH}/dwio/common/tests/utils/libvelox_dwio_common_test_utils.a")
add_velox_dependency(dwio::dwrf::test_utils "${VELOX_COMPONENTS_PATH}/dwio/dwrf/test/utils/libvelox_dwrf_test_utils.a")
add_velox_dependency(parquet::reader::duckdb_conversion "${VELOX_COMPONENTS_PATH}/duckdb/conversion/libvelox_duckdb_conversion.a")
add_velox_dependency(duckdb::duckdb "${VELOX_COMPONENTS_PATH}/external/duckdb/libduckdb.a")

add_duckdb()

add_velox_dependency(tpch::gen "${VELOX_COMPONENTS_PATH}/tpch/gen/libvelox_tpch_gen.a")
add_velox_dependency(dbgen "${VELOX_COMPONENTS_PATH}/tpch/gen/dbgen/libvelox_dbgen.a")
endif()
Expand Down Expand Up @@ -176,7 +191,6 @@ macro(ADD_VELOX_DEPENDENCIES)
add_velox_dependency(common::base "${VELOX_COMPONENTS_PATH}/common/base/libvelox_common_base.a")
add_velox_dependency(common::memory "${VELOX_COMPONENTS_PATH}/common/memory/libvelox_memory.a")
add_velox_dependency(common::serialization "${VELOX_COMPONENTS_PATH}/common/serialization/libvelox_serialization.a")
add_velox_dependency(spill::config "${VELOX_COMPONENTS_PATH}/common/config/libvelox_spill_config.a")
add_velox_dependency(common::base::exception "${VELOX_COMPONENTS_PATH}/common/base/libvelox_exception.a")

add_velox_dependency(type::tz "${VELOX_COMPONENTS_PATH}/type/tz/libvelox_type_tz.a")
Expand Down
4 changes: 3 additions & 1 deletion cpp/velox/benchmarks/ColumnarToRowBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include "memory/VeloxMemoryManager.h"
#include "operators/serializer/VeloxColumnarToRowConverter.h"
#include "utils/TestUtils.h"
#include "utils/VeloxArrowUtils.h"
#include "utils/macros.h"
#include "velox/vector/arrow/Bridge.h"

Expand Down Expand Up @@ -95,7 +96,8 @@ class GoogleBenchmarkColumnarToRow {
ArrowArray arrowArray;
ArrowSchema arrowSchema;
ASSERT_NOT_OK(arrow::ExportRecordBatch(rb, &arrowArray, &arrowSchema));
return velox::importFromArrowAsOwner(arrowSchema, arrowArray, gluten::defaultLeafVeloxMemoryPool().get());
return velox::importFromArrowAsOwner(
arrowSchema, arrowArray, ArrowUtils::getBridgeOptions(), gluten::defaultLeafVeloxMemoryPool().get());
}

protected:
Expand Down
4 changes: 3 additions & 1 deletion cpp/velox/benchmarks/ParquetWriteBenchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
#include "memory/ColumnarBatch.h"
#include "memory/VeloxMemoryManager.h"
#include "utils/TestUtils.h"
#include "utils/VeloxArrowUtils.h"
#include "utils/macros.h"
#include "velox/dwio/parquet/writer/Writer.h"
#include "velox/vector/arrow/Bridge.h"
Expand Down Expand Up @@ -101,7 +102,8 @@ class GoogleBenchmarkParquetWrite {
ArrowArray arrowArray;
ArrowSchema arrowSchema;
ASSERT_NOT_OK(arrow::ExportRecordBatch(rb, &arrowArray, &arrowSchema));
auto vp = velox::importFromArrowAsOwner(arrowSchema, arrowArray, gluten::defaultLeafVeloxMemoryPool().get());
auto vp = velox::importFromArrowAsOwner(
arrowSchema, arrowArray, gluten::ArrowUtils::getBridgeOptions(), gluten::defaultLeafVeloxMemoryPool().get());
return std::make_shared<VeloxColumnarBatch>(std::dynamic_pointer_cast<velox::RowVector>(vp));
}

Expand Down
6 changes: 5 additions & 1 deletion cpp/velox/benchmarks/common/BenchmarkUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "memory/VeloxColumnarBatch.h"
#include "memory/VeloxMemoryManager.h"
#include "shuffle/Options.h"
#include "utils/VeloxArrowUtils.h"
#include "utils/exception.h"
#include "velox/common/memory/Memory.h"
#include "velox/dwio/common/tests/utils/DataFiles.h"
Expand Down Expand Up @@ -95,7 +96,10 @@ void abortIfFileNotExists(const std::string& filepath);
inline std::shared_ptr<gluten::ColumnarBatch> convertBatch(std::shared_ptr<gluten::ColumnarBatch> cb) {
if (cb->getType() != "velox") {
auto vp = facebook::velox::importFromArrowAsOwner(
*cb->exportArrowSchema(), *cb->exportArrowArray(), gluten::defaultLeafVeloxMemoryPool().get());
*cb->exportArrowSchema(),
*cb->exportArrowArray(),
gluten::ArrowUtils::getBridgeOptions(),
gluten::defaultLeafVeloxMemoryPool().get());
return std::make_shared<gluten::VeloxColumnarBatch>(std::dynamic_pointer_cast<facebook::velox::RowVector>(vp));
} else {
return cb;
Expand Down
5 changes: 5 additions & 0 deletions cpp/velox/compute/WholeStageResultIterator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,9 @@ std::unordered_map<std::string, std::string> WholeStageResultIterator::getQueryC
getConfigValue(confMap_, kBloomFilterNumBits, "8388608");
configs[velox::core::QueryConfig::kSparkBloomFilterMaxNumBits] =
getConfigValue(confMap_, kBloomFilterMaxNumBits, "4194304");

configs[velox::core::QueryConfig::kArrowBridgeTimestampUnit] = 2;

} catch (const std::invalid_argument& err) {
std::string errDetails = err.what();
throw std::runtime_error("Invalid conf arg: " + errDetails);
Expand Down Expand Up @@ -407,6 +410,8 @@ std::shared_ptr<velox::Config> WholeStageResultIterator::createConnectorConfig()
// The semantics of reading as lower case is opposite with case-sensitive.
configs[velox::connector::hive::HiveConfig::kFileColumnNamesReadAsLowerCase] =
getConfigValue(confMap_, kCaseSensitive, "false") == "false" ? "true" : "false";
configs[velox::connector::hive::HiveConfig::kArrowBridgeTimestampUnit] = 2;

return std::make_shared<velox::core::MemConfig>(configs);
}

Expand Down
8 changes: 5 additions & 3 deletions cpp/velox/memory/VeloxColumnarBatch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
#include "VeloxColumnarBatch.h"
#include "compute/VeloxRuntime.h"
#include "utils/VeloxArrowUtils.h"
#include "velox/row/UnsafeRowFast.h"
#include "velox/type/Type.h"
#include "velox/vector/FlatVector.h"
Expand Down Expand Up @@ -65,14 +66,14 @@ void VeloxColumnarBatch::ensureFlattened() {
std::shared_ptr<ArrowSchema> VeloxColumnarBatch::exportArrowSchema() {
auto out = std::make_shared<ArrowSchema>();
ensureFlattened();
velox::exportToArrow(flattened_, *out);
velox::exportToArrow(flattened_, ArrowUtils::getBridgeOptions(), *out);
return out;
}

std::shared_ptr<ArrowArray> VeloxColumnarBatch::exportArrowArray() {
auto out = std::make_shared<ArrowArray>();
ensureFlattened();
velox::exportToArrow(flattened_, *out, flattened_->pool());
velox::exportToArrow(flattened_, ArrowUtils::getBridgeOptions(), *out, flattened_->pool());
return out;
}

Expand Down Expand Up @@ -117,7 +118,8 @@ std::shared_ptr<VeloxColumnarBatch> VeloxColumnarBatch::from(
auto compositeVeloxVector = makeRowVector(childNames, childVectors, cb->numRows(), pool);
return std::make_shared<VeloxColumnarBatch>(compositeVeloxVector);
}
auto vp = velox::importFromArrowAsOwner(*cb->exportArrowSchema(), *cb->exportArrowArray(), pool);
auto vp = velox::importFromArrowAsOwner(
*cb->exportArrowSchema(), *cb->exportArrowArray(), ArrowUtils::getBridgeOptions(), pool);
return std::make_shared<VeloxColumnarBatch>(std::dynamic_pointer_cast<velox::RowVector>(vp));
}

Expand Down
3 changes: 2 additions & 1 deletion cpp/velox/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ function(add_velox_test TEST_EXEC)
message(FATAL_ERROR "No sources specified for test ${TEST_NAME}")
endif()
add_executable(${TEST_EXEC} ${SOURCES})
target_include_directories(${TEST_EXEC} PRIVATE ${CMAKE_SOURCE_DIR}/velox ${CMAKE_SOURCE_DIR}/src)
message(STATUS "!!!!!${VELOX_BUILD_PATH}/_deps/duckdb-src/include/")
target_include_directories(${TEST_EXEC} PRIVATE ${CMAKE_SOURCE_DIR}/velox ${CMAKE_SOURCE_DIR}/src ${VELOX_BUILD_PATH}/_deps/duckdb-src/src/include)
target_link_libraries(${TEST_EXEC} velox GTest::gtest GTest::gtest_main google::glog benchmark::benchmark simdjson)
gtest_discover_tests(${TEST_EXEC} DISCOVERY_MODE PRE_TEST)
endfunction()
Expand Down
3 changes: 2 additions & 1 deletion cpp/velox/tests/VeloxColumnarBatchSerializerTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "memory/VeloxColumnarBatch.h"
#include "memory/VeloxMemoryManager.h"
#include "operators/serializer/VeloxColumnarBatchSerializer.h"
#include "utils/VeloxArrowUtils.h"
#include "velox/vector/arrow/Bridge.h"
#include "velox/vector/tests/utils/VectorTestBase.h"

Expand Down Expand Up @@ -52,7 +53,7 @@ TEST_F(VeloxColumnarBatchSerializerTest, serialize) {
auto buffer = serializer->serializeColumnarBatches({batch});

ArrowSchema cSchema;
exportToArrow(vector, cSchema);
exportToArrow(vector, ArrowUtils::getBridgeOptions(), cSchema);
auto deserializer = std::make_shared<VeloxColumnarBatchSerializer>(arrowPool_.get(), veloxPool_, &cSchema);
auto deserialized = deserializer->deserialize(const_cast<uint8_t*>(buffer->data()), buffer->size());
auto deserializedVector = std::dynamic_pointer_cast<VeloxColumnarBatch>(deserialized)->getRowVector();
Expand Down
2 changes: 1 addition & 1 deletion cpp/velox/tests/VeloxSubstraitRoundTripTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ TEST_F(VeloxSubstraitRoundTripTest, project) {
auto vectors = makeVectors(3, 4, 2);
createDuckDbTable(vectors);
auto plan = PlanBuilder().values(vectors).project({"c0 + c1", "c1 / c2"}).planNode();
assertPlanConversion(plan, "SELECT c0 + c1, c1 / c2 FROM tmp");
assertPlanConversion(plan, "SELECT c0 + c1, c1 // c2 FROM tmp");
}

TEST_F(VeloxSubstraitRoundTripTest, cast) {
Expand Down
5 changes: 3 additions & 2 deletions cpp/velox/utils/VeloxArrowUtils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ namespace gluten {
using namespace facebook;

void toArrowSchema(const velox::TypePtr& rowType, facebook::velox::memory::MemoryPool* pool, struct ArrowSchema* out) {
exportToArrow(velox::BaseVector::create(rowType, 0, pool), *out);
exportToArrow(velox::BaseVector::create(rowType, 0, pool), ArrowUtils::getBridgeOptions(), *out);
}

std::shared_ptr<arrow::Schema> toArrowSchema(const velox::TypePtr& rowType, facebook::velox::memory::MemoryPool* pool) {
Expand All @@ -50,7 +50,8 @@ arrow::Result<std::shared_ptr<ColumnarBatch>> recordBatch2VeloxColumnarBatch(con
ArrowArray arrowArray;
ArrowSchema arrowSchema;
RETURN_NOT_OK(arrow::ExportRecordBatch(rb, &arrowArray, &arrowSchema));
auto vp = velox::importFromArrowAsOwner(arrowSchema, arrowArray, gluten::defaultLeafVeloxMemoryPool().get());
auto vp = velox::importFromArrowAsOwner(
arrowSchema, arrowArray, ArrowUtils::getBridgeOptions(), gluten::defaultLeafVeloxMemoryPool().get());
return std::make_shared<VeloxColumnarBatch>(std::dynamic_pointer_cast<velox::RowVector>(vp));
}

Expand Down
10 changes: 10 additions & 0 deletions cpp/velox/utils/VeloxArrowUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,19 @@
#include "velox/buffer/Buffer.h"
#include "velox/common/memory/MemoryPool.h"
#include "velox/type/Type.h"
#include "velox/vector/arrow/Bridge.h"

namespace gluten {

class ArrowUtils {
public:
static facebook::velox::BridgeOptions getBridgeOptions() {
facebook::velox::BridgeOptions options;
options.timestampUnit = static_cast<facebook::velox::TimestampUnit>(2);
return options;
}
};

void toArrowSchema(
const facebook::velox::TypePtr& rowType,
facebook::velox::memory::MemoryPool* pool,
Expand Down
3 changes: 2 additions & 1 deletion ep/build-velox/src/build_velox.sh
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ function compile {
fi
fi

COMPILE_OPTION="-DVELOX_ENABLE_PARQUET=ON -DVELOX_BUILD_TESTING=OFF -DVELOX_BUILD_TEST_UTILS=OFF -DVELOX_ENABLE_DUCKDB=OFF"
COMPILE_OPTION="-DVELOX_ENABLE_PARQUET=ON -DVELOX_BUILD_TESTING=OFF -DVELOX_BUILD_TEST_UTILS=OFF -DVELOX_ENABLE_DUCKDB=OFF -DVELOX_ENABLE_PARSE=OFF"
if [ $ENABLE_BENCHMARK == "ON" ]; then
COMPILE_OPTION="$COMPILE_OPTION -DVELOX_BUILD_BENCHMARKS=ON"
fi
Expand All @@ -125,6 +125,7 @@ function compile {
echo "COMPILE_OPTION: "$COMPILE_OPTION

export simdjson_SOURCE=BUNDLED
export duckdb_SOURCE=BUNDLED
if [ $ARCH == 'x86_64' ]; then
make $COMPILE_TYPE EXTRA_CMAKE_FLAGS="${COMPILE_OPTION}"
elif [[ "$ARCH" == 'arm64' || "$ARCH" == 'aarch64' ]]; then
Expand Down

0 comments on commit 1e65165

Please sign in to comment.