diff --git a/CHANGELOG.md b/CHANGELOG.md index 69cbd0aaa3..dd883da34f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -36,3 +36,4 @@ your changes, such as: - [public] [both] [updated] add a new feature ## [Unreleased] +- [inner] [both] [updated] Support SLS Metricstore output \ No newline at end of file diff --git a/core/plugin/flusher/sls/DiskBufferWriter.cpp b/core/plugin/flusher/sls/DiskBufferWriter.cpp index 94471e1e54..cfd0f9f909 100644 --- a/core/plugin/flusher/sls/DiskBufferWriter.cpp +++ b/core/plugin/flusher/sls/DiskBufferWriter.cpp @@ -388,6 +388,9 @@ bool DiskBufferWriter::ReadNextEncryption(int32_t& pos, if (!bufferMeta.has_compresstype()) { bufferMeta.set_compresstype(sls_logs::SlsCompressType::SLS_CMP_LZ4); } + if (!bufferMeta.has_telemetrytype()) { + bufferMeta.set_telemetrytype(sls_logs::SLS_TELEMETRY_TYPE_LOGS); + } buffer = new char[meta.mEncryptionSize + 1]; nbytes = fread(buffer, sizeof(char), meta.mEncryptionSize, fin); @@ -469,6 +472,7 @@ void DiskBufferWriter::SendEncryptionBuffer(const std::string& filename, int32_t bufferMeta.set_datatype(int(RawDataType::EVENT_GROUP)); bufferMeta.set_rawsize(meta.mLogDataSize); bufferMeta.set_compresstype(sls_logs::SLS_CMP_LZ4); + bufferMeta.set_telemetrytype(sls_logs::SLS_TELEMETRY_TYPE_LOGS); } } if (!sendResult) { @@ -650,6 +654,7 @@ bool DiskBufferWriter::SendToBufferFile(SenderQueueItem* dataPtr) { bufferMeta.set_rawsize(data->mRawSize); bufferMeta.set_shardhashkey(data->mShardHashKey); bufferMeta.set_compresstype(ConvertCompressType(flusher->GetCompressType())); + bufferMeta.set_telemetrytype(flusher->mTelemetryType); string encodedInfo; bufferMeta.SerializeToString(&encodedInfo); @@ -727,7 +732,14 @@ SendResult DiskBufferWriter::SendToNetSync(sdk::Client* sendClient, ++retryTimes; try { if (bufferMeta.datatype() == int(RawDataType::EVENT_GROUP)) { - if (bufferMeta.has_shardhashkey() && !bufferMeta.shardhashkey().empty()) + if (bufferMeta.has_telemetrytype() + && bufferMeta.telemetrytype() == sls_logs::SLS_TELEMETRY_TYPE_METRICS) { + sendClient->PostMetricStoreLogs(bufferMeta.project(), + bufferMeta.logstore(), + bufferMeta.compresstype(), + logData, + bufferMeta.rawsize()); + } else if (bufferMeta.has_shardhashkey() && !bufferMeta.shardhashkey().empty()) sendClient->PostLogStoreLogs(bufferMeta.project(), bufferMeta.logstore(), bufferMeta.compresstype(), diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index 1650d9fb79..102eba7597 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -14,6 +14,8 @@ #include "plugin/flusher/sls/FlusherSLS.h" +#include "sls_logs.pb.h" + #ifdef __ENTERPRISE__ #include "config/provider/EnterpriseConfigProvider.h" #endif @@ -56,6 +58,7 @@ DEFINE_FLAG_INT32(discard_send_fail_interval, "discard data when send fail after DEFINE_FLAG_INT32(profile_data_send_retrytimes, "how many times should retry if profile data send fail", 5); DEFINE_FLAG_INT32(unknow_error_try_max, "discard data when try times > this value", 5); DEFINE_FLAG_BOOL(global_network_success, "global network success flag, default false", false); +DEFINE_FLAG_BOOL(enable_metricstore_channel, "only works for metrics data for enhance metrics query performance", true); DECLARE_FLAG_BOOL(send_prefer_real_ip); @@ -360,6 +363,7 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline // TelemetryType string telemetryType; + if (!GetOptionalStringParam(config, "TelemetryType", telemetryType, errorMsg)) { PARAM_WARNING_DEFAULT(mContext->GetLogger(), mContext->GetAlarm(), @@ -371,7 +375,8 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline mContext->GetLogstoreName(), mContext->GetRegion()); } else if (telemetryType == "metrics") { - mTelemetryType = TelemetryType::METRIC; + mTelemetryType = BOOL_FLAG(enable_metricstore_channel) ? sls_logs::SLS_TELEMETRY_TYPE_METRICS + : sls_logs::SLS_TELEMETRY_TYPE_LOGS; } else if (!telemetryType.empty() && telemetryType != "logs") { PARAM_WARNING_DEFAULT(mContext->GetLogger(), mContext->GetAlarm(), @@ -559,20 +564,29 @@ unique_ptr FlusherSLS::BuildRequest(SenderQueueItem* item) cons } if (data->mType == RawDataType::EVENT_GROUP) { - if (data->mShardHashKey.empty()) { - return sendClient->CreatePostLogStoreLogsRequest( + if (mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_METRICS) { + return sendClient->CreatePostMetricStoreLogsRequest( mProject, data->mLogstore, ConvertCompressType(GetCompressType()), data->mData, data->mRawSize, item); } else { - auto& exactlyOnceCpt = data->mExactlyOnceCheckpoint; - int64_t hashKeySeqID = exactlyOnceCpt ? exactlyOnceCpt->data.sequence_id() : sdk::kInvalidHashKeySeqID; - return sendClient->CreatePostLogStoreLogsRequest(mProject, - data->mLogstore, - ConvertCompressType(GetCompressType()), - data->mData, - data->mRawSize, - item, - data->mShardHashKey, - hashKeySeqID); + if (data->mShardHashKey.empty()) { + return sendClient->CreatePostLogStoreLogsRequest(mProject, + data->mLogstore, + ConvertCompressType(GetCompressType()), + data->mData, + data->mRawSize, + item); + } else { + auto& exactlyOnceCpt = data->mExactlyOnceCheckpoint; + int64_t hashKeySeqID = exactlyOnceCpt ? exactlyOnceCpt->data.sequence_id() : sdk::kInvalidHashKeySeqID; + return sendClient->CreatePostLogStoreLogsRequest(mProject, + data->mLogstore, + ConvertCompressType(GetCompressType()), + data->mData, + data->mRawSize, + item, + data->mShardHashKey, + hashKeySeqID); + } } } else { if (data->mShardHashKey.empty()) @@ -726,8 +740,8 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item) // the possibility of hash key conflict is very low, so data is // dropped here. cpt->Commit(); - failDetail << ", drop exactly once log group and commit checkpoint" << " checkpointKey:" << cpt->key - << " checkpoint:" << cpt->data.DebugString(); + failDetail << ", drop exactly once log group and commit checkpoint" + << " checkpointKey:" << cpt->key << " checkpoint:" << cpt->data.DebugString(); suggestion << "no suggestion"; LogtailAlarm::GetInstance()->SendAlarm( EXACTLY_ONCE_ALARM, diff --git a/core/plugin/flusher/sls/FlusherSLS.h b/core/plugin/flusher/sls/FlusherSLS.h index 7e3d4658a1..235355b1ca 100644 --- a/core/plugin/flusher/sls/FlusherSLS.h +++ b/core/plugin/flusher/sls/FlusherSLS.h @@ -31,12 +31,12 @@ #include "pipeline/limiter/ConcurrencyLimiter.h" #include "pipeline/plugin/interface/HttpFlusher.h" #include "pipeline/serializer/SLSSerializer.h" +#include "protobuf/sls/sls_logs.pb.h" namespace logtail { class FlusherSLS : public HttpFlusher { public: - enum class TelemetryType { LOG, METRIC }; static std::shared_ptr GetProjectConcurrencyLimiter(const std::string& project); static std::shared_ptr GetRegionConcurrencyLimiter(const std::string& region); @@ -77,7 +77,7 @@ class FlusherSLS : public HttpFlusher { std::string mRegion; std::string mEndpoint; std::string mAliuid; - TelemetryType mTelemetryType = TelemetryType::LOG; + sls_logs::SlsTelemetryType mTelemetryType = sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS; std::vector mShardHashKeys; uint32_t mMaxSendRate = 0; // preserved only for exactly once uint32_t mFlowControlExpireTime = 0; diff --git a/core/protobuf/sls/logtail_buffer_meta.proto b/core/protobuf/sls/logtail_buffer_meta.proto index 458027f715..dc2639e997 100644 --- a/core/protobuf/sls/logtail_buffer_meta.proto +++ b/core/protobuf/sls/logtail_buffer_meta.proto @@ -27,4 +27,5 @@ message LogtailBufferMeta optional int32 rawsize = 6; optional string shardhashkey = 7; optional SlsCompressType compresstype = 8; + optional SlsTelemetryType telemetrytype = 9; } diff --git a/core/protobuf/sls/sls_logs.proto b/core/protobuf/sls/sls_logs.proto index 16edf75bb5..9f3aad5856 100644 --- a/core/protobuf/sls/sls_logs.proto +++ b/core/protobuf/sls/sls_logs.proto @@ -23,6 +23,12 @@ enum SlsCompressType SLS_CMP_ZSTD = 3; } +enum SlsTelemetryType +{ + SLS_TELEMETRY_TYPE_LOGS = 0; + SLS_TELEMETRY_TYPE_METRICS = 1; +} + message Log { required uint32 Time = 1;// UNIX Time Format diff --git a/core/sdk/Client.cpp b/core/sdk/Client.cpp index 887f207291..bc1cc16465 100644 --- a/core/sdk/Client.cpp +++ b/core/sdk/Client.cpp @@ -14,6 +14,7 @@ #include "Client.h" +#include "Common.h" #include "CurlImp.h" #include "Exception.h" #include "Result.h" @@ -219,7 +220,8 @@ namespace sdk { sls_logs::SlsCompressType compressType, const std::string& compressedLogGroup, uint32_t rawSize, - const std::string& hashKey) { + const std::string& hashKey, + bool isTimeSeries) { map httpHeader; httpHeader[CONTENT_TYPE] = TYPE_LOG_PROTOBUF; if (!mKeyProvider.empty()) { @@ -227,7 +229,11 @@ namespace sdk { } httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(rawSize); httpHeader[X_LOG_COMPRESSTYPE] = Client::GetCompressTypeString(compressType); - return SynPostLogStoreLogs(project, logstore, compressedLogGroup, httpHeader, hashKey); + if (isTimeSeries) { + return SynPostMetricStoreLogs(project, logstore, compressedLogGroup, httpHeader); + } else { + return SynPostLogStoreLogs(project, logstore, compressedLogGroup, httpHeader, hashKey); + } } PostLogStoreLogsResponse Client::PostLogStoreLogPackageList(const std::string& project, @@ -253,7 +259,8 @@ namespace sdk { uint32_t rawSize, SenderQueueItem* item, const std::string& hashKey, - int64_t hashKeySeqID) { + int64_t hashKeySeqID, + bool isTimeSeries) { map httpHeader; httpHeader[CONTENT_TYPE] = TYPE_LOG_PROTOBUF; if (!mKeyProvider.empty()) { @@ -261,10 +268,16 @@ namespace sdk { } httpHeader[X_LOG_BODYRAWSIZE] = std::to_string(rawSize); httpHeader[X_LOG_COMPRESSTYPE] = Client::GetCompressTypeString(compressType); - return CreateAsynPostLogStoreLogsRequest( - project, logstore, compressedLogGroup, httpHeader, hashKey, hashKeySeqID, item); + if (isTimeSeries) { + return CreateAsynPostMetricStoreLogsRequest( + project, logstore, compressedLogGroup, httpHeader,item); + } else { + return CreateAsynPostLogStoreLogsRequest( + project, logstore, compressedLogGroup, httpHeader, hashKey, hashKeySeqID, item); + } } + unique_ptr Client::CreatePostLogStoreLogPackageListRequest(const std::string& project, const std::string& logstore, sls_logs::SlsCompressType compressType, @@ -314,6 +327,22 @@ namespace sdk { } } + std::unique_ptr + Client::CreateAsynPostMetricStoreLogsRequest(const std::string& project, + const std::string& logstore, + const std::string& body, + std::map& httpHeader, + SenderQueueItem* item) { + string operation = METRICSTORES; + operation.append("/").append(project).append("/").append(logstore).append("/api/v1/write"); + httpHeader[CONTENT_MD5] = CalcMD5(body); + map parameterList; + string host = GetSlsHost(); + SetCommonHeader(httpHeader, (int32_t)(body.length()), ""); + string signature = GetUrlSignature(HTTP_POST, operation, httpHeader, parameterList, body, GetAccessKey()); + httpHeader[AUTHORIZATION] = LOG_HEADSIGNATURE_PREFIX + GetAccessKeyId() + ':' + signature; + return make_unique(HTTP_POST, mUsingHTTPS, host, mPort, operation, "", httpHeader, body, item); + } unique_ptr Client::CreateAsynPostLogStoreLogsRequest(const std::string& project, const std::string& logstore, @@ -395,6 +424,24 @@ namespace sdk { return ret; } + PostLogStoreLogsResponse Client::SynPostMetricStoreLogs(const std::string& project, + const std::string& logstore, + const std::string& body, + std::map& httpHeader, + std::string* realIpPtr) { + string operation = METRICSTORES; + operation.append("/").append(project).append("/").append(logstore).append("/api/v1/write"); + httpHeader[CONTENT_MD5] = CalcMD5(body); + map parameterList; + HttpMessage httpResponse; + SendRequest(project, HTTP_POST, operation, body, parameterList, httpHeader, httpResponse, realIpPtr); + PostLogStoreLogsResponse ret; + ret.bodyBytes = (int32_t)body.size(); + ret.statusCode = httpResponse.statusCode; + ret.requestId = httpResponse.header[X_LOG_REQUEST_ID]; + return ret; + } + PostLogStoreLogsResponse Client::PostLogUsingWebTracking(const std::string& project, const std::string& logstore, sls_logs::SlsCompressType compressType, diff --git a/core/sdk/Client.h b/core/sdk/Client.h index 6203b8350d..54a6136c07 100644 --- a/core/sdk/Client.h +++ b/core/sdk/Client.h @@ -100,7 +100,18 @@ namespace sdk { sls_logs::SlsCompressType compressType, const std::string& compressedLogGroup, uint32_t rawSize, - const std::string& hashKey = ""); + const std::string& hashKey = "", + bool isTimeSeries = false); + + PostLogStoreLogsResponse PostMetricStoreLogs(const std::string& project, + const std::string& logstore, + sls_logs::SlsCompressType compressType, + const std::string& compressedLogGroup, + uint32_t rawSize) { + return PostLogStoreLogs(project, logstore, compressType, compressedLogGroup, rawSize, "", true); + } + + /** Sync Put data to LOG service. Unsuccessful opertaion will cause an LOGException. * @param project The project name * @param logstore The logstore name @@ -127,7 +138,27 @@ namespace sdk { uint32_t rawSize, SenderQueueItem* item, const std::string& hashKey = "", - int64_t hashKeySeqID = kInvalidHashKeySeqID); + int64_t hashKeySeqID = kInvalidHashKeySeqID, + bool isTimeSeries = false); + /** Async Put metrics data to SLS metricstore. Unsuccessful opertaion will cause an LOGException. + * @param project The project name + * @param logstore The logstore name + * @param compressedLogGroup data of logGroup, LZ4 comressed + * @param rawSize before compress + * @param compressType compression type + * @return request_id. + */ + std::unique_ptr CreatePostMetricStoreLogsRequest(const std::string& project, + const std::string& logstore, + sls_logs::SlsCompressType compressType, + const std::string& compressedLogGroup, + uint32_t rawSize, + SenderQueueItem* item) { + return CreatePostLogStoreLogsRequest( + project, logstore, compressType, compressedLogGroup, rawSize, item, "", kInvalidHashKeySeqID, true); + } + + /** Async Put data to LOG service. Unsuccessful opertaion will cause an LOGException. * @param project The project name * @param logstore The logstore name @@ -171,6 +202,13 @@ namespace sdk { int64_t hashKeySeqID, SenderQueueItem* item); + std::unique_ptr + CreateAsynPostMetricStoreLogsRequest(const std::string& project, + const std::string& logstore, + const std::string& body, + std::map& httpHeader, + SenderQueueItem* item); + // PingSLSServer sends a trivial data packet to SLS for some inner purposes. PostLogStoreLogsResponse PingSLSServer(const std::string& project, const std::string& logstore, std::string* realIpPtr = NULL); @@ -182,6 +220,12 @@ namespace sdk { const std::string& hashKey, std::string* realIpPtr = NULL); + PostLogStoreLogsResponse SynPostMetricStoreLogs(const std::string& project, + const std::string& logstore, + const std::string& body, + std::map& httpHeader, + std::string* realIpPtr = NULL); + void SetCommonHeader(std::map& httpHeader, int32_t contentLength, const std::string& project = ""); diff --git a/core/sdk/Common.cpp b/core/sdk/Common.cpp index 431483995f..8f3a1a7890 100644 --- a/core/sdk/Common.cpp +++ b/core/sdk/Common.cpp @@ -70,6 +70,7 @@ namespace sdk { const char* const LOGE_INVALID_SEQUENCE_ID = "InvalidSequenceId"; const char* const LOGSTORES = "/logstores"; + const char* const METRICSTORES = "/prometheus"; const char* const SHARDS = "/shards"; const char* const INDEX = "/index"; const char* const CONFIGS = "/configs"; diff --git a/core/sdk/Common.h b/core/sdk/Common.h index bd6211746d..56ce512560 100644 --- a/core/sdk/Common.h +++ b/core/sdk/Common.h @@ -85,6 +85,7 @@ namespace sdk { extern const char* const LOGE_INVALID_SEQUENCE_ID; //="InvalidSequenceId"; extern const char* const LOGSTORES; //= "/logstores" + extern const char* const METRICSTORES; //= "/prometheus" extern const char* const SHARDS; //= "/shards" extern const char* const INDEX; //= "/index" extern const char* const CONFIGS; //= "/configs" diff --git a/core/sdk/CurlImp.cpp b/core/sdk/CurlImp.cpp index 05d875f6e2..9cea4bb9d6 100644 --- a/core/sdk/CurlImp.cpp +++ b/core/sdk/CurlImp.cpp @@ -13,10 +13,12 @@ // limitations under the License. #include "CurlImp.h" -#include "Exception.h" + +#include + #include "DNSCache.h" +#include "Exception.h" #include "app_config/AppConfig.h" -#include #include "common/http/Curl.h" using namespace std; diff --git a/core/unittest/flusher/FlusherSLSUnittest.cpp b/core/unittest/flusher/FlusherSLSUnittest.cpp index 71e13cbcc6..5cc19ff27a 100644 --- a/core/unittest/flusher/FlusherSLSUnittest.cpp +++ b/core/unittest/flusher/FlusherSLSUnittest.cpp @@ -104,7 +104,7 @@ void FlusherSLSUnittest::OnSuccessfulInit() { APSARA_TEST_EQUAL("", flusher->mEndpoint); #endif APSARA_TEST_EQUAL("", flusher->mAliuid); - APSARA_TEST_EQUAL(FlusherSLS::TelemetryType::LOG, flusher->mTelemetryType); + APSARA_TEST_EQUAL(sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS, flusher->mTelemetryType); APSARA_TEST_TRUE(flusher->mShardHashKeys.empty()); APSARA_TEST_EQUAL(static_cast(INT32_FLAG(merge_log_count_limit)), flusher->mBatcher.GetEventFlushStrategy().GetMaxCnt()); @@ -154,7 +154,7 @@ void FlusherSLSUnittest::OnSuccessfulInit() { APSARA_TEST_EQUAL("cn-hangzhou.log.aliyuncs.com", flusher->mEndpoint); APSARA_TEST_EQUAL("", flusher->mAliuid); #endif - APSARA_TEST_EQUAL(FlusherSLS::TelemetryType::METRIC, flusher->mTelemetryType); + APSARA_TEST_EQUAL(sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_METRICS, flusher->mTelemetryType); APSARA_TEST_EQUAL(1U, flusher->mShardHashKeys.size()); SenderQueueManager::GetInstance()->Clear(); @@ -185,7 +185,7 @@ void FlusherSLSUnittest::OnSuccessfulInit() { APSARA_TEST_EQUAL("", flusher->mEndpoint); #endif APSARA_TEST_EQUAL("", flusher->mAliuid); - APSARA_TEST_EQUAL(FlusherSLS::TelemetryType::LOG, flusher->mTelemetryType); + APSARA_TEST_EQUAL(sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS, flusher->mTelemetryType); APSARA_TEST_TRUE(flusher->mShardHashKeys.empty()); SenderQueueManager::GetInstance()->Clear(); @@ -249,7 +249,7 @@ void FlusherSLSUnittest::OnSuccessfulInit() { flusher->SetContext(ctx); flusher->SetMetricsRecordRef(FlusherSLS::sName, "1", "1", "1"); APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline)); - APSARA_TEST_EQUAL(FlusherSLS::TelemetryType::LOG, flusher->mTelemetryType); + APSARA_TEST_EQUAL(sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS, flusher->mTelemetryType); SenderQueueManager::GetInstance()->Clear(); configStr = R"( @@ -267,7 +267,7 @@ void FlusherSLSUnittest::OnSuccessfulInit() { flusher->SetContext(ctx); flusher->SetMetricsRecordRef(FlusherSLS::sName, "1", "1", "1"); APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline)); - APSARA_TEST_EQUAL(FlusherSLS::TelemetryType::LOG, flusher->mTelemetryType); + APSARA_TEST_EQUAL(sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS, flusher->mTelemetryType); SenderQueueManager::GetInstance()->Clear(); // ShardHashKeys