diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 9f0a9d53e1..ad7e505876 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -113,7 +113,7 @@ include(${CMAKE_CURRENT_SOURCE_DIR}/flusher/links.cmake) set(SUB_DIRECTORIES_LIST batch application app_config checkpoint compression config config/feedbacker config/provider config/watcher config_manager config_server_pb/v1 config_server_pb/v2 container_manager controller event event_handler event_listener file_server go_pipeline log_pb logger - models monitor parser pipeline plugin plugin/creator plugin/instance plugin/interface polling + metadata models monitor parser pipeline plugin plugin/creator plugin/instance plugin/interface polling profile_sender queue reader sdk sender serializer sls_control fuse prometheus sink/http route ebpf/observer ebpf/security ebpf/handler ebpf ) if (LINUX) diff --git a/core/common/LRUCache.h b/core/common/LRUCache.h new file mode 100644 index 0000000000..4294fec9ce --- /dev/null +++ b/core/common/LRUCache.h @@ -0,0 +1,245 @@ +/* + * LRUCache11 - a templated C++11 based LRU cache class that allows + * specification of + * key, value and optionally the map container type (defaults to + * std::unordered_map) + * By using the std::unordered_map and a linked list of keys it allows O(1) insert, delete + * and + * refresh operations. + * + * This is a header-only library and all you need is the LRUCache11.hpp file + * + * Github: https://github.com/mohaps/lrucache11 + * + * This is a follow-up to the LRUCache project - + * https://github.com/mohaps/lrucache + * + * Copyright (c) 2012-22 SAURAV MOHAPATRA + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +#pragma once +#include +#include +#include +#include +#include +#include +#include + +namespace logtail { + namespace lru11 { + /* + * a noop lockable concept that can be used in place of std::mutex + */ + class NullLock { + public: + void lock() {} + void unlock() {} + bool try_lock() { return true; } + }; + + /** + * error raised when a key not in cache is passed to get() + */ + class KeyNotFound : public std::invalid_argument { + public: + KeyNotFound() : std::invalid_argument("key_not_found") {} + }; + + template + struct KeyValuePair { + public: + K key; + V value; + + KeyValuePair(K k, V v) : key(std::move(k)), value(std::move(v)) {} + }; + + /** + * The LRU Cache class templated by + * Key - key type + * Value - value type + * MapType - an associative container like std::unordered_map + * LockType - a lock type derived from the Lock class (default: + *NullLock = no synchronization) + * + * The default NullLock based template is not thread-safe, however passing + *Lock=std::mutex will make it + * thread-safe + */ + template >::iterator>> + class Cache { + public: + typedef KeyValuePair node_type; + typedef std::list> list_type; + typedef Map map_type; + typedef Lock lock_type; + using Guard = std::lock_guard; + /** + * the maxSize is the soft limit of keys and (maxSize + elasticity) is the + * hard limit + * the cache is allowed to grow till (maxSize + elasticity) and is pruned back + * to maxSize keys + * set maxSize = 0 for an unbounded cache (but in that case, you're better off + * using a std::unordered_map + * directly anyway! :) + */ + explicit Cache(size_t maxSize = 64, size_t elasticity = 10) + : maxSize_(maxSize), elasticity_(elasticity) {} + virtual ~Cache() = default; + size_t size() const { + Guard g(lock_); + return cache_.size(); + } + bool empty() const { + Guard g(lock_); + return cache_.empty(); + } + void clear() { + Guard g(lock_); + cache_.clear(); + keys_.clear(); + } + void insert(const Key& k, Value v) { + Guard g(lock_); + const auto iter = cache_.find(k); + if (iter != cache_.end()) { + iter->second->value = v; + keys_.splice(keys_.begin(), keys_, iter->second); + return; + } + + keys_.emplace_front(k, std::move(v)); + cache_[k] = keys_.begin(); + prune(); + } + /** + for backward compatibity. redirects to tryGetCopy() + */ + bool tryGet(const Key& kIn, Value& vOut) { + return tryGetCopy(kIn, vOut); + } + + bool tryGetCopy(const Key& kIn, Value& vOut) { + Guard g(lock_); + Value tmp; + if (!tryGetRef_nolock(kIn, tmp)) { return false; } + vOut = tmp; + return true; + } + + bool tryGetRef(const Key& kIn, Value& vOut) { + Guard g(lock_); + return tryGetRef_nolock(kIn, vOut); + } + /** + * The const reference returned here is only + * guaranteed to be valid till the next insert/delete + * in multi-threaded apps use getCopy() to be threadsafe + */ + const Value& getRef(const Key& k) { + Guard g(lock_); + return get_nolock(k); + } + + /** + added for backward compatibility + */ + Value get(const Key& k) { return getCopy(k); } + /** + * returns a copy of the stored object (if found) + * safe to use/recommended in multi-threaded apps + */ + Value getCopy(const Key& k) { + Guard g(lock_); + return get_nolock(k); + } + + bool remove(const Key& k) { + Guard g(lock_); + auto iter = cache_.find(k); + if (iter == cache_.end()) { + return false; + } + keys_.erase(iter->second); + cache_.erase(iter); + return true; + } + bool contains(const Key& k) const { + Guard g(lock_); + return cache_.find(k) != cache_.end(); + } + + size_t getMaxSize() const { return maxSize_; } + size_t getElasticity() const { return elasticity_; } + size_t getMaxAllowedSize() const { return maxSize_ + elasticity_; } + template + void cwalk(F& f) const { + Guard g(lock_); + std::for_each(keys_.begin(), keys_.end(), f); + } + + protected: + Value get_nolock(const Key& k) { + const auto iter = cache_.find(k); + if (iter == cache_.end()) { + return nullptr; + } + keys_.splice(keys_.begin(), keys_, iter->second); + return iter->second->value; + } + bool tryGetRef_nolock(const Key& kIn, Value& vOut) { + const auto iter = cache_.find(kIn); + if (iter == cache_.end()) { + return false; + } + keys_.splice(keys_.begin(), keys_, iter->second); + vOut = iter->second->value; + return true; + } + size_t prune() { + size_t maxAllowed = maxSize_ + elasticity_; + if (maxSize_ == 0 || cache_.size() < maxAllowed) { + return 0; + } + size_t count = 0; + while (cache_.size() > maxSize_) { + cache_.erase(keys_.back().key); + keys_.pop_back(); + ++count; + } + return count; + } + + private: + // Disallow copying. + Cache(const Cache&) = delete; + Cache& operator=(const Cache&) = delete; + + mutable Lock lock_; + Map cache_; + list_type keys_; + size_t maxSize_; + size_t elasticity_; + + #ifdef APSARA_UNIT_TEST_MAIN + friend class LRUCacheUnittest; + #endif + }; + + } // namespace LRUCache11 +} //namespace logtail \ No newline at end of file diff --git a/core/metadata/ProcessorK8sMetadata.cpp b/core/metadata/ProcessorK8sMetadata.cpp new file mode 100644 index 0000000000..dc359e338f --- /dev/null +++ b/core/metadata/ProcessorK8sMetadata.cpp @@ -0,0 +1,157 @@ +/* + * Copyright 2023 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ProcessorK8sMetadata.h" + +#include +#include +#include "common/ParamExtractor.h" +#include "logger/Logger.h" +#include "models/MetricEvent.h" +#include "models/SpanEvent.h" +#include "monitor/MetricConstants.h" +#include +#include "k8sMetadata.h" + +using logtail::StringView; + +namespace logtail { + +void ProcessorK8sMetadata::Process(PipelineEventGroup& logGroup) { + if (logGroup.GetEvents().empty()) { + return; + } + EventsContainer& events = logGroup.MutableEvents(); + std::vector container_vec; + std::vector remote_ip_vec; + std::vector cotainer_not_tag; + for (size_t rIdx = 0; rIdx < events.size(); ++rIdx) { + if (!ProcessEvent(events[rIdx], container_vec, remote_ip_vec)) { + cotainer_not_tag.push_back(rIdx); + } + } + auto& k8sMetadata = K8sMetadata::GetInstance(); + if (!container_vec.empty()) { + k8sMetadata.GetByContainerIds(container_vec); + } + if (!remote_ip_vec.empty()) { + k8sMetadata.GetByIps(remote_ip_vec); + } + for (size_t i = 0; i < cotainer_not_tag.size(); ++i) { + ProcessEvent(events[i], container_vec, remote_ip_vec); + } + cotainer_not_tag.clear(); + container_vec.clear(); + remote_ip_vec.clear(); +} + +bool ProcessorK8sMetadata::ProcessEvent(PipelineEventPtr& e, std::vector& container_vec, std::vector& remote_ip_vec) { + if (!IsSupportedEvent(e)) { + return true; + } + + if (e.Is()) { + return ProcessEventForMetric(e.Cast(), container_vec, remote_ip_vec); + } else if (e.Is()) { + return ProcessEventForSpan(e.Cast(), container_vec, remote_ip_vec); + } + + return true; +} + +bool ProcessorK8sMetadata::ProcessEventForSpan(SpanEvent& e, std::vector& container_vec, std::vector& remote_ip_vec) { + bool res = true; + + auto& k8sMetadata = K8sMetadata::GetInstance(); + StringView container_id_view("container.id"); + StringView containerId_view = e.HasTag(container_id_view) ? e.GetTag(container_id_view) : StringView{}; + if (!containerId_view.empty()) { + std::string containerId(containerId_view); + std::shared_ptr container_info = k8sMetadata.GetInfoByContainerIdFromCache(containerId); + if (container_info == nullptr) { + container_vec.push_back(containerId); + res = false; + } else { + e.SetTag("workloadName", container_info->workloadName); + e.SetTag("workloadKind", container_info->workloadKind); + e.SetTag("namespace", container_info->k8sNamespace); + e.SetTag("serviceName", container_info->serviceName); + e.SetTag("pid", container_info->armsAppId); + } + } + + StringView ip_view("remote_ip"); + StringView remote_ip_view = e.HasTag(ip_view) ? e.GetTag(ip_view) : StringView{}; + if (!remote_ip_view.empty()) { + std::string remote_ip(remote_ip_view); + std::shared_ptr ip_info = k8sMetadata.GetInfoByIpFromCache(remote_ip); + if (ip_info == nullptr) { + remote_ip_vec.push_back(remote_ip); + res = false; + } else { + e.SetTag("peerWorkloadName", ip_info->workloadName); + e.SetTag("peerWorkloadKind", ip_info->workloadKind); + e.SetTag("peerNamespace", ip_info->k8sNamespace); + } + } + + return res; +} + +bool ProcessorK8sMetadata::ProcessEventForMetric(MetricEvent& e, std::vector& container_vec, std::vector& remote_ip_vec) { + bool res = true; + + auto& k8sMetadata = K8sMetadata::GetInstance(); + StringView container_id_view("container.id"); + StringView containerId_view = e.HasTag(container_id_view) ? e.GetTag(container_id_view) : StringView{}; + if (!containerId_view.empty()) { + std::string containerId(containerId_view); + std::shared_ptr container_info = k8sMetadata.GetInfoByContainerIdFromCache(containerId); + if (container_info == nullptr) { + container_vec.push_back(containerId); + res = false; + } else { + e.SetTag("workloadName", container_info->workloadName); + e.SetTag("workloadKind", container_info->workloadKind); + e.SetTag("namespace", container_info->k8sNamespace); + e.SetTag("serviceName", container_info->serviceName); + e.SetTag("pid", container_info->armsAppId); + } + } + + StringView ip_view("remote_ip"); + StringView remote_ip_view = e.HasTag(ip_view) ? e.GetTag(ip_view) : StringView{}; + if (!remote_ip_view.empty()) { + std::string remote_ip(remote_ip_view); + std::shared_ptr ip_info = k8sMetadata.GetInfoByIpFromCache(remote_ip); + if (ip_info == nullptr) { + remote_ip_vec.push_back(remote_ip); + res = false; + } else { + e.SetTag("peerWorkloadName", ip_info->workloadName); + e.SetTag("peerWorkloadKind", ip_info->workloadKind); + e.SetTag("peerNamespace", ip_info->k8sNamespace); + } + } + + return res; +} + +bool ProcessorK8sMetadata::IsSupportedEvent(const PipelineEventPtr& e) const { + return e.Is() || e.Is(); +} + +} // namespace logtail diff --git a/core/metadata/ProcessorK8sMetadata.h b/core/metadata/ProcessorK8sMetadata.h new file mode 100644 index 0000000000..4ee07b9791 --- /dev/null +++ b/core/metadata/ProcessorK8sMetadata.h @@ -0,0 +1,33 @@ +/* + * Copyright 2023 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include "models/LogEvent.h" +#include "plugin/interface/Processor.h" + +namespace logtail { + +class ProcessorK8sMetadata { +public: + void Process(PipelineEventGroup& logGroup); + bool ProcessEvent(PipelineEventPtr& e, std::vector& container_vec, std::vector& remote_ip_vec); + bool ProcessEventForMetric(MetricEvent& e, std::vector& container_vec, std::vector& remote_ip_vec); + bool ProcessEventForSpan(SpanEvent& e, std::vector& container_vec, std::vector& remote_ip_vec); +protected: + bool IsSupportedEvent(const PipelineEventPtr& e) const; +}; + +} // namespace logtail diff --git a/core/metadata/k8sMetadata.cpp b/core/metadata/k8sMetadata.cpp new file mode 100644 index 0000000000..d84b9890a7 --- /dev/null +++ b/core/metadata/k8sMetadata.cpp @@ -0,0 +1,244 @@ +// Copyright 2022 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific l + +#include +#include +#include "k8sMetadata.h" +#include "common/MachineInfoUtil.h" +#include "logger/Logger.h" + + +namespace logtail { + + size_t WriteCallback(void* contents, size_t size, size_t nmemb, void* userp) { + ((std::string*)userp)->append((char*)contents, size * nmemb); + return size * nmemb; + } + + void K8sMetadata::FromInfoJson(const Json::Value& json, k8sContainerInfo& info) { + if (!json.isMember("images") || !json.isMember("labels") || + !json.isMember("namespace") || + !json.isMember("workloadKind") || !json.isMember("workloadName")) { + return; + } + + for (const auto& key : json["images"].getMemberNames()) { + if (json["images"].isMember(key)) { + info.images[key] = json["images"][key].asString(); + } + } + for (const auto& key : json["labels"].getMemberNames()) { + if (json["labels"].isMember(key)) { + info.labels[key] = json["labels"][key].asString(); + if (key == "armsAppId") { + info.armsAppId = json["labels"][key].asString(); + } + } + } + + info.k8sNamespace = json["namespace"].asString(); + if (json.isMember("serviceName")) { + info.serviceName = json["serviceName"].asString(); + } + info.workloadKind = json["workloadKind"].asString(); + info.workloadName = json["workloadName"].asString(); + info.timestamp = std::time(0); + + } + + bool ContainerInfoIsExpired(std::shared_ptr info) { + if (info == nullptr) { + return false; + } + std::time_t now = std::time(0); + std::chrono::system_clock::time_point th1 = std::chrono::system_clock::from_time_t(info->timestamp); + std::chrono::system_clock::time_point th2 = std::chrono::system_clock::from_time_t(now); + std::chrono::duration diff = th2 - th1; + double minutes_diff = diff.count() / 60.0; + if (minutes_diff > 10) { + return true; + } + return false; + } + + void K8sMetadata::FromContainerJson(const Json::Value& json, std::shared_ptr data) { + for (const auto& key : json.getMemberNames()) { + k8sContainerInfo info; + FromInfoJson(json[key], info); + data->containers[key] = info; + } + } + + void K8sMetadata::GetK8sMetadataFromOperator(const std::string& urlHost, const std::string& output, containerInfoType infoType) { + bool isOk = false; + int retryCount = 0; + + // Maximum number of retries + const int maxRetries = 3; + while (retryCount < maxRetries && !isOk) { + isOk = SendRequestToOperator(oneOperatorContainerIdAddr, output, containerInfoType::ContainerIdInfo); + if (!isOk) { + LOG_DEBUG(sLogger, ("Request failed, retrying", retryCount)); + std::this_thread::sleep_for(std::chrono::seconds(3)); // Sleep for 1 second before retrying + retryCount++; + } + } + if (!isOk) { + LOG_DEBUG(sLogger, ("Failed to send request", retryCount)); + } + return; + } + + bool K8sMetadata::SendRequestToOperator(const std::string& urlHost, const std::string& output, containerInfoType infoType) { + CURL* curl; + CURLcode res; + std::string readBuffer; + + curl_global_init(CURL_GLOBAL_DEFAULT); + curl = curl_easy_init(); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, 5L);// 设置整体操作超时为5秒 + curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, 5L); // 设置连接超时为5秒 + if(curl) { + curl_easy_setopt(curl, CURLOPT_URL, urlHost.c_str()); + if (output != "") { + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, output.c_str()); + } + + // 设置请求头 + struct curl_slist* headers = NULL; + headers = curl_slist_append(headers, "Content-Type: application/json"); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); + + // 设置响应数据写入回调 + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, WriteCallback); + curl_easy_setopt(curl, CURLOPT_WRITEDATA, &readBuffer); + + // 执行请求 + res = curl_easy_perform(curl); + + if(res != CURLE_OK) { + LOG_DEBUG(sLogger, ("fetch k8s meta from one operator fail", curl_easy_strerror(res))); + return false; + } else { + Json::CharReaderBuilder readerBuilder; + Json::CharReader* reader = readerBuilder.newCharReader(); + Json::Value root; + std::string errors; + + if (reader->parse(readBuffer.c_str(), readBuffer.c_str() + readBuffer.size(), &root, &errors)) { + std::shared_ptr data = std::make_unique(); + if (data == nullptr) { + return true; + } + FromContainerJson(root, data); + for (const auto& pair : data->containers) { + if (infoType == containerInfoType::ContainerIdInfo) { + container_cache.insert(pair.first, std::make_shared(pair.second)); + } else { + ip_cache.insert(pair.first, std::make_shared(pair.second)); + } + } + } else { + return true; + } + + delete reader; + } + + curl_slist_free_all(headers); + curl_easy_cleanup(curl); + } + + curl_global_cleanup(); + return true; + } + + void K8sMetadata::GetByContainerIds(std::vector containerIds) { + Json::Value jsonObj; + for (auto& str : containerIds) { + jsonObj["keys"].append(str); + } + Json::StreamWriterBuilder writer; + std::string output = Json::writeString(writer, jsonObj); + GetK8sMetadataFromOperator(oneOperatorContainerIdAddr, output, containerInfoType::ContainerIdInfo); + } + + void K8sMetadata::GetByLocalHost() { + std::string hostIp = GetHostIp(); + std::list strList{hostIp}; + Json::Value jsonObj; + for (const auto& str : strList) { + jsonObj["keys"].append(str); + } + Json::StreamWriterBuilder writer; + std::string output = Json::writeString(writer, jsonObj); + std::string urlHost = "http://" + oneOperatorAddr + "/metadata/host"; + GetK8sMetadataFromOperator(urlHost, output, containerInfoType::ContainerIdInfo); + } + + void K8sMetadata::SetContainerCache(const Json::Value& root) { + std::shared_ptr data = std::make_unique(); + if (data == nullptr) { + return; + } + FromContainerJson(root, data); + for (const auto& pair : data->containers) { + container_cache.insert(pair.first, std::make_shared(pair.second)); + } + } + + void K8sMetadata::SetIpCache(const Json::Value& root) { + std::shared_ptr data = std::make_unique(); + if (data == nullptr) { + return; + } + FromContainerJson(root, data); + for (const auto& pair : data->containers) { + ip_cache.insert(pair.first, std::make_shared(pair.second)); + } + } + + void K8sMetadata::GetByIps(std::vector ips) { + Json::Value jsonObj; + for (auto& str : ips) { + jsonObj["keys"].append(str); + } + Json::StreamWriterBuilder writer; + std::string output = Json::writeString(writer, jsonObj); + std::string urlHost = "http://" + oneOperatorAddr + "/metadata/ip"; + GetK8sMetadataFromOperator(urlHost, output, containerInfoType::IpInfo); + } + + std::shared_ptr K8sMetadata::GetInfoByContainerIdFromCache(const std::string& containerId) { + if (containerId.empty()) { + return nullptr; + } + return container_cache.get(containerId); + } + + std::shared_ptr K8sMetadata::GetInfoByIpFromCache(const std::string& ip) { + if (ip.empty()){ + return nullptr; + } + std::shared_ptr ip_info = ip_cache.get(ip); + if (ip_info == nullptr) { + return ip_info; + } + if (ContainerInfoIsExpired(ip_info)) { + return nullptr; + } + return ip_info; + } + + +} // namespace logtail \ No newline at end of file diff --git a/core/metadata/k8sMetadata.h b/core/metadata/k8sMetadata.h new file mode 100644 index 0000000000..beaa4c16ea --- /dev/null +++ b/core/metadata/k8sMetadata.h @@ -0,0 +1,77 @@ +// Copyright 2022 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific l +#pragma once +#include +#include +#include +#include "common/LRUCache.h" +#include + +namespace logtail { + struct k8sContainerInfo { + std::unordered_map images; + std::unordered_map labels; + std::string k8sNamespace; + std::string serviceName; + std::string workloadKind; + std::string workloadName; + std::time_t timestamp; + std::string armsAppId; + }; + + // 定义顶层的结构体 + struct ContainerData { + std::unordered_map containers; + }; + + + enum class containerInfoType { + ContainerIdInfo, + IpInfo + }; + + class K8sMetadata { + private: + lru11::Cache> container_cache; + lru11::Cache> ip_cache; + std::string oneOperatorAddr; + std::string oneOperatorContainerIdAddr; + K8sMetadata(size_t cacheSize, const std::string& operatorAddr) + : container_cache(cacheSize, 0), ip_cache(cacheSize, 0), oneOperatorAddr(operatorAddr) { + //GetByLocalHost(); + oneOperatorContainerIdAddr = "http://" + oneOperatorAddr + "/metadata/containerid"; + } + K8sMetadata(const K8sMetadata&) = delete; + K8sMetadata& operator=(const K8sMetadata&) = delete; + + public: + static K8sMetadata& GetInstance(size_t cacheSize = 500, const std::string& operatorAddr = "oneoperator:9000") { + static K8sMetadata instance(cacheSize, operatorAddr); + return instance; + } + + // 公共方法 + void GetByContainerIds(std::vector containerIds); + void GetByLocalHost(); + void GetByIps(std::vector ips); + std::shared_ptr GetInfoByContainerIdFromCache(const std::string& containerId); + std::shared_ptr GetInfoByIpFromCache(const std::string& ip); + bool SendRequestToOperator(const std::string& urlHost, const std::string& output, containerInfoType infoType); + void GetK8sMetadataFromOperator(const std::string& urlHost, const std::string& output, containerInfoType infoType); + void SetIpCache(const Json::Value& root); + void SetContainerCache(const Json::Value& root); + void FromInfoJson(const Json::Value& json, k8sContainerInfo& info); + void FromContainerJson(const Json::Value& json, std::shared_ptr data); + }; + +} // namespace logtail diff --git a/core/unittest/common/CMakeLists.txt b/core/unittest/common/CMakeLists.txt index c5f83bfcc2..bcfae2d60f 100644 --- a/core/unittest/common/CMakeLists.txt +++ b/core/unittest/common/CMakeLists.txt @@ -24,6 +24,9 @@ target_link_libraries(common_simple_utils_unittest ${UT_BASE_TARGET}) add_executable(common_logfileoperator_unittest LogFileOperatorUnittest.cpp) target_link_libraries(common_logfileoperator_unittest ${UT_BASE_TARGET}) +add_executable(common_k8slru_unittest LRUCacheUnittest.cpp) +target_link_libraries(common_k8slru_unittest ${UT_BASE_TARGET}) + add_executable(common_sliding_window_counter_unittest SlidingWindowCounterUnittest.cpp) target_link_libraries(common_sliding_window_counter_unittest ${UT_BASE_TARGET}) @@ -57,6 +60,7 @@ gtest_discover_tests(common_logfileoperator_unittest) gtest_discover_tests(common_sliding_window_counter_unittest) gtest_discover_tests(common_string_tools_unittest) gtest_discover_tests(common_machine_info_util_unittest) +gtest_discover_tests(common_k8slru_unittest) gtest_discover_tests(encoding_converter_unittest) gtest_discover_tests(yaml_util_unittest) gtest_discover_tests(safe_queue_unittest) diff --git a/core/unittest/common/LRUCacheUnittest.cpp b/core/unittest/common/LRUCacheUnittest.cpp new file mode 100644 index 0000000000..9ba6fb38ce --- /dev/null +++ b/core/unittest/common/LRUCacheUnittest.cpp @@ -0,0 +1,129 @@ +/* + * LRUCache11 - a templated C++11 based LRU cache class that allows specification of + * key, value and optionally the map container type (defaults to std::unordered_map) + * By using the std::map and a linked list of keys it allows O(1) insert, delete and + * refresh operations. + * + * This is a header-only library and all you need is the LRUCache11.hpp file + * + * Github: https://github.com/mohaps/lrucache11 + * + * This is a follow-up to the LRUCache project - https://github.com/mohaps/lrucache + * + * Copyright (c) 2012-22 SAURAV MOHAPATRA + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + */ + +// compile with g++ -std=c++11 -o sample_test LRUCache11Test.cpp +#include +#include "unittest/Unittest.h" +#include +#include +#include +#include + +#include "common/LRUCache.h" +#if defined(__linux__) +#include "unittest/UnittestHelper.h" +#endif + +using namespace logtail; +using namespace lru11; +typedef Cache KVCache; +typedef KVCache::node_type KVNode; +typedef KVCache::list_type KVList; +namespace logtail { + class LRUCacheUtilUnittest : public ::testing::Test { + public: + // test the vanilla version of the cache + void TestNoLock() { + + // with no lock + auto cachePrint = + [&] (const KVCache& c) { + std::cout << "Cache (size: "< " << n.value << std::endl; + if (n.key == "hello") { + APSARA_TEST_EQUAL(n.value, 1); + } + }; + c.cwalk(nodePrint); + }; + KVCache c(5, 2); + c.insert("hello", 1); + c.insert("world", 2); + c.insert("foo", 3); + c.insert("bar", 4); + c.insert("blanga", 5); + cachePrint(c); + + + } + + // Test a thread-safe version of the cache with a std::mutex + void TestWithLock() { + + using LCache = Cache; + auto cachePrint2 = + [&] (const LCache& c) { + std::cout << "Cache (size: "< " << n.value << std::endl; + }; + c.cwalk(nodePrint); + }; + // with a lock + LCache lc(25,2); + auto worker = [&] () { + std::ostringstream os; + os << std::this_thread::get_id(); + std::string id = os.str(); + + for (int i = 0; i < 10; i++) { + std::ostringstream os2; + os2 << "id:"<> workers; + workers.reserve(100); + for (int i = 0; i < 100; i++) { + workers.push_back(std::unique_ptr( + new std::thread(worker))); + } + + for (const auto& w : workers) { + w->join(); + } + std::cout << "... workers finished!" << std::endl; + cachePrint2(lc); + } + }; + + APSARA_UNIT_TEST_CASE(LRUCacheUtilUnittest, TestNoLock, 0); + APSARA_UNIT_TEST_CASE(LRUCacheUtilUnittest, TestWithLock, 1); + +} + + + +int main(int argc, char** argv) { + logtail::Logger::Instance().InitGlobalLoggers(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file diff --git a/core/unittest/metadata/CMakeLists.txt b/core/unittest/metadata/CMakeLists.txt new file mode 100644 index 0000000000..5e14034447 --- /dev/null +++ b/core/unittest/metadata/CMakeLists.txt @@ -0,0 +1,22 @@ +# Copyright 2022 iLogtail Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +cmake_minimum_required(VERSION 3.22) +project(metadata_unittest) + +add_executable(metadata_unittest K8sMetadataUnittest.cpp) +target_link_libraries(metadata_unittest ${UT_BASE_TARGET}) + +include(GoogleTest) +gtest_discover_tests(metadata_unittest) \ No newline at end of file diff --git a/core/unittest/metadata/K8sMetadataUnittest.cpp b/core/unittest/metadata/K8sMetadataUnittest.cpp new file mode 100644 index 0000000000..bedbd6892e --- /dev/null +++ b/core/unittest/metadata/K8sMetadataUnittest.cpp @@ -0,0 +1,194 @@ +// Copyright 2022 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "unittest/Unittest.h" +#include +#include +#include +#include "metadata/ProcessorK8sMetadata.h" +#include "metadata/k8sMetadata.h" + +using namespace std; + +namespace logtail { +class k8sMetadataUnittest : public ::testing::Test { +protected: + void SetUp() override { + // You can set up common objects needed for each test case here + } + + void TearDown() override { + // Clean up after each test case if needed + } + +public: + void TestGetByContainerIds() { + LOG_INFO(sLogger, ("TestGetByContainerIds() begin", time(NULL))); + const std::string jsonData = R"({"containerd://286effd2650c0689b779018e42e9ec7aa3d2cb843005e038204e85fc3d4f9144":{"namespace":"default","workloadName":"oneagent-demo-658648895b","workloadKind":"replicaset","serviceName":"","labels":{"app":"oneagent-demo","pod-template-hash":"658648895b"},"envs":{},"images":{"oneagent-demo":"sls-opensource-registry.cn-shanghai.cr.aliyuncs.com/ilogtail-community-edition/centos7-cve-fix:1.0.0"}}})"; + + Json::Value root; + Json::CharReaderBuilder readerBuilder; + std::istringstream jsonStream(jsonData); + std::string errors; + + // Parse JSON data + if (!Json::parseFromStream(readerBuilder, jsonStream, &root, &errors)) { + std::cerr << "Failed to parse JSON: " << errors << std::endl; + return; + } + + + + + auto& k8sMetadata = K8sMetadata::GetInstance(); + k8sMetadata.SetContainerCache(root); + k8sMetadata.GetByLocalHost(); + + // Assume GetInfoByContainerIdFromCache returns non-null shared_ptr for valid IDs, + // and check for some expectations. + APSARA_TEST_TRUE_FATAL(k8sMetadata.GetInfoByContainerIdFromCache("containerd://286effd2650c0689b779018e42e9ec7aa3d2cb843005e038204e85fc3d4f9144") != nullptr); + } + + void TestGetByLocalHost() { + LOG_INFO(sLogger, ("TestGetByLocalHost() begin", time(NULL))); + // Sample JSON data + const std::string jsonData = R"({ + "10.41.0.2": { + "namespace": "kube-system", + "workloadName": "coredns-7b669cbb96", + "workloadKind": "replicaset", + "serviceName": "", + "labels": { + "k8s-app": "kube-dns", + "pod-template-hash": "7b669cbb96" + }, + "envs": { + "COREDNS_NAMESPACE": "", + "COREDNS_POD_NAME": "" + }, + "images": { + "coredns": "registry-cn-chengdu-vpc.ack.aliyuncs.com/acs/coredns:v1.9.3.10-7dfca203-aliyun" + } + }, + "10.41.0.3": { + "namespace": "kube-system", + "workloadName": "csi-provisioner-8bd988c55", + "workloadKind": "replicaset", + "serviceName": "", + "labels": { + "app": "csi-provisioner", + "pod-template-hash": "8bd988c55" + }, + "envs": { + "CLUSTER_ID": "c33235919ddad4f279b3a67c2f0046704", + "ENABLE_NAS_SUBPATH_FINALIZER": "true", + "KUBE_NODE_NAME": "", + "SERVICE_TYPE": "provisioner" + }, + "images": { + "csi-provisioner": "registry-cn-chengdu-vpc.ack.aliyuncs.com/acs/csi-plugin:v1.30.3-921e63a-aliyun", + "external-csi-snapshotter": "registry-cn-chengdu-vpc.ack.aliyuncs.com/acs/csi-snapshotter:v4.0.0-a230d5b-aliyun", + "external-disk-attacher": "registry-cn-chengdu-vpc.ack.aliyuncs.com/acs/csi-attacher:v4.5.0-4a01fda6-aliyun", + "external-disk-provisioner": "registry-cn-chengdu-vpc.ack.aliyuncs.com/acs/csi-provisioner:v3.5.0-e7da67e52-aliyun", + "external-disk-resizer": "registry-cn-chengdu-vpc.ack.aliyuncs.com/acs/csi-resizer:v1.3-e48d981-aliyun", + "external-nas-provisioner": "registry-cn-chengdu-vpc.ack.aliyuncs.com/acs/csi-provisioner:v3.5.0-e7da67e52-aliyun", + "external-nas-resizer": "registry-cn-chengdu-vpc.ack.aliyuncs.com/acs/csi-resizer:v1.3-e48d981-aliyun", + "external-oss-provisioner": "registry-cn-chengdu-vpc.ack.aliyuncs.com/acs/csi-provisioner:v3.5.0-e7da67e52-aliyun", + "external-snapshot-controller": "registry-cn-chengdu-vpc.ack.aliyuncs.com/acs/snapshot-controller:v4.0.0-a230d5b-aliyun" + } + }, + "172.16.20.108": { + "namespace": "kube-system", + "workloadName": "kube-proxy-worker", + "workloadKind": "daemonset", + "serviceName": "", + "labels": { + "controller-revision-hash": "756748b889", + "k8s-app": "kube-proxy-worker", + "pod-template-generation": "1" + }, + "envs": { + "NODE_NAME": "" + }, + "images": { + "kube-proxy-worker": "registry-cn-chengdu-vpc.ack.aliyuncs.com/acs/kube-proxy:v1.30.1-aliyun.1" + } + } + })"; + + Json::Value root; + Json::CharReaderBuilder readerBuilder; + std::istringstream jsonStream(jsonData); + std::string errors; + + // Parse JSON data + if (!Json::parseFromStream(readerBuilder, jsonStream, &root, &errors)) { + std::cerr << "Failed to parse JSON: " << errors << std::endl; + return; + } + + + auto& k8sMetadata = K8sMetadata::GetInstance(); + k8sMetadata.SetIpCache(root); + k8sMetadata.GetByLocalHost(); + + auto sourceBuffer = std::make_shared(); + PipelineEventGroup eventGroup(sourceBuffer); + std::string eventStr = R"({ + "events" : + [ +{ + "name": "test", + "tags": { + "remote_ip": "172.16.20.108" + }, + "timestamp" : 12345678901, + "timestampNanosecond" : 0, + "type" : 2, + "value": { + "type": "untyped_single_value", + "detail": 10.0 + } + } + ], + "metadata" : + { + "log.file.path" : "/var/log/message" + }, + "tags" : + { + "app_name" : "xxx" + } + })"; + eventGroup.FromJsonString(eventStr); + eventGroup.AddMetricEvent(); + ProcessorK8sMetadata& processor = *(new ProcessorK8sMetadata); + processor.Process(eventGroup); + EventsContainer& eventsEnd = eventGroup.MutableEvents(); + auto& metricEvent = eventsEnd[0].Cast(); + APSARA_TEST_EQUAL("kube-proxy-worker", metricEvent.GetTag("peerWorkloadName").to_string()); + APSARA_TEST_TRUE_FATAL(k8sMetadata.GetInfoByIpFromCache("10.41.0.2") != nullptr); + } +}; + +APSARA_UNIT_TEST_CASE(k8sMetadataUnittest, TestGetByContainerIds, 0); +APSARA_UNIT_TEST_CASE(k8sMetadataUnittest, TestGetByLocalHost, 1); + +} // end of namespace logtail + +int main(int argc, char** argv) { + logtail::Logger::Instance().InitGlobalLoggers(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file