Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove substrait #403

Open
wants to merge 2 commits into
base: wip_func_bp
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions velox/functions/sparksql/Register.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ void registerFunctions(const std::string& prefix) {
prefix + "instr", instrSignatures(), makeInstr);
exec::registerStatefulVectorFunction(
prefix + "length", lengthSignatures(), makeLength);
exec::registerStatefulVectorFunction(
prefix + "concat_ws", concatWsSignatures(), makeConcatWs);

registerFunction<Md5Function, Varchar, Varbinary>({prefix + "md5"});
registerFunction<Sha1HexStringFunction, Varchar, Varbinary>(
Expand Down
184 changes: 184 additions & 0 deletions velox/functions/sparksql/String.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,165 @@ class Length : public exec::VectorFunction {
}
};

class ConcatWsVariableParameters : public exec::VectorFunction {
public:
explicit ConcatWsVariableParameters(const std::string& connector)
: connector_{connector} {}

void apply(
const SelectivityVector& rows,
std::vector<VectorPtr>& args,
const TypePtr& /* outputType */,
exec::EvalCtx& context,
VectorPtr& result) const override {
std::vector<column_index_t> argMapping;
std::vector<std::string> constantStrings;
std::vector<StringView> constantStringViews;
auto numArgs = args.size();

// Save constant values to constantStrings_.
// Identify and combine consecutive constant inputs.
argMapping.reserve(numArgs - 1);
constantStrings.reserve(numArgs - 1);
for (auto i = 1; i < numArgs; ++i) {
argMapping.push_back(i);
if (args[i] && args[i]->as<ConstantVector<StringView>>() &&
!args[i]->as<ConstantVector<StringView>>()->isNullAt(0)) {
std::string value =
args[i]->as<ConstantVector<StringView>>()->valueAt(0).str();
column_index_t j = i + 1;
for (; j < args.size(); ++j) {
if (!args[j] || !args[j]->as<ConstantVector<StringView>>() ||
args[j]->as<ConstantVector<StringView>>()->isNullAt(0)) {
break;
}

value += connector_ +
args[j]->as<ConstantVector<StringView>>()->valueAt(0).str();
}
constantStrings.push_back(std::string(value.data(), value.size()));
i = j - 1;
} else {
constantStrings.push_back(std::string());
}
}

// Create StringViews for constant strings.
constantStringViews.reserve(numArgs - 1);
for (const auto& constantString : constantStrings) {
constantStringViews.push_back(
StringView(constantString.data(), constantString.size()));
}

auto flatResult = result->asFlatVector<StringView>();
auto numCols = argMapping.size();
std::vector<exec::LocalDecodedVector> decodedArgs;
decodedArgs.reserve(numCols);

for (auto i = 0; i < numCols; ++i) {
auto index = argMapping[i];
if (constantStringViews[i].empty()) {
decodedArgs.emplace_back(context, *args[index], rows);
} else {
// Do not decode constant inputs.
decodedArgs.emplace_back(context);
}
}

size_t totalResultBytes = 0;
rows.applyToSelected([&](auto row) {
auto isFirst = true;
for (int i = 0; i < numCols; i++) {
auto value = constantStringViews[i].empty()
? decodedArgs[i]->valueAt<StringView>(row)
: constantStringViews[i];
if (!value.empty()) {
if (isFirst) {
isFirst = false;
} else {
totalResultBytes += connector_.size();
}
totalResultBytes += value.size();
}
}
});

// Allocate a string buffer.
auto rawBuffer = flatResult->getRawStringBufferWithSpace(totalResultBytes);
size_t offset = 0;
rows.applyToSelected([&](int row) {
const char* start = rawBuffer + offset;
size_t combinedSize = 0;
auto isFirst = true;
for (int i = 0; i < numCols; i++) {
StringView value;
if (constantStringViews[i].empty()) {
value = decodedArgs[i]->valueAt<StringView>(row);
} else {
value = constantStringViews[i];
}
auto size = value.size();
if (size > 0) {
if (isFirst) {
isFirst = false;
} else {
memcpy(rawBuffer + offset, connector_.data(), connector_.size());
offset += connector_.size();
combinedSize += connector_.size();
}
memcpy(rawBuffer + offset, value.data(), size);
combinedSize += size;
offset += size;
}
}
flatResult->setNoCopy(row, StringView(start, combinedSize));
});
}

private:
const std::string connector_;
};

class ConcatWs : public exec::VectorFunction {
bool isDefaultNullBehavior() const override {
return false;
}

void apply(
const SelectivityVector& selected,
std::vector<VectorPtr>& args,
const TypePtr& /* outputType */,
exec::EvalCtx& context,
VectorPtr& result) const override {
auto numArgs = args.size();
VELOX_USER_CHECK(
numArgs >= 1,
"concat_ws requires one arguments at least, but got {}",
numArgs);

context.ensureWritable(selected, VARCHAR(), result);
if (args[0]->isNullAt(0)) {
selected.applyToSelected([&](int row) { result->setNull(row, true); });
return;
}

if (numArgs == 1) {
auto flatResult = result->asFlatVector<StringView>();
selected.applyToSelected(
[&](int row) { flatResult->setNoCopy(row, StringView("")); });
return;
}

auto connectorVector = args[0]->as<ConstantVector<StringView>>();
VELOX_USER_CHECK(
connectorVector, "concat_ws function supports only constant connector");
auto connector = connectorVector->valueAt(0).str();

ConcatWsVariableParameters concatWsVariableParameters(connector);
concatWsVariableParameters.apply(selected, args, nullptr, context, result);
}
};

} // namespace

std::vector<std::shared_ptr<exec::FunctionSignature>> instrSignatures() {
Expand Down Expand Up @@ -142,6 +301,31 @@ std::shared_ptr<exec::VectorFunction> makeLength(
return kLengthFunction;
}

/// Builds the signatures under which concat_ws is registered. Both take a
/// constant varchar as first argument and return varchar.
std::vector<std::shared_ptr<exec::FunctionSignature>> concatWsSignatures() {
  // (constant varchar, varchar...) -> varchar
  const auto variadicStrings = exec::FunctionSignatureBuilder()
                                   .returnType("varchar")
                                   .constantArgumentType("varchar")
                                   .argumentType("varchar")
                                   .variableArity()
                                   .build();

  // (constant varchar, array(varchar)) -> varchar
  const auto stringArray = exec::FunctionSignatureBuilder()
                               .returnType("varchar")
                               .constantArgumentType("varchar")
                               .argumentType("array(varchar)")
                               .build();

  return {variadicStrings, stringArray};
}

std::shared_ptr<exec::VectorFunction> makeConcatWs(
const std::string& name,
const std::vector<exec::VectorFunctionArg>& inputArgs) {
static const auto kConcatWsFunction = std::make_shared<ConcatWs>();
return kConcatWsFunction;
}

void encodeDigestToBase16(uint8_t* output, int digestSize) {
static unsigned char const kHexCodes[] = "0123456789abcdef";
for (int i = digestSize - 1; i >= 0; --i) {
Expand Down
6 changes: 6 additions & 0 deletions velox/functions/sparksql/String.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,12 @@ std::shared_ptr<exec::VectorFunction> makeLength(
const std::string& name,
const std::vector<exec::VectorFunctionArg>& inputArgs);

/// Returns the function signatures registered for concat_ws: a constant
/// varchar followed by a variable number of varchar arguments, and a constant
/// varchar followed by an array(varchar); both return varchar.
std::vector<std::shared_ptr<exec::FunctionSignature>> concatWsSignatures();

/// Returns the vector function implementing concat_ws. The same shared
/// instance is returned on every call; `name` and `inputArgs` are not used.
std::shared_ptr<exec::VectorFunction> makeConcatWs(
const std::string& name,
const std::vector<exec::VectorFunctionArg>& inputArgs);

/// Expands each char of the digest data to two chars,
/// representing the hex value of each digest char, in order.
/// Note: digestSize must be one-half of outputSize.
Expand Down
66 changes: 0 additions & 66 deletions velox/substrait/CMakeLists.txt

This file was deleted.

90 changes: 0 additions & 90 deletions velox/substrait/SubstraitExtensionCollector.cpp

This file was deleted.

Loading
Loading