Skip to content

Commit

Permalink
YQ-3722 add simdjson parser into RD (ydb-platform#10204)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored and kardymonds committed Oct 16, 2024
1 parent e6f602e commit a0656e0
Show file tree
Hide file tree
Showing 10 changed files with 361 additions and 465 deletions.
51 changes: 28 additions & 23 deletions ydb/core/fq/libs/row_dispatcher/json_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase {
TVector<NYT::TNode> Schemas;
};

class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<ui64, TList<TString>>> {
class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<ui64, const TVector<TVector<std::string_view>>&>> {
public:
TFilterInputConsumer(
const TFilterInputSpec& spec,
Expand Down Expand Up @@ -91,28 +91,32 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer<std::pair<ui64, T
}
}

void OnObject(std::pair<ui64, TList<TString>> value) override {
void OnObject(std::pair<ui64, const TVector<TVector<std::string_view>>&> values) override {
Y_ENSURE(FieldsPositions.size() == values.second.size());

NKikimr::NMiniKQL::TThrowingBindTerminator bind;

with_lock (Worker->GetScopedAlloc()) {
auto& holderFactory = Worker->GetGraph().GetHolderFactory();
NYql::NUdf::TUnboxedValue* items = nullptr;

NYql::NUdf::TUnboxedValue result = Cache.NewArray(
holderFactory,
static_cast<ui32>(value.second.size() + 1),
items);

items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(value.first);
// TODO: use blocks here
for (size_t rowId = 0; rowId < values.second.front().size(); ++rowId) {
NYql::NUdf::TUnboxedValue* items = nullptr;

NYql::NUdf::TUnboxedValue result = Cache.NewArray(
holderFactory,
static_cast<ui32>(values.second.size() + 1),
items);

Y_ENSURE(FieldsPositions.size() == value.second.size());
items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(values.first++);

size_t i = 0;
for (const auto& v : value.second) {
NYql::NUdf::TStringValue str(v);
items[FieldsPositions[i++]] = NYql::NUdf::TUnboxedValuePod(std::move(str));
size_t fieldId = 0;
for (const auto& column : values.second) {
NYql::NUdf::TStringValue str(column[rowId]);
items[FieldsPositions[fieldId++]] = NYql::NUdf::TUnboxedValuePod(std::move(str));
}

Worker->Push(std::move(result));
}
Worker->Push(std::move(result));
}
}

Expand Down Expand Up @@ -196,7 +200,7 @@ struct NYql::NPureCalc::TInputSpecTraits<TFilterInputSpec> {
static constexpr bool IsPartial = false;
static constexpr bool SupportPushStreamMode = true;

using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<ui64, TList<TString>>>>;
using TConsumerType = THolder<NYql::NPureCalc::IConsumer<std::pair<ui64, const TVector<TVector<std::string_view>>&>>>;

static TConsumerType MakeConsumer(
const TFilterInputSpec& spec,
Expand Down Expand Up @@ -238,8 +242,9 @@ class TJsonFilter::TImpl {
LOG_ROW_DISPATCHER_DEBUG("Program created");
}

void Push(ui64 offset, const TList<TString>& value) {
InputConsumer->OnObject(std::make_pair(offset, value));
void Push(ui64 offset, const TVector<TVector<std::string_view>>& values) {
Y_ENSURE(values, "Expected non empty schema");
InputConsumer->OnObject(std::make_pair(offset, values));
}

TString GetSql() const {
Expand All @@ -266,7 +271,7 @@ class TJsonFilter::TImpl {

private:
THolder<NYql::NPureCalc::TPushStreamProgram<TFilterInputSpec, TFilterOutputSpec>> Program;
THolder<NYql::NPureCalc::IConsumer<std::pair<ui64, TList<TString>>>> InputConsumer;
THolder<NYql::NPureCalc::IConsumer<std::pair<ui64, const TVector<TVector<std::string_view>>&>>> InputConsumer;
const TString Sql;
};

Expand All @@ -280,9 +285,9 @@ TJsonFilter::TJsonFilter(

TJsonFilter::~TJsonFilter() {
}
void TJsonFilter::Push(ui64 offset, const TList<TString>& value) {
Impl->Push(offset, value);

void TJsonFilter::Push(ui64 offset, const TVector<TVector<std::string_view>>& values) {
Impl->Push(offset, values);
}

TString TJsonFilter::GetSql() {
Expand Down
13 changes: 7 additions & 6 deletions ydb/core/fq/libs/row_dispatcher/json_filter.h
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@

#pragma once

namespace NFq {

#include <ydb/library/yql/public/udf/udf_data_type.h>
#include <ydb/library/yql/public/udf/udf_value.h>

namespace NFq {

class TJsonFilter {
public:
using TCallback = std::function<void(ui64, const TString&)>;

public:
TJsonFilter(
const TVector<TString>& columns,
const TVector<TString>& columns,
const TVector<TString>& types,
const TString& whereFilter,
TCallback callback);

~TJsonFilter();
void Push(ui64 offset, const TList<TString>& value);

void Push(ui64 offset, const TVector<TVector<std::string_view>>& values);
TString GetSql();

private:
Expand Down
Loading

0 comments on commit a0656e0

Please sign in to comment.