Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/metadata #1768

Merged
merged 28 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ set(SUB_DIRECTORIES_LIST
application app_config checkpoint container_manager logger go_pipeline monitor monitor/metric_constants profile_sender models
config config/watcher
instance_config
pipeline pipeline/batch pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer
metadata pipeline pipeline/batch pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer
runner runner/sink/http
protobuf/sls
file_server file_server/event file_server/event_handler file_server/event_listener file_server/reader file_server/polling
Expand Down
1 change: 1 addition & 0 deletions core/app_config/AppConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ DEFINE_FLAG_STRING(metrics_report_method,

DEFINE_FLAG_STRING(loong_collector_operator_service, "loong collector operator service", "");
DEFINE_FLAG_INT32(loong_collector_operator_service_port, "loong collector operator service port", 8888);
DEFINE_FLAG_INT32(loong_collector_k8s_meta_service_port, "loong collector operator service port", 9000);
DEFINE_FLAG_STRING(_pod_name_, "agent pod name", "");

DEFINE_FLAG_STRING(app_info_file, "", "app_info.json");
Expand Down
279 changes: 279 additions & 0 deletions core/common/LRUCache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
/*
* LRUCache11 - a templated C++11 based LRU cache class that allows
NameHaibinZhang marked this conversation as resolved.
Show resolved Hide resolved
* specification of
* key, value and optionally the map container type (defaults to
* std::unordered_map)
* By using the std::unordered_map and a linked list of keys it allows O(1) insert, delete
* and
* refresh operations.
*
* This is a header-only library and all you need is the LRUCache11.hpp file
*
* Github: https://github.com/mohaps/lrucache11
*
* This is a follow-up to the LRUCache project -
* https://github.com/mohaps/lrucache
*
* Copyright (c) 2012-22 SAURAV MOHAPATRA <[email protected]>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/

#pragma once
#include <algorithm>
#include <cstdint>
#include <list>
#include <mutex>
#include <stdexcept>
#include <thread>
#include <unordered_map>
#include <chrono>
#include <atomic>

namespace logtail {
namespace lru11 {
/*
* a noop lockable concept that can be used in place of std::mutex
*/
class NullLock {
public:
void lock() {}
void unlock() {}
bool try_lock() { return true; }
};

/**
* error raised when a key not in cache is passed to get()
*/
class KeyNotFound : public std::invalid_argument {
public:
KeyNotFound() : std::invalid_argument("key_not_found") {}
};

template <typename K, typename V>
struct KeyValuePair {
public:
K key;
V value;
std::chrono::steady_clock::time_point timestamp;

KeyValuePair(K k, V v) : key(std::move(k)), value(std::move(v)), timestamp(std::chrono::steady_clock::now()) {}
};

/**
* The LRU Cache class templated by
* Key - key type
* Value - value type
* MapType - an associative container like std::unordered_map
* LockType - a lock type derived from the Lock class (default:
*NullLock = no synchronization)
*
* The default NullLock based template is not thread-safe, however passing
*Lock=std::mutex will make it
* thread-safe
*/
template <class Key, class Value, class Lock = NullLock,
class Map = std::unordered_map<
Key, typename std::list<KeyValuePair<Key, Value>>::iterator>>
class Cache {
public:
typedef KeyValuePair<Key, Value> node_type;
typedef std::list<KeyValuePair<Key, Value>> list_type;
typedef Map map_type;
typedef Lock lock_type;
using Guard = std::lock_guard<lock_type>;
/**
* the maxSize is the soft limit of keys and (maxSize + elasticity) is the
* hard limit
* the cache is allowed to grow till (maxSize + elasticity) and is pruned back
* to maxSize keys
* set maxSize = 0 for an unbounded cache (but in that case, you're better off
* using a std::unordered_map
* directly anyway! :)
*/
explicit Cache(size_t maxSize = 64, size_t elasticity = 10)
: maxSize_(maxSize), elasticity_(elasticity), prune_thread(&Cache::pruneThreadFunc, this) {}

virtual ~Cache() {
stop_pruning = true;
if (prune_thread.joinable()) {
prune_thread.join();
}
}

size_t size() const {
Guard g(lock_);
return cache_.size();
}
bool empty() const {
Guard g(lock_);
return cache_.empty();
}
void clear() {
Guard g(lock_);
cache_.clear();
keys_.clear();
}
void insert(const Key& k, Value v) {
Guard g(lock_);
const auto iter = cache_.find(k);
if (iter != cache_.end()) {
iter->second->value = v;
iter->second->timestamp = std::chrono::steady_clock::now();
keys_.splice(keys_.begin(), keys_, iter->second);
return;
}

keys_.emplace_front(k, std::move(v));
cache_[k] = keys_.begin();
prune();
}
/**
for backward compatibity. redirects to tryGetCopy()
*/
bool tryGet(const Key& kIn, Value& vOut) {
return tryGetCopy(kIn, vOut);
}

bool tryGetCopy(const Key& kIn, Value& vOut) {
Guard g(lock_);
Value tmp;
if (!tryGetRef_nolock(kIn, tmp)) { return false; }
vOut = tmp;
return true;
}

bool tryGetRef(const Key& kIn, Value& vOut) {
Guard g(lock_);
return tryGetRef_nolock(kIn, vOut);
}
/**
* The const reference returned here is only
* guaranteed to be valid till the next insert/delete
* in multi-threaded apps use getCopy() to be threadsafe
*/
const Value& getRef(const Key& k) {
Guard g(lock_);
return get_nolock(k);
}

/**
added for backward compatibility
*/
Value get(const Key& k) { return getCopy(k); }
/**
* returns a copy of the stored object (if found)
* safe to use/recommended in multi-threaded apps
*/
Value getCopy(const Key& k) {
Guard g(lock_);
return get_nolock(k);
}

bool remove(const Key& k) {
Guard g(lock_);
auto iter = cache_.find(k);
if (iter == cache_.end()) {
return false;
}
keys_.erase(iter->second);
cache_.erase(iter);
return true;
}
bool contains(const Key& k) const {
Guard g(lock_);
return cache_.find(k) != cache_.end();
}

size_t getMaxSize() const { return maxSize_; }
size_t getElasticity() const { return elasticity_; }
size_t getMaxAllowedSize() const { return maxSize_ + elasticity_; }
template <typename F>
void cwalk(F& f) const {
Guard g(lock_);
std::for_each(keys_.begin(), keys_.end(), f);
}

protected:
Value get_nolock(const Key& k) {
const auto iter = cache_.find(k);
if (iter == cache_.end()) {
return nullptr;
}
keys_.splice(keys_.begin(), keys_, iter->second);
return iter->second->value;
}
bool tryGetRef_nolock(const Key& kIn, Value& vOut) {
const auto iter = cache_.find(kIn);
if (iter == cache_.end()) {
return false;
}
keys_.splice(keys_.begin(), keys_, iter->second);
vOut = iter->second->value;
return true;
}
size_t prune() {
size_t maxAllowed = maxSize_ + elasticity_;
if (maxSize_ == 0 || cache_.size() < maxAllowed) {
return 0;
}
size_t count = 0;
while (cache_.size() > maxSize_) {
cache_.erase(keys_.back().key);
keys_.pop_back();
++count;
}
return count;
}

size_t pruneExpired() {
Guard g(lock_);
size_t count = 0;
auto now = std::chrono::steady_clock::now();
auto cutoff = now - std::chrono::minutes(5);

while (!keys_.empty() && keys_.back().timestamp < cutoff) {
cache_.erase(keys_.back().key);
keys_.pop_back();
++count;
}
return count;
}

void pruneThreadFunc() {
while (!stop_pruning) {
pruneExpired();
std::this_thread::sleep_for(std::chrono::seconds(60)); // 每60秒检查一次
}
}

private:
// Disallow copying.
Cache(const Cache&) = delete;
Cache& operator=(const Cache&) = delete;

mutable Lock lock_;
Map cache_;
list_type keys_;
size_t maxSize_;
size_t elasticity_;
std::atomic<bool> stop_pruning{false};
std::thread prune_thread;

#ifdef APSARA_UNIT_TEST_MAIN
friend class LRUCacheUnittest;
#endif
};

} // namespace LRUCache11
} //namespace logtail
Loading
Loading