diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp index 8b7d46a690f2..b432494d6623 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.cpp @@ -53,7 +53,7 @@ class TFilterInputSpec : public NYql::NPureCalc::TInputSpecBase { TVector Schemas; }; -class TFilterInputConsumer : public NYql::NPureCalc::IConsumer>> { +class TFilterInputConsumer : public NYql::NPureCalc::IConsumer>&>> { public: TFilterInputConsumer( const TFilterInputSpec& spec, @@ -91,28 +91,32 @@ class TFilterInputConsumer : public NYql::NPureCalc::IConsumer> value) override { + void OnObject(std::pair>&> values) override { + Y_ENSURE(FieldsPositions.size() == values.second.size()); + NKikimr::NMiniKQL::TThrowingBindTerminator bind; - with_lock (Worker->GetScopedAlloc()) { auto& holderFactory = Worker->GetGraph().GetHolderFactory(); - NYql::NUdf::TUnboxedValue* items = nullptr; - NYql::NUdf::TUnboxedValue result = Cache.NewArray( - holderFactory, - static_cast(value.second.size() + 1), - items); - - items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(value.first); + // TODO: use blocks here + for (size_t rowId = 0; rowId < values.second.front().size(); ++rowId) { + NYql::NUdf::TUnboxedValue* items = nullptr; + + NYql::NUdf::TUnboxedValue result = Cache.NewArray( + holderFactory, + static_cast(values.second.size() + 1), + items); - Y_ENSURE(FieldsPositions.size() == value.second.size()); + items[OffsetPosition] = NYql::NUdf::TUnboxedValuePod(values.first++); - size_t i = 0; - for (const auto& v : value.second) { - NYql::NUdf::TStringValue str(v); - items[FieldsPositions[i++]] = NYql::NUdf::TUnboxedValuePod(std::move(str)); + size_t fieldId = 0; + for (const auto& column : values.second) { + NYql::NUdf::TStringValue str(column[rowId]); + items[FieldsPositions[fieldId++]] = NYql::NUdf::TUnboxedValuePod(std::move(str)); + } + + Worker->Push(std::move(result)); } - Worker->Push(std::move(result)); } } @@ -196,7 +200,7 @@ struct NYql::NPureCalc::TInputSpecTraits { static constexpr bool IsPartial = false; static constexpr bool SupportPushStreamMode = true; - using TConsumerType = THolder>>>; + using TConsumerType = THolder>&>>>; static TConsumerType MakeConsumer( const TFilterInputSpec& spec, @@ -238,8 +242,9 @@ class TJsonFilter::TImpl { LOG_ROW_DISPATCHER_DEBUG("Program created"); } - void Push(ui64 offset, const TList& value) { - InputConsumer->OnObject(std::make_pair(offset, value)); + void Push(ui64 offset, const TVector>& values) { + Y_ENSURE(values, "Expected non empty schema"); + InputConsumer->OnObject(std::make_pair(offset, values)); } TString GetSql() const { @@ -266,7 +271,7 @@ class TJsonFilter::TImpl { private: THolder> Program; - THolder>>> InputConsumer; + THolder>&>>> InputConsumer; const TString Sql; }; @@ -280,9 +285,9 @@ TJsonFilter::TJsonFilter( TJsonFilter::~TJsonFilter() { } - -void TJsonFilter::Push(ui64 offset, const TList& value) { - Impl->Push(offset, value); + +void TJsonFilter::Push(ui64 offset, const TVector>& values) { + Impl->Push(offset, values); } TString TJsonFilter::GetSql() { diff --git a/ydb/core/fq/libs/row_dispatcher/json_filter.h b/ydb/core/fq/libs/row_dispatcher/json_filter.h index f1694a277fbb..f9e26afc1dfa 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_filter.h +++ b/ydb/core/fq/libs/row_dispatcher/json_filter.h @@ -1,23 +1,24 @@ - #pragma once -namespace NFq { - #include #include +namespace NFq { + class TJsonFilter { public: using TCallback = std::function; - + public: TJsonFilter( - const TVector& columns, + const TVector& columns, const TVector& types, const TString& whereFilter, TCallback callback); + ~TJsonFilter(); - void Push(ui64 offset, const TList& value); + + void Push(ui64 offset, const TVector>& values); TString GetSql(); private: diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp index 84ca3018b509..f0e9ab7122d6 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.cpp +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.cpp @@ -1,337 +1,184 @@ -#include +#include "json_parser.h" -#include -#include -#include -#include #include +#include + +#include namespace { -using TCallback = NFq::TJsonParser::TCallback; -using TInputConsumerArg = std::pair; -const char* OffsetFieldName = "_offset"; TString LogPrefix = "JsonParser: "; -void AddField(NYT::TNode& node, const TString& fieldName, const TString& fieldType) { - node.Add( - NYT::TNode::CreateList() - .Add(fieldName) - .Add(NYT::TNode::CreateList().Add("DataType").Add(fieldType)) - ); -} - -NYT::TNode MakeInputSchema() { - auto structMembers = NYT::TNode::CreateList(); - AddField(structMembers, OffsetFieldName, "Uint64"); - AddField(structMembers, "data", "String"); - return NYT::TNode::CreateList().Add("StructType").Add(std::move(structMembers)); -} - -NYT::TNode MakeOutputSchema(const TVector& columns) { - auto structMembers = NYT::TNode::CreateList(); - AddField(structMembers, OffsetFieldName, "Uint64"); - for (const auto& col : columns) { - AddField(structMembers, col, "String"); - } - return NYT::TNode::CreateList().Add("StructType").Add(std::move(structMembers)); -} - -class TParserInputConsumer : public NYql::NPureCalc::IConsumer { -public: - explicit TParserInputConsumer(NYql::NPureCalc::TWorkerHolder worker) - : Worker(std::move(worker)) { - } - - ~TParserInputConsumer() override { - with_lock(Worker->GetScopedAlloc()) { - Cache.Clear(); - } - } - - void OnObject(std::pair value) override { - NKikimr::NMiniKQL::TThrowingBindTerminator bind; - - with_lock (Worker->GetScopedAlloc()) { - auto& holderFactory = Worker->GetGraph().GetHolderFactory(); - NYql::NUdf::TUnboxedValue* items = nullptr; - - NYql::NUdf::TUnboxedValue result = Cache.NewArray( - holderFactory, - static_cast(2), - items); - - items[0] = NYql::NUdf::TUnboxedValuePod(value.first); - NYql::NUdf::TStringValue str(value.second.Size()); - std::memcpy(str.Data(), value.second.Data(), value.second.Size()); - items[1] = NYql::NUdf::TUnboxedValuePod(std::move(str)); - Worker->Push(std::move(result)); - } - } +} // anonymous namespace - void OnFinish() override { - NKikimr::NMiniKQL::TBindTerminator bind(Worker->GetGraph().GetTerminator()); - with_lock(Worker->GetScopedAlloc()) { - Worker->OnFinish(); - } - } +namespace NFq { -private: - NYql::NPureCalc::TWorkerHolder Worker; - NKikimr::NMiniKQL::TPlainContainerCache Cache; -}; +//// TParserBuffer +TJsonParserBuffer::TJsonParserBuffer() + : NumberValues(0) + , Finished(false) +{} -class TParserInputSpec : public NYql::NPureCalc::TInputSpecBase { -public: - TParserInputSpec() { - Schemas = {MakeInputSchema()}; - } - - const TVector& GetSchemas() const override { - return Schemas; - } +void TJsonParserBuffer::Reserve(size_t size) { + Y_ENSURE(!Finished, "Cannot reserve finished buffer"); + Values.reserve(2 * (size + simdjson::SIMDJSON_PADDING)); +} -private: - TVector Schemas; -}; +void TJsonParserBuffer::AddValue(const TString& value) { + Y_ENSURE(!Finished, "Cannot add value into finished buffer"); + NumberValues++; + Values << value; +} +std::string_view TJsonParserBuffer::AddHolder(std::string_view value) { + Y_ENSURE(Values.size() + value.size() <= Values.capacity(), "Requested too large holders"); + const size_t startPos = Values.size(); + Values << value; + return std::string_view(Values).substr(startPos, value.length()); +} -class TParserOutputConsumer: public NYql::NPureCalc::IConsumer>> { -public: - TParserOutputConsumer(TCallback callback) - : Callback(callback) { - } +std::pair TJsonParserBuffer::Finish() { + Y_ENSURE(!Finished, "Cannot finish buffer twice"); + Finished = true; + Values << TString(simdjson::SIMDJSON_PADDING, ' '); + Values.reserve(2 * Values.size()); + return {Values.data(), Values.size()}; +} - void OnObject(std::pair> value) override { - Callback(value.first, std::move(value.second)); - } +void TJsonParserBuffer::Clear() { + Y_ENSURE(Finished, "Cannot clear not finished buffer"); + NumberValues = 0; + Finished = false; + Values.clear(); +} - void OnFinish() override { - Y_UNREACHABLE(); - } -private: - TCallback Callback; -}; +//// TJsonParser -class TParserOutputSpec: public NYql::NPureCalc::TOutputSpecBase { +class TJsonParser::TImpl { public: - explicit TParserOutputSpec(const NYT::TNode& schema) - : Schema(schema) - {} + TImpl(const TVector& columns, const TVector& types) + : ParsedValues(columns.size()) + { + Y_UNUSED(types); // TODO: Will be used for UV creation + + Columns.reserve(columns.size()); + for (const auto& column : columns) { + Columns.emplace_back(column); + } -public: - const NYT::TNode& GetSchema() const override { - return Schema; + ColumnsIndex.reserve(columns.size()); + for (size_t i = 0; i < Columns.size(); i++) { + ColumnsIndex.emplace(std::string_view(Columns[i]), i); + } } -private: - NYT::TNode Schema; -}; - -struct TFieldsMapping{ - TVector FieldsPositions; - size_t OffsetPosition; - - TFieldsMapping(const NYT::TNode& schema, const NKikimr::NMiniKQL::TType* outputType) { - THashMap outputPositions; - Y_ENSURE(outputType->IsStruct()); - const auto structType = static_cast(outputType); - const auto count = structType->GetMembersCount(); + const TVector>& Parse() { + const auto [values, size] = Buffer.Finish(); + LOG_ROW_DISPATCHER_TRACE("Parse values:\n" << values); - for (ui32 i = 1; i < count; ++i) { // 0 index - OffsetFieldName - const auto name = structType->GetMemberName(i); - outputPositions[name] = i; + for (auto& parsedColumn : ParsedValues) { + parsedColumn.clear(); + parsedColumn.reserve(Buffer.GetNumberValues()); } - const auto& fields = schema[1]; - Y_ENSURE(fields.IsList()); - Y_ENSURE(count == fields.Size()); - for (size_t i = 0; i < fields.Size(); ++i) { - auto name = fields[i][0].AsString(); - if (name == OffsetFieldName) { - OffsetPosition = i; - continue; + simdjson::ondemand::parser parser; + parser.threaded = false; + + simdjson::ondemand::document_stream documents = parser.iterate_many(values, size, simdjson::dom::DEFAULT_BATCH_SIZE); + for (auto document : documents) { + for (auto item : document.get_object()) { + const auto it = ColumnsIndex.find(item.escaped_key().value()); + if (it == ColumnsIndex.end()) { + continue; + } + + auto& parsedColumn = ParsedValues[it->second]; + if (item.value().is_string()) { + parsedColumn.emplace_back(CreateHolderIfNeeded( + values, size, item.value().get_string().value() + )); + } else { + parsedColumn.emplace_back(CreateHolderIfNeeded( + values, size, item.value().raw_json_token().value() + )); + } } - FieldsPositions.push_back(outputPositions[name]); } + return ParsedValues; } -}; - -class TParserPushRelayImpl: public NYql::NPureCalc::IConsumer { -public: - TParserPushRelayImpl(const TParserOutputSpec& outputSpec, NYql::NPureCalc::IPushStreamWorker* worker, THolder>>> underlying) - : Underlying(std::move(underlying)) - , Worker(worker) - , FieldsMapping(outputSpec.GetSchema(), Worker->GetOutputType()) - { } -public: - void OnObject(const NYql::NUdf::TUnboxedValue* value) override { - auto unguard = Unguard(Worker->GetScopedAlloc()); - TList result; - - Y_ENSURE(value->GetListLength() == FieldsMapping.FieldsPositions.size() + 1); - ui64 offset = value->GetElement(FieldsMapping.OffsetPosition).Get(); - - for (auto pos : FieldsMapping.FieldsPositions) { - const auto& cell = value->GetElement(pos); - - NYql::NUdf::TStringRef strRef(cell.AsStringRef()); - result.emplace_back(strRef.Data(), strRef.Size()); + TJsonParserBuffer& GetBuffer() { + if (Buffer.GetFinished()) { + Buffer.Clear(); } - - Underlying->OnObject(std::make_pair(offset, std::move(result))); - } - - void OnFinish() override { - auto unguard = Unguard(Worker->GetScopedAlloc()); - Underlying->OnFinish(); - } - -private: - THolder>>> Underlying; - NYql::NPureCalc::IWorker* Worker; - TFieldsMapping FieldsMapping; -}; - -} - -template <> -struct NYql::NPureCalc::TInputSpecTraits { - static constexpr bool IsPartial = false; - static constexpr bool SupportPushStreamMode = true; - - using TConsumerType = THolder>; - - static TConsumerType MakeConsumer( - const TParserInputSpec& spec, - NYql::NPureCalc::TWorkerHolder worker - ) { - Y_UNUSED(spec); - return MakeHolder(std::move(worker)); + return Buffer; } -}; - -template <> -struct NYql::NPureCalc::TOutputSpecTraits { - static const constexpr bool IsPartial = false; - static const constexpr bool SupportPushStreamMode = true; - static void SetConsumerToWorker(const TParserOutputSpec& outputSpec, NYql::NPureCalc::IPushStreamWorker* worker, THolder>>> consumer) { - worker->SetConsumer(MakeHolder(outputSpec, worker, std::move(consumer))); - } -}; - -namespace NFq { - -class TJsonParser::TImpl { -public: - TImpl( - const TVector& columns, - const TVector& types, - TCallback callback) - : Sql(GenerateSql(columns, types)) { - auto options = NYql::NPureCalc::TProgramFactoryOptions(); - auto factory = NYql::NPureCalc::MakeProgramFactory(options); - - LOG_ROW_DISPATCHER_DEBUG("Creating program..."); - Program = factory->MakePushStreamProgram( - TParserInputSpec(), - TParserOutputSpec(MakeOutputSchema(columns)), - Sql, - NYql::NPureCalc::ETranslationMode::SExpr - ); - LOG_ROW_DISPATCHER_DEBUG("Program created"); - InputConsumer = Program->Apply(MakeHolder(callback)); - LOG_ROW_DISPATCHER_DEBUG("InputConsumer created"); - } - - void Push( ui64 offset, const TString& value) { - LOG_ROW_DISPATCHER_TRACE("Push " << value); - InputConsumer->OnObject(std::make_pair(offset, value)); - } - - TString GetSql() const { - return Sql; + TString GetDescription() const { + TStringBuilder description = TStringBuilder() << "Columns: "; + for (const auto& column : Columns) { + description << "'" << column << "' "; + } + description << "\nBuffer size: " << Buffer.GetNumberValues() << ", finished: " << Buffer.GetFinished(); + return description; } -private: - TString GenerateSql(const TVector& columnNames, const TVector& columnTypes) { - Y_ABORT_UNLESS(columnNames.size() == columnTypes.size(), "Unexpected column types size"); - - TStringStream udfOutputType; - TStringStream resultType; - for (size_t i = 0; i < columnNames.size(); ++i) { - const TString& lastSymbol = i + 1 == columnNames.size() ? "" : " "; - const TString& column = columnNames[i]; - const TString& type = SkipOptional(columnTypes[i]); - - udfOutputType << "'('" << column << " (DataType '" << type << "))" << lastSymbol; - resultType << "'('" << column << " (SafeCast (Member $parsed '" << column << ") $string_type))" << lastSymbol; + TString GetDebugString(const TVector>& parsedValues) const { + TStringBuilder result; + for (size_t i = 0; i < Columns.size(); ++i) { + result << "Parsed column '" << Columns[i] << "': "; + for (const auto& value : parsedValues[i]) { + result << "'" << value << "' "; + } + result << "\n"; } - - TStringStream str; - str << R"( - ( - (let $string_type (DataType 'String)) - - (let $input_type (TupleType $string_type (DataType 'Uint64))) - (let $output_type (TupleType (StructType )" << udfOutputType.Str() << R"() (DataType 'Uint64))) - (let $udf_argument_type (TupleType $input_type (StructType) $output_type)) - (let $udf_callable_type (CallableType '('1) '((StreamType $output_type)) '((StreamType $input_type)) '((OptionalType (DataType 'Utf8))))) - (let $udf (Udf 'ClickHouseClient.ParseFormat (Void) $udf_argument_type 'json_each_row $udf_callable_type (VoidType) '"" '())) - - (return (Map (Apply $udf (Map (Self '0) (lambda '($input) (block '( - (return '((Member $input 'data) (Member $input ')" << OffsetFieldName << R"())) - ))))) (lambda '($output) (block '( - (let $parsed (Nth $output '0)) - (return (AsStruct '(')" << OffsetFieldName << R"( (Nth $output '1)) )" << resultType.Str() << R"()) - ))))) - ) - )"; - LOG_ROW_DISPATCHER_DEBUG("GenerateSql " << str.Str()); - return str.Str(); + return result; } - static TString SkipOptional(TStringBuf type) { - if (type.StartsWith("Optional")) { - Y_ABORT_UNLESS(type.SkipPrefix("Optional<")); - Y_ABORT_UNLESS(type.ChopSuffix(">")); +private: + std::string_view CreateHolderIfNeeded(const char* dataHolder, size_t size, std::string_view value) { + ptrdiff_t diff = value.data() - dataHolder; + if (0 <= diff && static_cast(diff) < size) { + return value; } - return TString(type); + return Buffer.AddHolder(value); } private: - THolder> Program; - THolder> InputConsumer; - const TString Sql; + TVector Columns; + absl::flat_hash_map ColumnsIndex; + + TJsonParserBuffer Buffer; + TVector> ParsedValues; }; -TJsonParser::TJsonParser( - const TVector& columns, - const TVector& types, - TCallback callback) - : Impl(std::make_unique(columns, types, callback)) { -} +TJsonParser::TJsonParser(const TVector& columns, const TVector& types) + : Impl(std::make_unique(columns, types)) +{} TJsonParser::~TJsonParser() { } - -void TJsonParser::Push(ui64 offset, const TString& value) { - Impl->Push(offset, value); + +TJsonParserBuffer& TJsonParser::GetBuffer() { + return Impl->GetBuffer(); +} + +const TVector>& TJsonParser::Parse() { + return Impl->Parse(); +} + +TString TJsonParser::GetDescription() const { + return Impl->GetDescription(); } -TString TJsonParser::GetSql() { - return Impl->GetSql(); +TString TJsonParser::GetDebugString(const TVector>& parsedValues) const { + return Impl->GetDebugString(parsedValues); } -std::unique_ptr NewJsonParser( - const TVector& columns, - const TVector& types, - TCallback callback) { - return std::unique_ptr(new TJsonParser(columns, types, callback)); +std::unique_ptr NewJsonParser(const TVector& columns, const TVector& types) { + return std::unique_ptr(new TJsonParser(columns, types)); } } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/json_parser.h b/ydb/core/fq/libs/row_dispatcher/json_parser.h index cb5137105e6b..fcdf707236b1 100644 --- a/ydb/core/fq/libs/row_dispatcher/json_parser.h +++ b/ydb/core/fq/libs/row_dispatcher/json_parser.h @@ -1,32 +1,46 @@ #pragma once -#include +#include -#include +#include namespace NFq { -class TJsonParser { +class TJsonParserBuffer { public: - using TCallback = std::function&&)>; - + TJsonParserBuffer(); + + void Reserve(size_t size); + + void AddValue(const TString& value); + std::string_view AddHolder(std::string_view value); + + std::pair Finish(); + void Clear(); + +private: + YDB_READONLY_DEF(size_t, NumberValues); + YDB_READONLY_DEF(bool, Finished) + + TStringBuilder Values; +}; + +class TJsonParser { public: - TJsonParser( - const TVector& columns, - const TVector& types, - TCallback callback); + TJsonParser(const TVector& columns, const TVector& types); ~TJsonParser(); - void Push(ui64 offset, const TString& value); - TString GetSql(); + + TJsonParserBuffer& GetBuffer(); + const TVector>& Parse(); + + TString GetDescription() const; + TString GetDebugString(const TVector>& parsedValues) const; private: class TImpl; const std::unique_ptr Impl; }; -std::unique_ptr NewJsonParser( - const TVector& columns, - const TVector& types, - TJsonParser::TCallback callback); +std::unique_ptr NewJsonParser(const TVector& columns, const TVector& types); } // namespace NFq diff --git a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp index a1dc73bfef63..d1ae9fd1f73c 100644 --- a/ydb/core/fq/libs/row_dispatcher/topic_session.cpp +++ b/ydb/core/fq/libs/row_dispatcher/topic_session.cpp @@ -50,7 +50,6 @@ struct TEvPrivate { EvPqEventsReady = EvBegin + 10, EvCreateSession, EvStatus, - EvDataParsed, EvDataAfterFilteration, EvDataFiltered, EvPrintState, @@ -63,20 +62,15 @@ struct TEvPrivate { struct TEvCreateSession : public NActors::TEventLocal {}; struct TEvPrintState : public NActors::TEventLocal {}; struct TEvStatus : public NActors::TEventLocal {}; - struct TEvDataParsed : public NActors::TEventLocal { - TEvDataParsed(ui64 offset, TList&& value) - : Offset(offset) - , Value(std::move(value)) - {} - ui64 Offset = 0; - TList Value; - }; struct TEvDataFiltered : public NActors::TEventLocal { - TEvDataFiltered(ui64 offset) + TEvDataFiltered(ui64 offset, ui64 numberValues) : Offset(offset) + , NumberValues(numberValues) {} - ui64 Offset = 0; + + const ui64 Offset; + const ui64 NumberValues; }; struct TEvDataAfterFilteration : public NActors::TEventLocal { @@ -179,7 +173,8 @@ class TTopicSession : public TActorBootstrapped { void CreateTopicSession(); void CloseTopicSession(); void SubscribeOnNextEvent(); - void SendToParsing(ui64 offset, const TString& message); + void SendToParsing(const TVector& messages); + void SendToFiltering(ui64 offset, const TVector>& parsedValues); void SendData(ClientsInfo& info); void InitParser(const NYql::NPq::NProto::TDqPqTopicSource& sourceParams); void FatalError(const TString& message, const std::unique_ptr* filter = nullptr); @@ -194,7 +189,6 @@ class TTopicSession : public TActorBootstrapped { void Handle(NFq::TEvPrivate::TEvPqEventsReady::TPtr&); void Handle(NFq::TEvPrivate::TEvCreateSession::TPtr&); - void Handle(NFq::TEvPrivate::TEvDataParsed::TPtr&); void Handle(NFq::TEvPrivate::TEvDataAfterFilteration::TPtr&); void Handle(NFq::TEvPrivate::TEvStatus::TPtr&); void Handle(NFq::TEvPrivate::TEvDataFiltered::TPtr&); @@ -212,7 +206,6 @@ class TTopicSession : public TActorBootstrapped { STRICT_STFUNC_EXC(StateFunc, hFunc(NFq::TEvPrivate::TEvPqEventsReady, Handle); hFunc(NFq::TEvPrivate::TEvCreateSession, Handle); - hFunc(NFq::TEvPrivate::TEvDataParsed, Handle); hFunc(NFq::TEvPrivate::TEvDataAfterFilteration, Handle); hFunc(NFq::TEvPrivate::TEvStatus, Handle); hFunc(NFq::TEvPrivate::TEvDataFiltered, Handle); @@ -228,7 +221,6 @@ class TTopicSession : public TActorBootstrapped { cFunc(NActors::TEvents::TEvPoisonPill::EventType, PassAway); IgnoreFunc(NFq::TEvPrivate::TEvPqEventsReady); IgnoreFunc(NFq::TEvPrivate::TEvCreateSession); - IgnoreFunc(NFq::TEvPrivate::TEvDataParsed); IgnoreFunc(NFq::TEvPrivate::TEvDataAfterFilteration); IgnoreFunc(NFq::TEvPrivate::TEvStatus); IgnoreFunc(NFq::TEvPrivate::TEvDataFiltered); @@ -370,27 +362,6 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvCreateSession::TPtr&) { CreateTopicSession(); } -void TTopicSession::Handle(NFq::TEvPrivate::TEvDataParsed::TPtr& ev) { - LOG_ROW_DISPATCHER_TRACE("TEvDataParsed, offset " << ev->Get()->Offset); - - for (auto v: ev->Get()->Value) { - LOG_ROW_DISPATCHER_TRACE("v " << v); - } - - for (auto& [actorId, info] : Clients) { - try { - if (!info.Filter) { - continue; - } - info.Filter->Push(ev->Get()->Offset, ev->Get()->Value); - } catch (const std::exception& e) { - FatalError(e.what(), &info.Filter); - } - } - auto event = std::make_unique(ev->Get()->Offset); - Send(SelfId(), event.release()); -} - void TTopicSession::Handle(NFq::TEvPrivate::TEvDataAfterFilteration::TPtr& ev) { LOG_ROW_DISPATCHER_TRACE("TEvDataAfterFilteration, read actor id " << ev->Get()->ReadActorId.ToString()); auto it = Clients.find(ev->Get()->ReadActorId); @@ -422,11 +393,11 @@ void TTopicSession::Handle(NFq::TEvPrivate::TEvStatus::TPtr&) { } void TTopicSession::Handle(NFq::TEvPrivate::TEvDataFiltered::TPtr& ev) { - LOG_ROW_DISPATCHER_TRACE("TEvDataFiltered, offset " << ev->Get()->Offset); + LOG_ROW_DISPATCHER_TRACE("TEvDataFiltered, offset " << ev->Get()->Offset << ", number values " << ev->Get()->NumberValues); + const ui64 offset = ev->Get()->Offset + ev->Get()->NumberValues; for (auto& [actorId, info] : Clients) { - if (!info.NextMessageOffset - || *info.NextMessageOffset < ev->Get()->Offset + 1) { - info.NextMessageOffset = ev->Get()->Offset + 1; + if (!info.NextMessageOffset || *info.NextMessageOffset < offset) { + info.NextMessageOffset = offset; } } } @@ -472,14 +443,13 @@ void TTopicSession::CloseTopicSession() { void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TReadSessionEvent::TDataReceivedEvent& event) { Self.Metrics.RowsRead->Add(event.GetMessages().size()); for (const auto& message : event.GetMessages()) { - const TString& data = message.GetData(); - Self.IngressStats.Bytes += data.size(); LOG_ROW_DISPATCHER_TRACE("Data received: " << message.DebugString(true)); - TString item = message.GetData(); - Self.SendToParsing(message.GetOffset(), item); + Self.IngressStats.Bytes += message.GetData().size(); Self.LastMessageOffset = message.GetOffset(); } + + Self.SendToParsing(event.GetMessages()); } void TTopicSession::TTopicEventProcessor::operator()(NYdb::NTopic::TSessionClosedEvent& ev) { @@ -528,30 +498,60 @@ TString TTopicSession::GetSessionId() const { return ReadSession ? ReadSession->GetSessionId() : TString{"empty"}; } -void TTopicSession::SendToParsing(ui64 offset, const TString& message) { - LOG_ROW_DISPATCHER_TRACE("SendToParsing, message " << message); +void TTopicSession::SendToParsing(const TVector& messages) { + size_t valuesSize = 0; + for (const auto& message : messages) { + const auto& data = message.GetData(); + valuesSize += data.size(); - for (auto& readActorId : ClientsWithoutPredicate) { - auto it = Clients.find(readActorId); - Y_ENSURE(it != Clients.end(), "Internal error: unknown client"); - auto& info = it->second; - if (!info.Filter) { - LOG_ROW_DISPATCHER_TRACE("Send message to client without parsing/filtering"); - AddDataToClient(info, offset, message); + for (const auto& readActorId : ClientsWithoutPredicate) { + const auto it = Clients.find(readActorId); + Y_ENSURE(it != Clients.end(), "Internal error: unknown client"); + if (auto& info = it->second; !info.Filter) { + LOG_ROW_DISPATCHER_TRACE("Send message to client without parsing/filtering"); + AddDataToClient(info, message.GetOffset(), data); + } } } - if (ClientsWithoutPredicate.size() == Clients.size()) { + if (ClientsWithoutPredicate.size() == Clients.size() || messages.empty()) { return; } + TJsonParserBuffer& buffer = Parser->GetBuffer(); + buffer.Reserve(valuesSize); + for (const auto& message : messages) { + buffer.AddValue(message.GetData()); + } + + const ui64 offset = messages.front().GetOffset(); + LOG_ROW_DISPATCHER_TRACE("SendToParsing, buffer with offset " << offset << " and size " << buffer.GetNumberValues()); + try { - Parser->Push(offset, message); + const auto& parsedValues = Parser->Parse(); + SendToFiltering(offset, parsedValues); } catch (const std::exception& e) { FatalError(e.what()); } } +void TTopicSession::SendToFiltering(ui64 offset, const TVector>& parsedValues) { + Y_ENSURE(parsedValues, "Expected non empty schema"); + LOG_ROW_DISPATCHER_TRACE("TEvDataParsed, offset " << offset << ", data:\n" << Parser->GetDebugString(parsedValues)); + + for (auto& [actorId, info] : Clients) { + try { + if (info.Filter) { + info.Filter->Push(offset, parsedValues); + } + } catch (const std::exception& e) { + FatalError(e.what(), &info.Filter); + } + } + + Send(SelfId(), new TEvPrivate::TEvDataFiltered(offset, parsedValues.front().size())); +} + void TTopicSession::SendData(ClientsInfo& info) { info.DataArrivedSent = false; if (info.Buffer.empty()) { @@ -678,13 +678,7 @@ void TTopicSession::InitParser(const NYql::NPq::NProto::TDqPqTopicSource& source } try { CurrentParserTypes = std::make_pair(GetVector(sourceParams.GetColumns()), GetVector(sourceParams.GetColumnTypes())); - NActors::TActorSystem* actorSystem = NActors::TActivationContext::ActorSystem(); - Parser = NewJsonParser( - GetVector(sourceParams.GetColumns()), - GetVector(sourceParams.GetColumnTypes()), - [actorSystem, selfId = SelfId()](ui64 offset, TList&& value){ - actorSystem->Send(selfId, new NFq::TEvPrivate::TEvDataParsed(offset, std::move(value))); - }); + Parser = NewJsonParser(GetVector(sourceParams.GetColumns()), GetVector(sourceParams.GetColumnTypes())); } catch (const NYql::NPureCalc::TCompileError& e) { FatalError(e.GetIssues()); } @@ -694,10 +688,10 @@ void TTopicSession::FatalError(const TString& message, const std::unique_ptrGetSql(); + str << ", parser description:\n" << Parser->GetDescription(); } if (filter) { - str << ", filter sql:" << (*filter)->GetSql(); + str << ", filter sql:\n" << (*filter)->GetSql(); } LOG_ROW_DISPATCHER_ERROR("FatalError: " << str.Str()); diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp index 1645f521051d..61bae7e6afbd 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_filter_ut.cpp @@ -56,8 +56,8 @@ Y_UNIT_TEST_SUITE(TJsonFilterTests) { [&](ui64 offset, const TString& json) { result[offset] = json; }); - Filter->Push(5, {"hello1", "99"}); - Filter->Push(6, {"hello2", "101"}); + Filter->Push(5, {{"hello1"}, {"99"}}); + Filter->Push(6, {{"hello2"}, {"101"}}); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); UNIT_ASSERT_VALUES_EQUAL(R"({"a1":"hello2","a2":101})", result[6]); } @@ -71,20 +71,34 @@ Y_UNIT_TEST_SUITE(TJsonFilterTests) { [&](ui64 offset, const TString& json) { result[offset] = json; }); - Filter->Push(5, {"99", "hello1"}); - Filter->Push(6, {"101", "hello2"}); + Filter->Push(5, {{"99"}, {"hello1"}}); + Filter->Push(6, {{"101"}, {"hello2"}}); UNIT_ASSERT_VALUES_EQUAL(1, result.size()); UNIT_ASSERT_VALUES_EQUAL(R"({"a1":"hello2","a2":101})", result[6]); } - Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) { + Y_UNIT_TEST_F(ManyValues, TFixture) { + TMap result; + MakeFilter( + {"a1", "a2"}, + {"String", "UInt64"}, + "where a2 > 100", + [&](ui64 offset, const TString& json) { + result[offset] = json; + }); + Filter->Push(5, {{"hello1", "hello2"}, {"99", "101"}}); + UNIT_ASSERT_VALUES_EQUAL(1, result.size()); + UNIT_ASSERT_VALUES_EQUAL(R"({"a1":"hello2","a2":101})", result[6]); + } + + Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) { MakeFilter( {"a1", "a2"}, {"String", "UInt64"}, "where Unwrap(a2) = 1", [&](ui64, const TString&) { }); - UNIT_ASSERT_EXCEPTION_CONTAINS(Filter->Push(5, {"99", "hello1"}), yexception, "Failed to unwrap empty optional"); - } + UNIT_ASSERT_EXCEPTION_CONTAINS(Filter->Push(5, {{"99"}, {"hello1"}}), yexception, "Failed to unwrap empty optional"); + } } } diff --git a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp index a9c389d3900f..54fde6580784 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/json_parser_ut.cpp @@ -7,10 +7,10 @@ #include #include -#include - #include +#include + namespace { using namespace NKikimr; @@ -24,8 +24,9 @@ class TFixture : public NUnitTest::TBaseFixture { void SetUp(NUnitTest::TTestContext&) override { TAutoPtr app = new TAppPrepare(); + Runtime.SetLogBackend(CreateStderrBackend()); + Runtime.SetLogPriority(NKikimrServices::FQ_ROW_DISPATCHER, NLog::PRI_TRACE); Runtime.Initialize(app->Unwrap()); - Runtime.SetLogPriority(NKikimrServices::FQ_ROW_DISPATCHER, NLog::PRI_DEBUG); } void TearDown(NUnitTest::TTestContext& /* context */) override { @@ -34,89 +35,108 @@ class TFixture : public NUnitTest::TBaseFixture { } } - void MakeParser(TVector columns, TVector types, NFq::TJsonParser::TCallback callback) { - try { - Parser = NFq::NewJsonParser( - columns, - types, - callback); - } catch (NYql::NPureCalc::TCompileError compileError) { - UNIT_ASSERT_C(false, TStringBuilder() << "Failed to create json parser: " << compileError.what() << "\nQuery text:\n" << compileError.GetYql() << "Reason:\n" << compileError.GetIssues()); - } + void MakeParser(TVector columns, TVector types) { + Parser = NFq::NewJsonParser(columns, types); + } + + void MakeParser(TVector columns) { + MakeParser(columns, TVector(columns.size(), "String")); } - void MakeParser(TVector columns, NFq::TJsonParser::TCallback callback) { - MakeParser(columns, TVector(columns.size(), "String"), callback); + void PushToParser(const TString& data) { + TJsonParserBuffer& buffer = Parser->GetBuffer(); + buffer.Reserve(data.size()); + buffer.AddValue(data); + + ParsedValues = Parser->Parse(); + ResultNumberValues = ParsedValues ? ParsedValues.front().size() : 0; + } + + TVector GetParsedRow(size_t id) const { + TVector result; + result.reserve(ParsedValues.size()); + for (const auto& columnResult : ParsedValues) { + result.emplace_back(columnResult[id]); + } + return result; } TActorSystemStub actorSystemStub; NActors::TTestActorRuntime Runtime; std::unique_ptr Parser; + + ui64 ResultNumberValues = 0; + TVector> ParsedValues; }; Y_UNIT_TEST_SUITE(TJsonParserTests) { - Y_UNIT_TEST_F(Simple1, TFixture) { - TList result; - ui64 resultOffset; - MakeParser({"a1", "a2"}, {"String", "Optional"}, [&](ui64 offset, TList&& value){ - resultOffset = offset; - result = std::move(value); - }); - Parser->Push(5, R"({"a1": "hello1", "a2": 101, "event": "event1"})"); - UNIT_ASSERT_VALUES_EQUAL(5, resultOffset); + Y_UNIT_TEST_F(Simple1, TFixture) { + MakeParser({"a1", "a2"}, {"String", "Optional"}); + PushToParser(R"({"a1": "hello1", "a2": 101, "event": "event1"})"); + UNIT_ASSERT_VALUES_EQUAL(1, ResultNumberValues); + + const auto& result = GetParsedRow(0); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); UNIT_ASSERT_VALUES_EQUAL("hello1", result.front()); UNIT_ASSERT_VALUES_EQUAL("101", result.back()); } - Y_UNIT_TEST_F(Simple2, TFixture) { - TList result; - ui64 resultOffset; - MakeParser({"a2", "a1"}, [&](ui64 offset, TList&& value){ - resultOffset = offset; - result = std::move(value); - }); - Parser->Push(5, R"({"a1": "hello1", "a2": "101", "event": "event1"})"); - UNIT_ASSERT_VALUES_EQUAL(5, resultOffset); + Y_UNIT_TEST_F(Simple2, TFixture) { + MakeParser({"a2", "a1"}); + PushToParser(R"({"a1": "hello1", "a2": "101", "event": "event1"})"); + UNIT_ASSERT_VALUES_EQUAL(1, ResultNumberValues); + + const auto& result = GetParsedRow(0); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); UNIT_ASSERT_VALUES_EQUAL("101", result.front()); UNIT_ASSERT_VALUES_EQUAL("hello1", result.back()); } - Y_UNIT_TEST_F(Simple3, TFixture) { - TList result; - ui64 resultOffset; - MakeParser({"a1", "a2"}, [&](ui64 offset, TList&& value){ - resultOffset = offset; - result = std::move(value); - }); - Parser->Push(5, R"({"a2": "hello1", "a1": "101", "event": "event1"})"); - UNIT_ASSERT_VALUES_EQUAL(5, resultOffset); + Y_UNIT_TEST_F(Simple3, TFixture) { + MakeParser({"a1", "a2"}); + PushToParser(R"({"a2": "hello1", "a1": "101", "event": "event1"})"); + UNIT_ASSERT_VALUES_EQUAL(1, ResultNumberValues); + + const auto& result = GetParsedRow(0); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); UNIT_ASSERT_VALUES_EQUAL("101", result.front()); UNIT_ASSERT_VALUES_EQUAL("hello1", result.back()); } - Y_UNIT_TEST_F(Simple4, TFixture) { - TList result; - ui64 resultOffset; - MakeParser({"a2", "a1"}, [&](ui64 offset, TList&& value){ - resultOffset = offset; - result = std::move(value); - }); - Parser->Push(5, R"({"a2": "hello1", "a1": "101", "event": "event1"})"); - UNIT_ASSERT_VALUES_EQUAL(5, resultOffset); + Y_UNIT_TEST_F(Simple4, TFixture) { + MakeParser({"a2", "a1"}); + PushToParser(R"({"a2": "hello1", "a1": "101", "event": "event1"})"); + UNIT_ASSERT_VALUES_EQUAL(1, ResultNumberValues); + + const auto& result = GetParsedRow(0); UNIT_ASSERT_VALUES_EQUAL(2, result.size()); UNIT_ASSERT_VALUES_EQUAL("hello1", result.front()); UNIT_ASSERT_VALUES_EQUAL("101", result.back()); } - Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) { + Y_UNIT_TEST_F(ManyValues, TFixture) { + MakeParser({"a1", "a2"}); + + TJsonParserBuffer& buffer = Parser->GetBuffer(); + buffer.AddValue(R"({"a1": "hello1", "a2": 101, "event": "event1"})"); + buffer.AddValue(R"({"a1": "hello1", "a2": "101", "event": "event2"})"); + buffer.AddValue(R"({"a2": "101", "a1": "hello1", "event": "event3"})"); + + ParsedValues = Parser->Parse(); + ResultNumberValues = ParsedValues.front().size(); + UNIT_ASSERT_VALUES_EQUAL(3, ResultNumberValues); + for (size_t i = 0; i < ResultNumberValues; ++i) { + const auto& result = GetParsedRow(i); + UNIT_ASSERT_VALUES_EQUAL_C(2, result.size(), i); + UNIT_ASSERT_VALUES_EQUAL_C("hello1", result.front(), i); + UNIT_ASSERT_VALUES_EQUAL_C("101", result.back(), i); + } + } - MakeParser({"a2", "a1"}, [&](ui64, TList&&){ }); - UNIT_ASSERT_EXCEPTION_CONTAINS(Parser->Push(5, R"(ydb)"), yexception, "DB::ParsingException: Cannot parse input: expected '{' before: 'ydb': (at row 1)"); + Y_UNIT_TEST_F(ThrowExceptionByError, TFixture) { + MakeParser({"a2", "a1"}); + UNIT_ASSERT_EXCEPTION_CONTAINS(PushToParser(R"(ydb)"), simdjson::simdjson_error, "INCORRECT_TYPE: The JSON element does not have the requested type."); } } } - diff --git a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp index 65c24fcb85f1..25e9d5a7c810 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp +++ b/ydb/core/fq/libs/row_dispatcher/ut/topic_session_ut.cpp @@ -99,20 +99,20 @@ class TFixture : public NUnitTest::TBaseFixture { void ExpectMessageBatch(NActors::TActorId readActorId, const std::vector& expected) { auto eventHolder = Runtime.GrabEdgeEvent(RowDispatcherActorId, TDuration::Seconds(GrabTimeoutSec)); UNIT_ASSERT(eventHolder.Get() != nullptr); - UNIT_ASSERT(eventHolder->Get()->ReadActorId == readActorId); - UNIT_ASSERT(expected.size() == eventHolder->Get()->Record.MessagesSize()); + UNIT_ASSERT_VALUES_EQUAL(eventHolder->Get()->ReadActorId, readActorId); + UNIT_ASSERT_VALUES_EQUAL(expected.size(), eventHolder->Get()->Record.MessagesSize()); for (size_t i = 0; i < expected.size(); ++i) { NFq::NRowDispatcherProto::TEvMessage message = eventHolder->Get()->Record.GetMessages(i); std::cerr << "message.GetJson() " << message.GetJson() << std::endl; - UNIT_ASSERT(expected[i] == message.GetJson()); + UNIT_ASSERT_VALUES_EQUAL(expected[i], message.GetJson()); } } void ExpectSessionError(NActors::TActorId readActorId, TString message) { auto eventHolder = Runtime.GrabEdgeEvent(RowDispatcherActorId, TDuration::Seconds(GrabTimeoutSec)); UNIT_ASSERT(eventHolder.Get() != nullptr); - UNIT_ASSERT(eventHolder->Get()->ReadActorId == readActorId); - UNIT_ASSERT(TString(eventHolder->Get()->Record.GetMessage()).Contains(message)); + UNIT_ASSERT_VALUES_EQUAL(eventHolder->Get()->ReadActorId, readActorId); + UNIT_ASSERT_STRING_CONTAINS(TString(eventHolder->Get()->Record.GetMessage()), message); } void ExpectNewDataArrived(TSet readActorIds) { @@ -129,7 +129,7 @@ class TFixture : public NUnitTest::TBaseFixture { Runtime.Send(new IEventHandle(TopicSession, readActorId, new TEvRowDispatcher::TEvGetNextBatch())); auto eventHolder = Runtime.GrabEdgeEvent(RowDispatcherActorId, TDuration::Seconds(GrabTimeoutSec)); UNIT_ASSERT(eventHolder.Get() != nullptr); - UNIT_ASSERT(eventHolder->Get()->ReadActorId == readActorId); + UNIT_ASSERT_VALUES_EQUAL(eventHolder->Get()->ReadActorId, readActorId); return eventHolder->Get()->Record.MessagesSize(); } @@ -284,7 +284,7 @@ Y_UNIT_TEST_SUITE(TopicSessionTests) { const std::vector data = { "not json", "noch einmal / nicht json" }; PQWrite(data, topicName); - ExpectSessionError(ReadActorId1, "DB::ParsingException: Cannot parse input: expected '{' before: 'not json': (at row 1)"); + ExpectSessionError(ReadActorId1, "INCORRECT_TYPE: The JSON element does not have the requested type."); StopSession(ReadActorId1, source); } diff --git a/ydb/core/fq/libs/row_dispatcher/ut/ya.make b/ydb/core/fq/libs/row_dispatcher/ut/ya.make index 25242d092f28..9486156aad38 100644 --- a/ydb/core/fq/libs/row_dispatcher/ut/ya.make +++ b/ydb/core/fq/libs/row_dispatcher/ut/ya.make @@ -19,8 +19,6 @@ PEERDIR( ydb/library/yql/udfs/common/json2 ydb/library/yql/udfs/common/yson2 ydb/tests/fq/pq_async_io - ydb/library/yql/sql/pg_dummy - ydb/library/yql/udfs/common/clickhouse/client ) SIZE(MEDIUM) diff --git a/ydb/core/fq/libs/row_dispatcher/ya.make b/ydb/core/fq/libs/row_dispatcher/ya.make index f1f036d20dc0..44be15461ee6 100644 --- a/ydb/core/fq/libs/row_dispatcher/ya.make +++ b/ydb/core/fq/libs/row_dispatcher/ya.make @@ -13,6 +13,7 @@ SRCS( PEERDIR( contrib/libs/fmt + contrib/libs/simdjson ydb/core/fq/libs/actors/logging ydb/core/fq/libs/config/protos ydb/core/fq/libs/control_plane_storage @@ -34,6 +35,8 @@ YQL_LAST_ABI_VERSION() END() -RECURSE_FOR_TESTS( - ut -) +IF(NOT EXPORT_CMAKE) + RECURSE_FOR_TESTS( + ut + ) +ENDIF()