Skip to content

Commit

Permalink
Register custom row constructor function for Gluten. (oap-project#404)
Browse files Browse the repository at this point in the history
  • Loading branch information
lviiii authored Oct 11, 2022
1 parent a78d733 commit fd1a43b
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 4 deletions.
1 change: 1 addition & 0 deletions cpp/velox/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ set(VELOX_SRCS
compute/VeloxPlanConverter.cc
compute/ArrowTypeUtils.cc
compute/VeloxToRowConverter.cc
compute/RowConstructor.cc
compute/DwrfDatasource.cc
compute/bridge.cc
memory/velox_memory_pool.cc
Expand Down
39 changes: 39 additions & 0 deletions cpp/velox/compute/RegistrationAllFunctions.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "RegistrationAllFunctions.h"
#include "RowConstructor.cc"
#include "velox/functions/sparksql/Register.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;

namespace velox::compute {

void registerCustomFunctions() {
exec::registerVectorFunction(
"row_constructor",
std::vector<std::shared_ptr<exec::FunctionSignature>>{},
std::make_unique<RowConstructor>());
}

void registerAllFunctions() {
functions::prestosql::registerAllScalarFunctions();
functions::sparksql::registerFunctions("");
registerCustomFunctions();
}

} // namespace velox::compute
22 changes: 22 additions & 0 deletions cpp/velox/compute/RegistrationAllFunctions.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

namespace velox::compute {

void registerAllFunctions();

} // namespace velox::compute
69 changes: 69 additions & 0 deletions cpp/velox/compute/RowConstructor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "velox/expression/VectorFunction.h"
#include "velox/type/Type.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;

namespace velox::compute {

namespace {
class RowConstructor : public exec::VectorFunction {
void apply(
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
const TypePtr& outputType,
exec::EvalCtx& context,
VectorPtr& result) const override {
auto argsCopy = args;

BufferPtr nulls = AlignedBuffer::allocate<char>(
bits::nbytes(rows.size()), context.pool(), 1);
auto* nullsPtr = nulls->asMutable<uint64_t>();
auto cntNull = 0;
rows.applyToSelected([&](vector_size_t i) {
bits::clearNull(nullsPtr, i);
if (!bits::isBitNull(nullsPtr, i)) {
for (size_t c = 0; c < argsCopy.size(); c++) {
auto arg = argsCopy[c].get();
if (arg->mayHaveNulls() && arg->isNullAt(i)) {
bits::setNull(nullsPtr, i, true);
cntNull++;
break;
}
}
}
});

RowVectorPtr localResult = std::make_shared<RowVector>(
context.pool(),
outputType,
nulls,
rows.size(),
std::move(argsCopy),
cntNull /*nullCount*/);
context.moveOrCopyResult(localResult, rows, result);
}

bool isDefaultNullBehavior() const override {
return false;
}
};

} // namespace
} // namespace velox::compute
7 changes: 3 additions & 4 deletions cpp/velox/compute/VeloxPlanConverter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <string>

#include "ArrowTypeUtils.h"
#include "RegistrationAllFunctions.cc"
#include "arrow/c/Bridge.h"
#include "arrow/c/bridge.h"
#include "bridge.h"
Expand All @@ -33,7 +34,6 @@
#include "velox/functions/prestosql/aggregates/AverageAggregate.h"
#include "velox/functions/prestosql/aggregates/CountAggregate.h"
#include "velox/functions/prestosql/aggregates/MinMaxAggregates.h"
#include "velox/functions/sparksql/Register.h"

using namespace facebook::velox;
using namespace facebook::velox::exec;
Expand Down Expand Up @@ -95,9 +95,8 @@ void VeloxInitializer::Init() {
parquet::registerParquetReaderFactory(ParquetReaderType::NATIVE);
// parquet::registerParquetReaderFactory(ParquetReaderType::DUCKDB);
dwrf::registerDwrfReaderFactory();
// Register Velox functions.
functions::sparksql::registerFunctions("");
functions::prestosql::registerAllScalarFunctions();
// Register Velox functions
registerAllFunctions();
aggregate::registerSumAggregate<aggregate::SumAggregate>("sum");
aggregate::registerAverageAggregate("avg");
aggregate::registerCountAggregate("count");
Expand Down
2 changes: 2 additions & 0 deletions cpp/velox/jni/jni_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <jni/jni_common.h>
#include "compute/DwrfDatasource.h"
#include "compute/RegistrationAllFunctions.h"
#include "compute/VeloxPlanConverter.h"
#include "jni/jni_errors.h"
#include "memory/velox_memory_pool.h"
Expand Down Expand Up @@ -127,6 +128,7 @@ Java_io_glutenproject_vectorized_ExpressionEvaluatorJniWrapper_nativeDoValidate(
std::unique_ptr<core::ExecCtx> execCtx_ =
std::make_unique<core::ExecCtx>(pool.get(), queryCtx_.get());

velox::compute::registerAllFunctions();
auto planValidator = std::make_shared<
facebook::velox::substrait::SubstraitToVeloxPlanValidator>(
pool.get(), execCtx_.get());
Expand Down

0 comments on commit fd1a43b

Please sign in to comment.