Skip to content

Commit

Permalink
Flusher supports new metricstore endpoint, which optimizes time serie…
Browse files Browse the repository at this point in the history
…s data storage and query (#1731)
  • Loading branch information
EvanLjp committed Sep 27, 2024
1 parent d413393 commit dfb87fc
Show file tree
Hide file tree
Showing 12 changed files with 161 additions and 32 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,4 @@ your changes, such as:
- [public] [both] [updated] add a new feature

## [Unreleased]
- [inner] [both] [updated] Support SLS Metricstore output
14 changes: 13 additions & 1 deletion core/plugin/flusher/sls/DiskBufferWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,9 @@ bool DiskBufferWriter::ReadNextEncryption(int32_t& pos,
if (!bufferMeta.has_compresstype()) {
bufferMeta.set_compresstype(sls_logs::SlsCompressType::SLS_CMP_LZ4);
}
if (!bufferMeta.has_telemetrytype()) {
bufferMeta.set_telemetrytype(sls_logs::SLS_TELEMETRY_TYPE_LOGS);
}

buffer = new char[meta.mEncryptionSize + 1];
nbytes = fread(buffer, sizeof(char), meta.mEncryptionSize, fin);
Expand Down Expand Up @@ -469,6 +472,7 @@ void DiskBufferWriter::SendEncryptionBuffer(const std::string& filename, int32_t
bufferMeta.set_datatype(int(RawDataType::EVENT_GROUP));
bufferMeta.set_rawsize(meta.mLogDataSize);
bufferMeta.set_compresstype(sls_logs::SLS_CMP_LZ4);
bufferMeta.set_telemetrytype(sls_logs::SLS_TELEMETRY_TYPE_LOGS);
}
}
if (!sendResult) {
Expand Down Expand Up @@ -650,6 +654,7 @@ bool DiskBufferWriter::SendToBufferFile(SenderQueueItem* dataPtr) {
bufferMeta.set_rawsize(data->mRawSize);
bufferMeta.set_shardhashkey(data->mShardHashKey);
bufferMeta.set_compresstype(ConvertCompressType(flusher->GetCompressType()));
bufferMeta.set_telemetrytype(flusher->mTelemetryType);
string encodedInfo;
bufferMeta.SerializeToString(&encodedInfo);

Expand Down Expand Up @@ -727,7 +732,14 @@ SendResult DiskBufferWriter::SendToNetSync(sdk::Client* sendClient,
++retryTimes;
try {
if (bufferMeta.datatype() == int(RawDataType::EVENT_GROUP)) {
if (bufferMeta.has_shardhashkey() && !bufferMeta.shardhashkey().empty())
if (bufferMeta.has_telemetrytype()
&& bufferMeta.telemetrytype() == sls_logs::SLS_TELEMETRY_TYPE_METRICS) {
sendClient->PostMetricStoreLogs(bufferMeta.project(),
bufferMeta.logstore(),
bufferMeta.compresstype(),
logData,
bufferMeta.rawsize());
} else if (bufferMeta.has_shardhashkey() && !bufferMeta.shardhashkey().empty())
sendClient->PostLogStoreLogs(bufferMeta.project(),
bufferMeta.logstore(),
bufferMeta.compresstype(),
Expand Down
44 changes: 29 additions & 15 deletions core/plugin/flusher/sls/FlusherSLS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include "plugin/flusher/sls/FlusherSLS.h"

#include "sls_logs.pb.h"

#ifdef __ENTERPRISE__
#include "config/provider/EnterpriseConfigProvider.h"
#endif
Expand Down Expand Up @@ -56,6 +58,7 @@ DEFINE_FLAG_INT32(discard_send_fail_interval, "discard data when send fail after
DEFINE_FLAG_INT32(profile_data_send_retrytimes, "how many times should retry if profile data send fail", 5);
DEFINE_FLAG_INT32(unknow_error_try_max, "discard data when try times > this value", 5);
DEFINE_FLAG_BOOL(global_network_success, "global network success flag, default false", false);
DEFINE_FLAG_BOOL(enable_metricstore_channel, "only works for metrics data for enhance metrics query performance", true);

DECLARE_FLAG_BOOL(send_prefer_real_ip);

Expand Down Expand Up @@ -360,6 +363,7 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline

// TelemetryType
string telemetryType;

if (!GetOptionalStringParam(config, "TelemetryType", telemetryType, errorMsg)) {
PARAM_WARNING_DEFAULT(mContext->GetLogger(),
mContext->GetAlarm(),
Expand All @@ -371,7 +375,8 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline
mContext->GetLogstoreName(),
mContext->GetRegion());
} else if (telemetryType == "metrics") {
mTelemetryType = TelemetryType::METRIC;
mTelemetryType = BOOL_FLAG(enable_metricstore_channel) ? sls_logs::SLS_TELEMETRY_TYPE_METRICS
: sls_logs::SLS_TELEMETRY_TYPE_LOGS;
} else if (!telemetryType.empty() && telemetryType != "logs") {
PARAM_WARNING_DEFAULT(mContext->GetLogger(),
mContext->GetAlarm(),
Expand Down Expand Up @@ -559,20 +564,29 @@ unique_ptr<HttpSinkRequest> FlusherSLS::BuildRequest(SenderQueueItem* item) cons
}

if (data->mType == RawDataType::EVENT_GROUP) {
if (data->mShardHashKey.empty()) {
return sendClient->CreatePostLogStoreLogsRequest(
if (mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_METRICS) {
return sendClient->CreatePostMetricStoreLogsRequest(
mProject, data->mLogstore, ConvertCompressType(GetCompressType()), data->mData, data->mRawSize, item);
} else {
auto& exactlyOnceCpt = data->mExactlyOnceCheckpoint;
int64_t hashKeySeqID = exactlyOnceCpt ? exactlyOnceCpt->data.sequence_id() : sdk::kInvalidHashKeySeqID;
return sendClient->CreatePostLogStoreLogsRequest(mProject,
data->mLogstore,
ConvertCompressType(GetCompressType()),
data->mData,
data->mRawSize,
item,
data->mShardHashKey,
hashKeySeqID);
if (data->mShardHashKey.empty()) {
return sendClient->CreatePostLogStoreLogsRequest(mProject,
data->mLogstore,
ConvertCompressType(GetCompressType()),
data->mData,
data->mRawSize,
item);
} else {
auto& exactlyOnceCpt = data->mExactlyOnceCheckpoint;
int64_t hashKeySeqID = exactlyOnceCpt ? exactlyOnceCpt->data.sequence_id() : sdk::kInvalidHashKeySeqID;
return sendClient->CreatePostLogStoreLogsRequest(mProject,
data->mLogstore,
ConvertCompressType(GetCompressType()),
data->mData,
data->mRawSize,
item,
data->mShardHashKey,
hashKeySeqID);
}
}
} else {
if (data->mShardHashKey.empty())
Expand Down Expand Up @@ -726,8 +740,8 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item)
// the possibility of hash key conflict is very low, so data is
// dropped here.
cpt->Commit();
failDetail << ", drop exactly once log group and commit checkpoint" << " checkpointKey:" << cpt->key
<< " checkpoint:" << cpt->data.DebugString();
failDetail << ", drop exactly once log group and commit checkpoint"
<< " checkpointKey:" << cpt->key << " checkpoint:" << cpt->data.DebugString();
suggestion << "no suggestion";
LogtailAlarm::GetInstance()->SendAlarm(
EXACTLY_ONCE_ALARM,
Expand Down
4 changes: 2 additions & 2 deletions core/plugin/flusher/sls/FlusherSLS.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
#include "pipeline/limiter/ConcurrencyLimiter.h"
#include "pipeline/plugin/interface/HttpFlusher.h"
#include "pipeline/serializer/SLSSerializer.h"
#include "protobuf/sls/sls_logs.pb.h"

namespace logtail {

class FlusherSLS : public HttpFlusher {
public:
enum class TelemetryType { LOG, METRIC };

static std::shared_ptr<ConcurrencyLimiter> GetProjectConcurrencyLimiter(const std::string& project);
static std::shared_ptr<ConcurrencyLimiter> GetRegionConcurrencyLimiter(const std::string& region);
Expand Down Expand Up @@ -77,7 +77,7 @@ class FlusherSLS : public HttpFlusher {
std::string mRegion;
std::string mEndpoint;
std::string mAliuid;
TelemetryType mTelemetryType = TelemetryType::LOG;
sls_logs::SlsTelemetryType mTelemetryType = sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS;
std::vector<std::string> mShardHashKeys;
uint32_t mMaxSendRate = 0; // preserved only for exactly once
uint32_t mFlowControlExpireTime = 0;
Expand Down
1 change: 1 addition & 0 deletions core/protobuf/sls/logtail_buffer_meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,5 @@ message LogtailBufferMeta
optional int32 rawsize = 6;
optional string shardhashkey = 7;
optional SlsCompressType compresstype = 8;
optional SlsTelemetryType telemetrytype = 9;
}
6 changes: 6 additions & 0 deletions core/protobuf/sls/sls_logs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ enum SlsCompressType
SLS_CMP_ZSTD = 3;
}

enum SlsTelemetryType
{
SLS_TELEMETRY_TYPE_LOGS = 0;
SLS_TELEMETRY_TYPE_METRICS = 1;
}

message Log
{
required uint32 Time = 1;// UNIX Time Format
Expand Down
57 changes: 52 additions & 5 deletions core/sdk/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include "Client.h"

#include "Common.h"
#include "CurlImp.h"
#include "Exception.h"
#include "Result.h"
Expand Down Expand Up @@ -219,15 +220,20 @@ namespace sdk {
sls_logs::SlsCompressType compressType,
const std::string& compressedLogGroup,
uint32_t rawSize,
const std::string& hashKey) {
const std::string& hashKey,
bool isTimeSeries) {
map<string, string> httpHeader;
httpHeader[CONTENT_TYPE] = TYPE_LOG_PROTOBUF;
if (!mKeyProvider.empty()) {
httpHeader[X_LOG_KEYPROVIDER] = mKeyProvider;
}
httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(rawSize);
httpHeader[X_LOG_COMPRESSTYPE] = Client::GetCompressTypeString(compressType);
return SynPostLogStoreLogs(project, logstore, compressedLogGroup, httpHeader, hashKey);
if (isTimeSeries) {
return SynPostMetricStoreLogs(project, logstore, compressedLogGroup, httpHeader);
} else {
return SynPostLogStoreLogs(project, logstore, compressedLogGroup, httpHeader, hashKey);
}
}

PostLogStoreLogsResponse Client::PostLogStoreLogPackageList(const std::string& project,
Expand All @@ -253,18 +259,25 @@ namespace sdk {
uint32_t rawSize,
SenderQueueItem* item,
const std::string& hashKey,
int64_t hashKeySeqID) {
int64_t hashKeySeqID,
bool isTimeSeries) {
map<string, string> httpHeader;
httpHeader[CONTENT_TYPE] = TYPE_LOG_PROTOBUF;
if (!mKeyProvider.empty()) {
httpHeader[X_LOG_KEYPROVIDER] = mKeyProvider;
}
httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(rawSize);
httpHeader[X_LOG_COMPRESSTYPE] = Client::GetCompressTypeString(compressType);
return CreateAsynPostLogStoreLogsRequest(
project, logstore, compressedLogGroup, httpHeader, hashKey, hashKeySeqID, item);
if (isTimeSeries) {
return CreateAsynPostMetricStoreLogsRequest(
project, logstore, compressedLogGroup, httpHeader,item);
} else {
return CreateAsynPostLogStoreLogsRequest(
project, logstore, compressedLogGroup, httpHeader, hashKey, hashKeySeqID, item);
}
}


unique_ptr<HttpSinkRequest> Client::CreatePostLogStoreLogPackageListRequest(const std::string& project,
const std::string& logstore,
sls_logs::SlsCompressType compressType,
Expand Down Expand Up @@ -314,6 +327,22 @@ namespace sdk {
}
}

std::unique_ptr<HttpSinkRequest>
Client::CreateAsynPostMetricStoreLogsRequest(const std::string& project,
const std::string& logstore,
const std::string& body,
std::map<std::string, std::string>& httpHeader,
SenderQueueItem* item) {
string operation = METRICSTORES;
operation.append("/").append(project).append("/").append(logstore).append("/api/v1/write");
httpHeader[CONTENT_MD5] = CalcMD5(body);
map<string, string> parameterList;
string host = GetSlsHost();
SetCommonHeader(httpHeader, (int32_t)(body.length()), "");
string signature = GetUrlSignature(HTTP_POST, operation, httpHeader, parameterList, body, GetAccessKey());
httpHeader[AUTHORIZATION] = LOG_HEADSIGNATURE_PREFIX + GetAccessKeyId() + ':' + signature;
return make_unique<HttpSinkRequest>(HTTP_POST, mUsingHTTPS, host, mPort, operation, "", httpHeader, body, item);
}
unique_ptr<HttpSinkRequest>
Client::CreateAsynPostLogStoreLogsRequest(const std::string& project,
const std::string& logstore,
Expand Down Expand Up @@ -395,6 +424,24 @@ namespace sdk {
return ret;
}

PostLogStoreLogsResponse Client::SynPostMetricStoreLogs(const std::string& project,
const std::string& logstore,
const std::string& body,
std::map<std::string, std::string>& httpHeader,
std::string* realIpPtr) {
string operation = METRICSTORES;
operation.append("/").append(project).append("/").append(logstore).append("/api/v1/write");
httpHeader[CONTENT_MD5] = CalcMD5(body);
map<string, string> parameterList;
HttpMessage httpResponse;
SendRequest(project, HTTP_POST, operation, body, parameterList, httpHeader, httpResponse, realIpPtr);
PostLogStoreLogsResponse ret;
ret.bodyBytes = (int32_t)body.size();
ret.statusCode = httpResponse.statusCode;
ret.requestId = httpResponse.header[X_LOG_REQUEST_ID];
return ret;
}

PostLogStoreLogsResponse Client::PostLogUsingWebTracking(const std::string& project,
const std::string& logstore,
sls_logs::SlsCompressType compressType,
Expand Down
Loading

0 comments on commit dfb87fc

Please sign in to comment.