Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 1. support prom scrape_protocols, 2. support prom enable_compression #1719

Merged
merged 18 commits into from
Sep 19, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions core/prometheus/Constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ const char* const SCHEME = "scheme";
const char* const METRICS_PATH = "metrics_path";
const char* const SCRAPE_INTERVAL = "scrape_interval";
const char* const SCRAPE_TIMEOUT = "scrape_timeout";
const char* const SCRAPE_PROTOCOLS = "scrape_protocols";
const char* const HEADERS = "headers";
const char* const PARAMS = "params";
const char* const QUERY_STRING = "query_string";
Expand All @@ -71,6 +72,11 @@ const char* const PASSWORD = "password";
const char* const PASSWORD_FILE = "password_file";
const char* const BASIC_PREFIX = "Basic ";

// scrape protocols
const char* const PrometheusProto = "PrometheusProto";
const char* const PrometheusText0_0_4 = "PrometheusText0.0.4";
catdogpandas marked this conversation as resolved.
Show resolved Hide resolved
const char* const OpenMetricsText0_0_1 = "OpenMetricsText0.0.1";
const char* const OpenMetricsText1_0_0 = "OpenMetricsText1.0.0";

// metric labels
const char* const JOB = "job";
Expand All @@ -95,4 +101,10 @@ const char* const UP = "up";

const char* const SCRAPE_TIMESTAMP_MILLISEC = "scrape_timestamp_millisec";

// scrape config compression
const char* const ENABLE_COMPRESSION = "enable_compression";
const char* const ACCEPT_ENCODING = "Accept-Encoding";
const char* const GZIP = "gzip";
const char* const IDENTITY = "identity";

} // namespace logtail::prometheus
112 changes: 110 additions & 2 deletions core/prometheus/schedulers/ScrapeConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ bool ScrapeConfig::Init(const Json::Value& scrapeConfig) {
return false;
}


if (scrapeConfig.isMember(prometheus::SCRAPE_INTERVAL) && scrapeConfig[prometheus::SCRAPE_INTERVAL].isString()) {
string tmpScrapeIntervalString = scrapeConfig[prometheus::SCRAPE_INTERVAL].asString();
mScrapeIntervalSeconds = DurationToSecond(tmpScrapeIntervalString);
Expand All @@ -43,6 +44,23 @@ bool ScrapeConfig::Init(const Json::Value& scrapeConfig) {
string tmpScrapeTimeoutString = scrapeConfig[prometheus::SCRAPE_TIMEOUT].asString();
mScrapeTimeoutSeconds = DurationToSecond(tmpScrapeTimeoutString);
}
if (scrapeConfig.isMember(prometheus::SCRAPE_PROTOCOLS) && scrapeConfig[prometheus::SCRAPE_PROTOCOLS].isArray()) {
if (!InitScrapeProtocols(scrapeConfig[prometheus::SCRAPE_PROTOCOLS])) {
LOG_ERROR(sLogger, ("scrape protocol config error", scrapeConfig[prometheus::SCRAPE_PROTOCOLS]));
return false;
}
} else {
Json::Value nullJson;
InitScrapeProtocols(nullJson);
}

if (scrapeConfig.isMember(prometheus::ENABLE_COMPRESSION)
&& scrapeConfig[prometheus::ENABLE_COMPRESSION].isBool()) {
InitEnableCompression(scrapeConfig[prometheus::ENABLE_COMPRESSION].asBool());
} else {
InitEnableCompression(true);
}

if (scrapeConfig.isMember(prometheus::METRICS_PATH) && scrapeConfig[prometheus::METRICS_PATH].isString()) {
mMetricsPath = scrapeConfig[prometheus::METRICS_PATH].asString();
}
Expand Down Expand Up @@ -185,7 +203,7 @@ bool ScrapeConfig::InitBasicAuth(const Json::Value& basicAuth) {

auto token = username + ":" + password;
auto token64 = sdk::Base64Enconde(token);
mAuthHeaders[prometheus::A_UTHORIZATION] = prometheus::BASIC_PREFIX + token64;
mRequestHeaders[prometheus::A_UTHORIZATION] = prometheus::BASIC_PREFIX + token64;
return true;
}

Expand Down Expand Up @@ -219,8 +237,98 @@ bool ScrapeConfig::InitAuthorization(const Json::Value& authorization) {
return false;
}

mAuthHeaders[prometheus::A_UTHORIZATION] = type + " " + credentials;
mRequestHeaders[prometheus::A_UTHORIZATION] = type + " " + credentials;
return true;
}

// Parses the `scrape_protocols` setting and builds the HTTP Accept header value.
// Protocols are weighted by position: earlier entries receive a higher quality
// value (;q=0.N), so the scraped exporter is asked to prefer them; a trailing
// "*/*" catch-all gets the lowest priority. An empty or missing array falls
// back to the default protocol order. Returns false on non-string entries,
// unknown protocol names, or duplicates.
bool ScrapeConfig::InitScrapeProtocols(const Json::Value& scrapeProtocols) {
    // Media type advertised in the Accept header for each supported protocol.
    static const auto sScrapeProtocolsHeaders = std::map<string, string>{
        {prometheus::PrometheusProto,
         "application/vnd.google.protobuf;proto=io.prometheus.client.MetricFamily;encoding=delimited"},
        {prometheus::PrometheusText0_0_4, "text/plain;version=0.0.4"},
        {prometheus::OpenMetricsText0_0_1, "application/openmetrics-text;version=0.0.1"},
        {prometheus::OpenMetricsText1_0_0, "application/openmetrics-text;version=1.0.0"},
    };
    // Order used when the config omits scrape_protocols.
    static const auto sDefaultScrapeProtocols = vector<string>{
        prometheus::PrometheusText0_0_4,
        prometheus::PrometheusProto,
        prometheus::OpenMetricsText0_0_1,
        prometheus::OpenMetricsText1_0_0,
    };

    auto join = [](const vector<string>& strs, const string& sep) {
        string result;
        for (const auto& str : strs) {
            if (!result.empty()) {
                result += sep;
            }
            result += str;
        }
        return result;
    };

    // Collect configured protocol names; every array entry must be a string.
    auto getScrapeProtocols = [](const Json::Value& scrapeProtocols, vector<string>& res) {
        for (const auto& scrapeProtocol : scrapeProtocols) {
            if (scrapeProtocol.isString()) {
                res.push_back(scrapeProtocol.asString());
            } else {
                LOG_ERROR(sLogger, ("scrape_protocols config error", ""));
                return false;
            }
        }
        return true;
    };

    // Reject unknown protocol names and duplicated entries.
    auto validateScrapeProtocols = [](const vector<string>& scrapeProtocols) {
        set<string> dups;
        for (const auto& scrapeProtocol : scrapeProtocols) {
            if (!sScrapeProtocolsHeaders.count(scrapeProtocol)) {
                // Original message contained a stray "prometheusproto" token; fixed.
                LOG_ERROR(sLogger,
                          ("unknown scrape protocol", scrapeProtocol)(
                              "supported",
                              "[OpenMetricsText0.0.1 OpenMetricsText1.0.0 PrometheusProto PrometheusText0.0.4]"));
                return false;
            }
            if (dups.count(scrapeProtocol)) {
                LOG_ERROR(sLogger, ("duplicated protocol in scrape_protocols", scrapeProtocol));
                return false;
            }
            dups.insert(scrapeProtocol);
        }
        return true;
    };

    vector<string> tmpScrapeProtocols;

    if (!getScrapeProtocols(scrapeProtocols, tmpScrapeProtocols)) {
        return false;
    }

    // If scrape_protocols is empty, use default protocols.
    if (tmpScrapeProtocols.empty()) {
        tmpScrapeProtocols = sDefaultScrapeProtocols;
    }
    if (!validateScrapeProtocols(tmpScrapeProtocols)) {
        return false;
    }

    // Assign descending q-values so earlier protocols are preferred; with the
    // default four protocols this yields q=0.5 .. q=0.2 and */* at q=0.1.
    auto weight = tmpScrapeProtocols.size() + 1;
    for (auto& tmpScrapeProtocol : tmpScrapeProtocols) {
        // .at() is safe here: validateScrapeProtocols guarantees the key exists.
        auto val = sScrapeProtocolsHeaders.at(tmpScrapeProtocol);
        val += ";q=0." + std::to_string(weight--);
        tmpScrapeProtocol = val;
    }
    tmpScrapeProtocols.push_back("*/*;q=0." + std::to_string(weight));

    mRequestHeaders[prometheus::ACCEPT] = join(tmpScrapeProtocols, ",");
    return true;
}

// Sets the Accept-Encoding request header from the scrape config's
// enable_compression flag: ask for gzip-compressed responses when enabled,
// otherwise request the uncompressed (identity) encoding.
void ScrapeConfig::InitEnableCompression(bool enableCompression) {
    mRequestHeaders[prometheus::ACCEPT_ENCODING]
        = enableCompression ? prometheus::GZIP : prometheus::IDENTITY;
}

} // namespace logtail
9 changes: 8 additions & 1 deletion core/prometheus/schedulers/ScrapeConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

namespace logtail {


class ScrapeConfig {
public:
std::string mJobName;
Expand All @@ -20,7 +21,10 @@ class ScrapeConfig {
std::string mMetricsPath;
std::string mScheme;

std::map<std::string, std::string> mAuthHeaders;
// auth header
// scrape_protocols Accept header: PrometheusProto, OpenMetricsText0.0.1, OpenMetricsText1.0.0, PrometheusText0.0.4
// enable_compression Accept-Encoding header: gzip, identity
std::map<std::string, std::string> mRequestHeaders;

int64_t mMaxScrapeSizeBytes;
int64_t mSampleLimit;
Expand All @@ -37,10 +41,13 @@ class ScrapeConfig {
private:
bool InitBasicAuth(const Json::Value& basicAuth);
bool InitAuthorization(const Json::Value& authorization);
bool InitScrapeProtocols(const Json::Value& scrapeProtocols);
void InitEnableCompression(bool enableCompression);

#ifdef APSARA_UNIT_TEST_MAIN
friend class ScrapeConfigUnittest;
#endif
};


} // namespace logtail
4 changes: 2 additions & 2 deletions core/prometheus/schedulers/ScrapeScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void ScrapeScheduler::OnMetricResult(const HttpResponse& response, uint64_t time
if (response.mStatusCode != 200) {
mScrapeResponseSizeBytes = 0;
string headerStr;
for (const auto& [k, v] : mScrapeConfigPtr->mAuthHeaders) {
for (const auto& [k, v] : mScrapeConfigPtr->mRequestHeaders) {
headerStr.append(k).append(":").append(v).append(";");
}
LOG_WARNING(sLogger,
Expand Down Expand Up @@ -159,7 +159,7 @@ std::unique_ptr<TimerEvent> ScrapeScheduler::BuildScrapeTimerEvent(std::chrono::
mPort,
mScrapeConfigPtr->mMetricsPath,
mScrapeConfigPtr->mQueryString,
mScrapeConfigPtr->mAuthHeaders,
mScrapeConfigPtr->mRequestHeaders,
"",
mScrapeConfigPtr->mScrapeTimeoutSeconds,
mScrapeConfigPtr->mScrapeIntervalSeconds
Expand Down
Loading
Loading