Skip to content

Commit

Permalink
feat: 1. support prom scrape_protocols, 2. support prom enable_compre…
Browse files Browse the repository at this point in the history
…ssion (#1719)
  • Loading branch information
catdogpandas committed Sep 19, 2024
1 parent 38bc1fe commit 7106316
Show file tree
Hide file tree
Showing 6 changed files with 327 additions and 16 deletions.
15 changes: 15 additions & 0 deletions core/prometheus/Constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const char* const SCHEME = "scheme";
const char* const METRICS_PATH = "metrics_path";
const char* const SCRAPE_INTERVAL = "scrape_interval";
const char* const SCRAPE_TIMEOUT = "scrape_timeout";
const char* const SCRAPE_PROTOCOLS = "scrape_protocols";
const char* const HEADERS = "headers";
const char* const PARAMS = "params";
const char* const QUERY_STRING = "query_string";
Expand All @@ -71,6 +72,14 @@ const char* const PASSWORD = "password";
const char* const PASSWORD_FILE = "password_file";
const char* const BASIC_PREFIX = "Basic ";

// scrape protocols, from https://prometheus.io/docs/prometheus/latest/configuration/configuration/#scrape_config
// only the text/plain and application/openmetrics-text content types are used for the Accept header
// the supported OpenMetrics versions are 0.0.1 and 1.0.0; see
// https://github.com/OpenObservability/OpenMetrics/blob/main/specification/OpenMetrics.md#extensions-and-improvements
const char* const PrometheusProto = "PrometheusProto";
const char* const PrometheusText0_0_4 = "PrometheusText0.0.4";
const char* const OpenMetricsText0_0_1 = "OpenMetricsText0.0.1";
const char* const OpenMetricsText1_0_0 = "OpenMetricsText1.0.0";

// metric labels
const char* const JOB = "job";
Expand All @@ -94,4 +103,10 @@ const char* const UP = "up";

const char* const SCRAPE_TIMESTAMP_MILLISEC = "scrape_timestamp_millisec";

// scrape config compression
const char* const ENABLE_COMPRESSION = "enable_compression";
const char* const ACCEPT_ENCODING = "Accept-Encoding";
const char* const GZIP = "gzip";
const char* const IDENTITY = "identity";

} // namespace logtail::prometheus
112 changes: 110 additions & 2 deletions core/prometheus/schedulers/ScrapeConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ bool ScrapeConfig::Init(const Json::Value& scrapeConfig) {
return false;
}


if (scrapeConfig.isMember(prometheus::SCRAPE_INTERVAL) && scrapeConfig[prometheus::SCRAPE_INTERVAL].isString()) {
string tmpScrapeIntervalString = scrapeConfig[prometheus::SCRAPE_INTERVAL].asString();
mScrapeIntervalSeconds = DurationToSecond(tmpScrapeIntervalString);
Expand All @@ -43,6 +44,23 @@ bool ScrapeConfig::Init(const Json::Value& scrapeConfig) {
string tmpScrapeTimeoutString = scrapeConfig[prometheus::SCRAPE_TIMEOUT].asString();
mScrapeTimeoutSeconds = DurationToSecond(tmpScrapeTimeoutString);
}
if (scrapeConfig.isMember(prometheus::SCRAPE_PROTOCOLS) && scrapeConfig[prometheus::SCRAPE_PROTOCOLS].isArray()) {
if (!InitScrapeProtocols(scrapeConfig[prometheus::SCRAPE_PROTOCOLS])) {
LOG_ERROR(sLogger, ("scrape protocol config error", scrapeConfig[prometheus::SCRAPE_PROTOCOLS]));
return false;
}
} else {
Json::Value nullJson;
InitScrapeProtocols(nullJson);
}

if (scrapeConfig.isMember(prometheus::ENABLE_COMPRESSION)
&& scrapeConfig[prometheus::ENABLE_COMPRESSION].isBool()) {
InitEnableCompression(scrapeConfig[prometheus::ENABLE_COMPRESSION].asBool());
} else {
InitEnableCompression(true);
}

if (scrapeConfig.isMember(prometheus::METRICS_PATH) && scrapeConfig[prometheus::METRICS_PATH].isString()) {
mMetricsPath = scrapeConfig[prometheus::METRICS_PATH].asString();
}
Expand Down Expand Up @@ -185,7 +203,7 @@ bool ScrapeConfig::InitBasicAuth(const Json::Value& basicAuth) {

auto token = username + ":" + password;
auto token64 = sdk::Base64Enconde(token);
mAuthHeaders[prometheus::A_UTHORIZATION] = prometheus::BASIC_PREFIX + token64;
mRequestHeaders[prometheus::A_UTHORIZATION] = prometheus::BASIC_PREFIX + token64;
return true;
}

Expand Down Expand Up @@ -219,8 +237,98 @@ bool ScrapeConfig::InitAuthorization(const Json::Value& authorization) {
return false;
}

mAuthHeaders[prometheus::A_UTHORIZATION] = type + " " + credentials;
mRequestHeaders[prometheus::A_UTHORIZATION] = type + " " + credentials;
return true;
}

// Builds the HTTP Accept header from the scrape_protocols config.
// Each configured protocol is mapped to its content type and assigned a
// descending q-value so the target serves the most preferred format it
// supports; "*/*" is appended as the lowest-priority fallback.
// Returns false if any entry is not a string, is unknown, or is duplicated.
bool ScrapeConfig::InitScrapeProtocols(const Json::Value& scrapeProtocols) {
    // Maps each supported protocol name to the content type it negotiates.
    static const auto sScrapeProtocolsHeaders = std::map<string, string>{
        {prometheus::PrometheusProto,
         "application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited"},
        {prometheus::PrometheusText0_0_4, "text/plain;version=0.0.4"},
        {prometheus::OpenMetricsText0_0_1, "application/openmetrics-text;version=0.0.1"},
        {prometheus::OpenMetricsText1_0_0, "application/openmetrics-text;version=1.0.0"},
    };
    // Preference order used when scrape_protocols is absent or empty.
    static const auto sDefaultScrapeProtocols = vector<string>{
        prometheus::PrometheusText0_0_4,
        prometheus::PrometheusProto,
        prometheus::OpenMetricsText0_0_1,
        prometheus::OpenMetricsText1_0_0,
    };

    // Joins strings with a separator, e.g. {"a","b"} + "," -> "a,b".
    auto join = [](const vector<string>& strs, const string& sep) {
        string result;
        for (const auto& str : strs) {
            if (!result.empty()) {
                result += sep;
            }
            result += str;
        }
        return result;
    };

    // Collects the configured protocol names; fails on any non-string entry.
    auto getScrapeProtocols = [](const Json::Value& scrapeProtocols, vector<string>& res) {
        for (const auto& scrapeProtocol : scrapeProtocols) {
            if (scrapeProtocol.isString()) {
                res.push_back(scrapeProtocol.asString());
            } else {
                LOG_ERROR(sLogger, ("scrape_protocols config error", ""));
                return false;
            }
        }
        return true;
    };

    // Rejects unknown protocol names and duplicates.
    auto validateScrapeProtocols = [](const vector<string>& scrapeProtocols) {
        set<string> dups;
        for (const auto& scrapeProtocol : scrapeProtocols) {
            if (!sScrapeProtocolsHeaders.count(scrapeProtocol)) {
                LOG_ERROR(sLogger,
                          ("unknown scrape protocol", scrapeProtocol)(
                              "supported",
                              "[OpenMetricsText0.0.1 OpenMetricsText1.0.0 PrometheusProto PrometheusText0.0.4]"));
                return false;
            }
            if (dups.count(scrapeProtocol)) {
                LOG_ERROR(sLogger, ("duplicated protocol in scrape_protocols", scrapeProtocol));
                return false;
            }
            dups.insert(scrapeProtocol);
        }
        return true;
    };

    vector<string> tmpScrapeProtocols;

    if (!getScrapeProtocols(scrapeProtocols, tmpScrapeProtocols)) {
        return false;
    }

    // if scrape_protocols is empty, use default protocols
    if (tmpScrapeProtocols.empty()) {
        tmpScrapeProtocols = sDefaultScrapeProtocols;
    }
    if (!validateScrapeProtocols(tmpScrapeProtocols)) {
        return false;
    }

    // Assign descending q-values; with at most 4 known protocols, weight starts
    // at 5 and every q stays a valid single-digit fraction in (0, 1).
    auto weight = tmpScrapeProtocols.size() + 1;
    for (auto& tmpScrapeProtocol : tmpScrapeProtocols) {
        // .at() is safe here: validateScrapeProtocols guaranteed the key exists.
        auto val = sScrapeProtocolsHeaders.at(tmpScrapeProtocol);
        val += ";q=0." + std::to_string(weight--);
        tmpScrapeProtocol = val;
    }
    tmpScrapeProtocols.push_back("*/*;q=0." + std::to_string(weight));
    mRequestHeaders[prometheus::ACCEPT] = join(tmpScrapeProtocols, ",");
    return true;
}

// Sets the Accept-Encoding request header according to enable_compression:
// "gzip" when compression is allowed, "identity" (no transform) otherwise.
void ScrapeConfig::InitEnableCompression(bool enableCompression) {
    mRequestHeaders[prometheus::ACCEPT_ENCODING]
        = enableCompression ? prometheus::GZIP : prometheus::IDENTITY;
}

} // namespace logtail
9 changes: 8 additions & 1 deletion core/prometheus/schedulers/ScrapeConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

namespace logtail {


class ScrapeConfig {
public:
std::string mJobName;
Expand All @@ -20,7 +21,10 @@ class ScrapeConfig {
std::string mMetricsPath;
std::string mScheme;

std::map<std::string, std::string> mAuthHeaders;
// auth header
// scrape_protocols Accept header: PrometheusProto, OpenMetricsText0.0.1, OpenMetricsText1.0.0, PrometheusText0.0.4
// enable_compression Accept-Encoding header: gzip, identity
std::map<std::string, std::string> mRequestHeaders;

int64_t mMaxScrapeSizeBytes;
int64_t mSampleLimit;
Expand All @@ -37,10 +41,13 @@ class ScrapeConfig {
private:
bool InitBasicAuth(const Json::Value& basicAuth);
bool InitAuthorization(const Json::Value& authorization);
bool InitScrapeProtocols(const Json::Value& scrapeProtocols);
void InitEnableCompression(bool enableCompression);

#ifdef APSARA_UNIT_TEST_MAIN
friend class ScrapeConfigUnittest;
#endif
};


} // namespace logtail
4 changes: 2 additions & 2 deletions core/prometheus/schedulers/ScrapeScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void ScrapeScheduler::OnMetricResult(const HttpResponse& response, uint64_t time
if (response.mStatusCode != 200) {
mScrapeResponseSizeBytes = 0;
string headerStr;
for (const auto& [k, v] : mScrapeConfigPtr->mAuthHeaders) {
for (const auto& [k, v] : mScrapeConfigPtr->mRequestHeaders) {
headerStr.append(k).append(":").append(v).append(";");
}
LOG_WARNING(sLogger,
Expand Down Expand Up @@ -144,7 +144,7 @@ std::unique_ptr<TimerEvent> ScrapeScheduler::BuildScrapeTimerEvent(std::chrono::
mPort,
mScrapeConfigPtr->mMetricsPath,
mScrapeConfigPtr->mQueryString,
mScrapeConfigPtr->mAuthHeaders,
mScrapeConfigPtr->mRequestHeaders,
"",
mScrapeConfigPtr->mScrapeTimeoutSeconds,
mScrapeConfigPtr->mScrapeIntervalSeconds
Expand Down
Loading

0 comments on commit 7106316

Please sign in to comment.