Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support go to cpp PipelineEventGroup transfer #1771

Merged
merged 23 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ _deps
/build/
core/build/
core/protobuf/sls/*.pb.*
core/protobuf/models/*.pb.*
henryzhx8 marked this conversation as resolved.
Show resolved Hide resolved
core/common/Version.cpp
!/Makefile
# Enterprise
Expand Down
2 changes: 1 addition & 1 deletion core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ set(SUB_DIRECTORIES_LIST
config config/watcher
pipeline pipeline/batch pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer
runner runner/sink/http
protobuf/sls
protobuf/sls protobuf/models
file_server file_server/event file_server/event_handler file_server/event_listener file_server/reader file_server/polling
prometheus prometheus/labels prometheus/schedulers prometheus/async
ebpf ebpf/observer ebpf/security ebpf/handler
Expand Down
3 changes: 3 additions & 0 deletions core/dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ logtail_define(protobuf_BIN "Absolute path to protoc" "${DEPS_BINARY_ROOT}/proto
set(PROTO_FILE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/protobuf/sls")
set(PROTO_FILES ${PROTO_FILE_PATH}/sls_logs.proto ${PROTO_FILE_PATH}/logtail_buffer_meta.proto ${PROTO_FILE_PATH}/metric.proto ${PROTO_FILE_PATH}/checkpoint.proto)
execute_process(COMMAND ${protobuf_BIN} --proto_path=${PROTO_FILE_PATH} --cpp_out=${PROTO_FILE_PATH} ${PROTO_FILES})
set(PROTO_FILE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/protobuf/models")
set(PROTO_FILES ${PROTO_FILE_PATH}/log_event.proto ${PROTO_FILE_PATH}/metric_event.proto ${PROTO_FILE_PATH}/span_event.proto ${PROTO_FILE_PATH}/pipeline_event_group.proto)
execute_process(COMMAND ${protobuf_BIN} --proto_path=${PROTO_FILE_PATH} --cpp_out=${PROTO_FILE_PATH} ${PROTO_FILES})

# re2
macro(link_re2 target_name)
Expand Down
156 changes: 156 additions & 0 deletions core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
#include "monitor/LogtailAlarm.h"
#include "pipeline/PipelineManager.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "protobuf/models/pipeline_event_group.pb.h"
#include "protobuf/models/log_event.pb.h"
#include "protobuf/models/metric_event.pb.h"
#include "protobuf/models/span_event.pb.h"
#include "provider/Provider.h"

DEFINE_FLAG_BOOL(enable_sls_metrics_format, "if enable format metrics in SLS metricstore log pattern", false);
Expand All @@ -42,6 +46,11 @@ using namespace logtail;

LogtailPlugin* LogtailPlugin::s_instance = NULL;

bool TransferPBToPipelineEventGroup(const logtail::models::PipelineEventGroup& src, logtail::PipelineEventGroup& dst, std::string& errMsg);
bool TransferPBToLogEvent(const logtail::models::LogEvent& src, logtail::LogEvent& dst, std::string& errMsg);
bool TransferPBToMetricEvent(const logtail::models::MetricEvent& src, logtail::MetricEvent& dst, std::string& errMsg);
bool TransferPBToSpanEvent(const logtail::models::SpanEvent& src, logtail::SpanEvent& dst, std::string& errMsg);

Assassin718 marked this conversation as resolved.
Show resolved Hide resolved
LogtailPlugin::LogtailPlugin() {
mPluginAdapterPtr = NULL;
mPluginBasePtr = NULL;
Expand Down Expand Up @@ -563,3 +572,150 @@ K8sContainerMeta LogtailPlugin::GetContainerMeta(const string& containerID) {
}
return K8sContainerMeta();
}

bool TransferPBToPipelineEventGroup(const logtail::models::PipelineEventGroup& src, logtail::PipelineEventGroup& dst, std::string& errMsg) {
// events
if (src.has_logs()) {
Assassin718 marked this conversation as resolved.
Show resolved Hide resolved
if (src.logs().array_size() == 0) {
errMsg = "error transfer PB to PipelineEventGroup: logs array is empty";
return false;
}
dst.MutableEvents().reserve(src.logs().array_size());
for (auto& logSrc : src.logs().array()) {
auto logDst = dst.CreateLogEvent();
if (!TransferPBToLogEvent(logSrc, *logDst, errMsg)) {
return false;
}
dst.MutableEvents().emplace_back(std::move(logDst));
}
henryzhx8 marked this conversation as resolved.
Show resolved Hide resolved
} else if (src.has_metrics()) {
if (src.metrics().array_size() == 0) {
errMsg = "error transfer PB to PipelineEventGroup: metrics array is empty";
return false;
}
dst.MutableEvents().reserve(src.metrics().array_size());
for (auto& metricSrc : src.metrics().array()) {
auto metricDst = dst.CreateMetricEvent();
if (!TransferPBToMetricEvent(metricSrc, *metricDst, errMsg)) {
return false;
}
dst.MutableEvents().emplace_back(std::move(metricDst));
}
} else if (src.has_spans()) {
if (src.spans().array_size() == 0) {
errMsg = "error transfer PB to PipelineEventGroup: spans array is empty";
return false;
}
// timestamp
for (auto& spanSrc : src.spans().array()) {
auto spanDst = dst.CreateSpanEvent();
if (!TransferPBToSpanEvent(spanSrc, *spanDst, errMsg)) {
return false;
}
dst.MutableEvents().emplace_back(std::move(spanDst));
}
} else {
errMsg = "error transfer PB to PipelineEventGroup: unsupported event type";
return false;
}

// tags
for (auto& tag : src.tags()) {
dst.SetTag(tag.first, tag.second);
}

// metadatas
for (auto& metaData : src.metadata()) {
if (metaData.first == "source_id") {
dst.SetMetadata(logtail::EventGroupMetaKey::SOURCE_ID, metaData.second);
}
}

return true;
}

bool TransferPBToLogEvent(const logtail::models::LogEvent& src, logtail::LogEvent& dst, std::string& errMsg) {
// timestamp
time_t ts = static_cast<time_t>(src.timestamp() / 1e9);
time_t tns = static_cast<time_t>(src.timestamp() - static_cast<uint64_t>(ts) * 1e9);
Assassin718 marked this conversation as resolved.
Show resolved Hide resolved
dst.SetTimestamp(ts, tns);
// contents
for (auto& content_pair : src.contents()) {
dst.SetContent(content_pair.key(), content_pair.value());
}
// level
dst.SetLevel(src.level());
// fileoffset and rawsize
dst.SetPosition(src.fileoffset(), src.rawsize());
return true;
}

bool TransferPBToMetricEvent(const logtail::models::MetricEvent& src, logtail::MetricEvent& dst, std::string& errMsg) {
// timestamp
time_t ts = static_cast<time_t>(src.timestamp() / 1e9);
time_t tns = static_cast<time_t>(src.timestamp() - static_cast<uint64_t>(ts) * 1e9);
dst.SetTimestamp(ts, tns);
// name
dst.SetName(src.name());
// value
Assassin718 marked this conversation as resolved.
Show resolved Hide resolved
if (src.has_untypedsinglevalue()) {
dst.SetValue(logtail::UntypedSingleValue{src.untypedsinglevalue().value()});
} else {
errMsg = "error transfer PB to PipelineEventGroup: unsupported value type";
return false;
}
// tags
for (auto& tag_pair : src.tags()) {
dst.SetTag(tag_pair.first, tag_pair.second);
}
return true;
}

bool TransferPBToSpanEvent(const logtail::models::SpanEvent& src, logtail::SpanEvent& dst, std::string& errMsg) {
// timestamp
time_t ts = static_cast<time_t>(src.timestamp() / 1e9);
time_t tns = static_cast<time_t>(src.timestamp() - static_cast<uint64_t>(ts) * 1e9);
dst.SetTimestamp(ts, tns);

dst.SetTraceId(src.traceid());
dst.SetSpanId(src.spanid());
dst.SetTraceState(src.tracestate());
dst.SetParentSpanId(src.parentspanid());
dst.SetName(src.name());
dst.SetKind(static_cast<logtail::SpanEvent::Kind>(src.kind()));
dst.SetStartTimeNs(src.starttimens());
dst.SetEndTimeNs(src.endtimens());

// tags
for (auto& tag_pair : src.tags()) {
dst.SetTag(tag_pair.first, tag_pair.second);
}
// inner events
for (auto& event : src.events()) {
auto dstEvent = dst.AddEvent();
dstEvent->SetTimestampNs(event.timestampns());
dstEvent->SetName(event.name());
for (auto& tag_pair : event.tags()) {
dstEvent->SetTag(tag_pair.first, tag_pair.second);
}
}
// span links
for (auto& link : src.links()) {
auto dstLink = dst.AddLink();
dstLink->SetTraceId(link.traceid());
dstLink->SetSpanId(link.spanid());
dstLink->SetTraceState(link.tracestate());
for (auto& tag_pair : link.tags()) {
dstLink->SetTag(tag_pair.first, tag_pair.second);
}
}

dst.SetStatus(static_cast<logtail::SpanEvent::StatusCode>(src.status()));

// scope tags
for (auto& tag_pair : src.scopetags()) {
dst.SetScopeTag(tag_pair.first, tag_pair.second);
}

return true;
}
12 changes: 12 additions & 0 deletions core/models/LogEvent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,18 @@ void LogEvent::DelContent(StringView key) {
}
}

void LogEvent::SetLevel(const std::string& level) {
SetLevelNoCopy(GetSourceBuffer()->CopyString(level));
}

void LogEvent::SetLevelNoCopy(const StringBuffer& level) {
SetLevelNoCopy(StringView(level.data, level.size));
}

void LogEvent::SetLevelNoCopy(StringView level) {
mLevel = level;
}

LogEvent::ContentIterator LogEvent::FindContent(StringView key) {
auto it = mIndex.find(key);
if (it != mIndex.end()) {
Expand Down
14 changes: 10 additions & 4 deletions core/models/LogEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,16 @@ class LogEvent : public PipelineEvent {
void SetContentNoCopy(StringView key, StringView val);
void DelContent(StringView key);

void SetPosition(uint32_t offset, uint32_t size) {
void SetPosition(uint64_t offset, uint64_t size) {
mFileOffset = offset;
mRawSize = size;
}
std::pair<uint32_t, uint32_t> GetPosition() const { return {mFileOffset, mRawSize}; }
std::pair<uint64_t, uint64_t> GetPosition() const { return {mFileOffset, mRawSize}; }

StringView GetLevel() const { return mLevel; }
void SetLevel(const std::string& level);
void SetLevelNoCopy(const StringBuffer& level);
void SetLevelNoCopy(StringView level);
Assassin718 marked this conversation as resolved.
Show resolved Hide resolved

bool Empty() const { return mIndex.empty(); }
size_t Size() const { return mIndex.size(); }
Expand Down Expand Up @@ -117,8 +122,9 @@ class LogEvent : public PipelineEvent {
ContentsContainer mContents;
size_t mAllocatedContentSize = 0;
std::map<StringView, size_t> mIndex;
uint32_t mFileOffset = 0;
uint32_t mRawSize = 0;
uint64_t mFileOffset = 0;
uint64_t mRawSize = 0;
StringView mLevel;
};

} // namespace logtail
14 changes: 14 additions & 0 deletions core/protobuf/models/log_event.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
syntax = "proto3";
package logtail.models;

message LogEvent {
uint64 Timestamp = 1;
message Content {
bytes Key = 1;
bytes Value = 2;
}
repeated Content Contents= 2;
bytes Level = 3;
uint64 FileOffset = 4;
uint64 RawSize = 5;
}
Assassin718 marked this conversation as resolved.
Show resolved Hide resolved
15 changes: 15 additions & 0 deletions core/protobuf/models/metric_event.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
syntax = "proto3";
package logtail.models;

message UntypedSingleValue {
double Value = 1;
}

message MetricEvent {
uint64 Timestamp = 1;
bytes Name = 2;
oneof Value {
UntypedSingleValue UntypedSingleValue = 3;
}
map<string, bytes> Tags = 4;
Assassin718 marked this conversation as resolved.
Show resolved Hide resolved
}
25 changes: 25 additions & 0 deletions core/protobuf/models/pipeline_event_group.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
syntax = "proto3";
package logtail.models;

import "log_event.proto";
import "metric_event.proto";
import "span_event.proto";

message PipelineEventGroup {
map<string, bytes> Metadata = 1;
map<string, bytes> Tags = 2;
message LogEvents {
repeated LogEvent Array = 1;
Assassin718 marked this conversation as resolved.
Show resolved Hide resolved
}
message MetricEvents {
repeated MetricEvent Array = 1;
}
message SpanEvents {
repeated SpanEvent Array = 1;
}
oneof PipelineEvents {
LogEvents Logs = 3;
MetricEvents Metrics = 4;
SpanEvents Spans = 5;
}
}
43 changes: 43 additions & 0 deletions core/protobuf/models/span_event.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
syntax = "proto3";
package logtail.models;

message SpanEvent {
uint64 Timestamp = 1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的时间戳单位是ns吗?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是的

bytes TraceID = 2;
bytes SpanID = 3;
bytes TraceState = 4;
bytes ParentSpanID = 5;
bytes Name = 6;
enum SpanKind {
UNSPECIFIED = 0;
INTERVAL = 1;
SERVER = 2;
CLIENT = 3;
PRODUCER = 4;
CONSUMER = 5;
}
SpanKind Kind = 7;
uint64 StartTimeNs = 8;
uint64 EndTimeNs = 9;
Assassin718 marked this conversation as resolved.
Show resolved Hide resolved
map<string, bytes> Tags = 10;
message InnerEvent {
uint64 TimestampNs = 1;
bytes Name = 2;
map<string, bytes> Tags = 3;
}
repeated InnerEvent Events = 11;
message SpanLink {
string TraceID = 1;
string SpanID = 2;
string TraceState = 3;
Assassin718 marked this conversation as resolved.
Show resolved Hide resolved
map<string, bytes> Tags = 4;
}
repeated SpanLink Links = 12;
enum StatusCode {
Unset = 0;
Ok = 1;
Error = 2;
}
StatusCode Status = 13;
map<string, bytes> ScopeTags = 14;
}
7 changes: 7 additions & 0 deletions core/unittest/models/LogEventUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class LogEventUnittest : public ::testing::Test {
void TestMeta();
void TestSize();
void TestFromJsonToJson();
void TestLevel();

protected:
void SetUp() override {
Expand Down Expand Up @@ -202,6 +203,11 @@ void LogEventUnittest::TestFromJsonToJson() {
APSARA_TEST_STREQ(CompactJson(inJson).c_str(), CompactJson(outJson).c_str());
}

void LogEventUnittest::TestLevel() {
mLogEvent->SetLevel("level");
APSARA_TEST_EQUAL("level", mLogEvent->GetLevel().to_string());
}

UNIT_TEST_CASE(LogEventUnittest, TestTimestampOp)
UNIT_TEST_CASE(LogEventUnittest, TestSetContent)
UNIT_TEST_CASE(LogEventUnittest, TestDelContent)
Expand All @@ -210,6 +216,7 @@ UNIT_TEST_CASE(LogEventUnittest, TestIterateContent)
UNIT_TEST_CASE(LogEventUnittest, TestMeta)
UNIT_TEST_CASE(LogEventUnittest, TestSize)
UNIT_TEST_CASE(LogEventUnittest, TestFromJsonToJson)
UNIT_TEST_CASE(LogEventUnittest, TestLevel)

} // namespace logtail

Expand Down
Loading
Loading