Skip to content

Commit

Permalink
Merge branch 'main' into optimization/relabel
Browse files Browse the repository at this point in the history
  • Loading branch information
catdogpandas committed Sep 19, 2024
2 parents af1355a + 96afc79 commit ddcab1d
Show file tree
Hide file tree
Showing 111 changed files with 2,068 additions and 432 deletions.
22 changes: 13 additions & 9 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ cmake_dependent_option(ENABLE_STATIC_LINK_CRT "Build Logtail by linking CRT stat
option(WITHOUTGDB "Build Logtail without gdb")
option(WITHSPL "Build Logtail and UT with SPL" ON)
option(BUILD_LOGTAIL_UT "Build unit test for Logtail")
set(PROVIDER_PATH "provider" CACHE PATH "Path to the provider module") # external provider path can be set with -DPROVIDER_PATH
set(UNITTEST_PATH "unittest" CACHE PATH "Path to the unittest module") # external unittest path can be set with -DUNITTEST_PATH

if (BUILD_LOGTAIL_SHARED_LIBRARY AND WITHSPL)
message(FATEL_ERROR, "Generating logtail shared library is not supported to be linked with SPL. WITHSPL should be set OFF.")
Expand Down Expand Up @@ -109,13 +111,13 @@ include(${CMAKE_CURRENT_SOURCE_DIR}/plugin/processor/links.cmake)
include(${CMAKE_CURRENT_SOURCE_DIR}/plugin/flusher/flusher.cmake)
include(${CMAKE_CURRENT_SOURCE_DIR}/plugin/flusher/links.cmake)

# Subdirectories (modules). except for common, input, processor, flusher, observer, helper and spl.
# Subdirectories (modules). except for common, input, processor, flusher, observer, helper, spl, and provider.
set(SUB_DIRECTORIES_LIST
application app_config checkpoint container_manager logger go_pipeline monitor profile_sender models
config config/feedbacker config/provider config/watcher
pipeline pipeline/batch pipeline/compression pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer
config config/watcher
pipeline pipeline/batch pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer
runner runner/sink/http
protobuf/config_server/v1 protobuf/config_server/v2 protobuf/sls
protobuf/sls
file_server file_server/event file_server/event_handler file_server/event_listener file_server/reader file_server/polling
prometheus prometheus/labels prometheus/schedulers prometheus/async
ebpf ebpf/observer ebpf/security ebpf/handler
Expand Down Expand Up @@ -161,6 +163,10 @@ endif()
# remove several files in go_pipeline
list(REMOVE_ITEM FRAMEWORK_SOURCE_FILES ${CMAKE_CURRENT_SOURCE_DIR}/go_pipeline/LogtailPluginAdapter.cpp ${CMAKE_CURRENT_SOURCE_DIR}/go_pipeline/LogtailPluginAdapter.h)


# add provider
add_subdirectory("${PROVIDER_PATH}" "${CMAKE_BINARY_DIR}/provider")

if(MSVC)
# remove linux event listener
file(GLOB REMOVE_EVENT_LISTENER_SOURCES file_server/event_listener/*_Linux.cpp file_server/event_listener/*_Linux.h)
Expand Down Expand Up @@ -231,12 +237,12 @@ if(BUILD_LOGTAIL OR BUILD_LOGTAIL_SHARED_LIBRARY)
flusher_link(${LOGTAIL_TARGET})
all_link(${LOGTAIL_TARGET})
common_link(${LOGTAIL_TARGET})
target_link_libraries(${LOGTAIL_TARGET} provider)
endif()

# Logtail UT.
if (BUILD_LOGTAIL_UT)
message(STATUS "Build unittest.")
add_definitions(-DVIRTUAL=virtual)
message(STATUS "Build unittest.")
function(delete_gcda_files target_directory)
if(EXISTS "${target_directory}")
message(STATUS "Deleting .gcda files in ${target_directory}")
Expand All @@ -249,7 +255,5 @@ if (BUILD_LOGTAIL_UT)
delete_gcda_files(".")
include(CTest)
enable_testing()
add_subdirectory(unittest)
else()
add_definitions(-DVIRTUAL= )
add_subdirectory("${UNITTEST_PATH}" "${CMAKE_BINARY_DIR}/unittest")
endif ()
12 changes: 6 additions & 6 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,7 @@
#include "streamlog/StreamLogManager.h"
#endif
#else
#include "config/provider/CommonConfigProvider.h"
#include "config/provider/LegacyCommonConfigProvider.h"
#include "provider/Provider.h"
#endif

DEFINE_FLAG_BOOL(ilogtail_disable_core, "disable core in worker process", true);
Expand Down Expand Up @@ -242,8 +241,7 @@ void Application::Start() { // GCOVR_EXCL_START
EnterpriseConfigProvider::GetInstance()->Init("enterprise");
LegacyConfigProvider::GetInstance()->Init("legacy");
#else
CommonConfigProvider::GetInstance()->Init("common_v2");
LegacyCommonConfigProvider::GetInstance()->Init("common");
InitRemoteConfigProviders();
#endif

LogtailAlarm::GetInstance()->Init();
Expand Down Expand Up @@ -370,8 +368,10 @@ void Application::Exit() {
EnterpriseConfigProvider::GetInstance()->Stop();
LegacyConfigProvider::GetInstance()->Stop();
#else
CommonConfigProvider::GetInstance()->Stop();
LegacyCommonConfigProvider::GetInstance()->Stop();
auto remoteConfigProviders = GetRemoteConfigProviders();
for (auto& provider : remoteConfigProviders) {
provider->Stop();
}
#endif

LogtailMonitor::GetInstance()->Stop();
Expand Down
2 changes: 1 addition & 1 deletion core/common/common.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ list(APPEND THIS_SOURCE_FILES_LIST ${XX_HASH_SOURCE_FILES})
list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/memory/SourceBuffer.h)
list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/http/AsynCurlRunner.cpp ${CMAKE_SOURCE_DIR}/common/http/Curl.cpp ${CMAKE_SOURCE_DIR}/common/http/HttpResponse.cpp)
list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/timer/Timer.cpp ${CMAKE_SOURCE_DIR}/common/timer/HttpRequestTimerEvent.cpp)

list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/compression/Compressor.cpp ${CMAKE_SOURCE_DIR}/common/compression/CompressorFactory.cpp ${CMAKE_SOURCE_DIR}/common/compression/LZ4Compressor.cpp ${CMAKE_SOURCE_DIR}/common/compression/ZstdCompressor.cpp)
# remove several files in common
list(REMOVE_ITEM THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/BoostRegexValidator.cpp ${CMAKE_SOURCE_DIR}/common/GetUUID.cpp)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,14 @@

namespace logtail {

enum class CompressType { NONE, LZ4, ZSTD };
enum class CompressType {
NONE,
LZ4,
ZSTD
#ifdef APSARA_UNIT_TEST_MAIN
,
MOCK
#endif
};

} // namespace logtail
59 changes: 59 additions & 0 deletions core/common/compression/Compressor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2024 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "common/compression/Compressor.h"

#include <chrono>

#include "monitor/MetricConstants.h"

using namespace std;

namespace logtail {

void Compressor::SetMetricRecordRef(MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels) {
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(
mMetricsRecordRef, std::move(labels), std::move(dynamicLabels));
mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEMS_CNT);
mInItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEM_SIZE_BYTES);
mOutItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_OUT_ITEMS_CNT);
mOutItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_OUT_ITEM_SIZE_BYTES);
mDiscardedItemsCnt = mMetricsRecordRef.CreateCounter("discarded_items_cnt");
mDiscardedItemSizeBytes = mMetricsRecordRef.CreateCounter("discarded_item_size_bytes");
mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_TOTAL_DELAY_MS);
}

bool Compressor::DoCompress(const string& input, string& output, string& errorMsg) {
if (mMetricsRecordRef != nullptr) {
mInItemsCnt->Add(1);
mInItemSizeBytes->Add(input.size());
}

auto before = chrono::system_clock::now();
auto res = Compress(input, output, errorMsg);

if (mMetricsRecordRef != nullptr) {
mTotalDelayMs->Add(chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now() - before).count());
if (res) {
mOutItemsCnt->Add(1);
mOutItemSizeBytes->Add(output.size());
} else {
mDiscardedItemsCnt->Add(1);
mDiscardedItemSizeBytes->Add(input.size());
}
}
return res;
}

} // namespace logtail
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@

#include <string>

#include "pipeline/compression/CompressType.h"
#include "monitor/LogtailMetric.h"
#include "common/compression/CompressType.h"

namespace logtail {

Expand All @@ -27,17 +28,35 @@ class Compressor {
Compressor(CompressType type) : mType(type) {}
virtual ~Compressor() = default;

virtual bool Compress(const std::string& input, std::string& output, std::string& errorMsg) = 0;
bool DoCompress(const std::string& input, std::string& output, std::string& errorMsg);

#ifdef APSARA_UNIT_TEST_MAIN
// buffer shoudl be reserved for output before calling this function
virtual bool UnCompress(const std::string& input, std::string& output, std::string& errorMsg) = 0;
#endif

CompressType GetCompressType() const { return mType; }
void SetMetricRecordRef(MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels = {});

protected:
mutable MetricsRecordRef mMetricsRecordRef;
CounterPtr mInItemsCnt;
CounterPtr mInItemSizeBytes;
CounterPtr mOutItemsCnt;
CounterPtr mOutItemSizeBytes;
CounterPtr mDiscardedItemsCnt;
CounterPtr mDiscardedItemSizeBytes;
CounterPtr mTotalDelayMs;

private:
virtual bool Compress(const std::string& input, std::string& output, std::string& errorMsg) = 0;

CompressType mType = CompressType::NONE;

#ifdef APSARA_UNIT_TEST_MAIN
friend class CompressorUnittest;
friend class CompressorFactoryUnittest;
#endif
};

} // namespace logtail
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,24 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "pipeline/compression/CompressorFactory.h"
#include "common/compression/CompressorFactory.h"

#include "common/ParamExtractor.h"
#include "pipeline/compression/LZ4Compressor.h"
#include "pipeline/compression/ZstdCompressor.h"
#include "monitor/MetricConstants.h"
#include "common/compression/LZ4Compressor.h"
#include "common/compression/ZstdCompressor.h"

using namespace std;

namespace logtail {

unique_ptr<Compressor> CompressorFactory::Create(const Json::Value& config,
const PipelineContext& ctx,
const string& pluginType,
CompressType defaultType) {
const PipelineContext& ctx,
const string& pluginType,
const string& flusherId,
CompressType defaultType) {
string compressType, errorMsg;
unique_ptr<Compressor> compressor;
if (!GetOptionalStringParam(config, "CompressType", compressType, errorMsg)) {
PARAM_WARNING_DEFAULT(ctx.GetLogger(),
ctx.GetAlarm(),
Expand All @@ -37,11 +40,11 @@ unique_ptr<Compressor> CompressorFactory::Create(const Json::Value& config,
ctx.GetProjectName(),
ctx.GetLogstoreName(),
ctx.GetRegion());
return Create(defaultType);
compressor = Create(defaultType);
} else if (compressType == "lz4") {
return Create(CompressType::LZ4);
compressor = Create(CompressType::LZ4);
} else if (compressType == "zstd") {
return Create(CompressType::ZSTD);
compressor = Create(CompressType::ZSTD);
} else if (compressType == "none") {
return nullptr;
} else if (!compressType.empty()) {
Expand All @@ -54,10 +57,15 @@ unique_ptr<Compressor> CompressorFactory::Create(const Json::Value& config,
ctx.GetProjectName(),
ctx.GetLogstoreName(),
ctx.GetRegion());
return Create(defaultType);
compressor = Create(defaultType);
} else {
return Create(defaultType);
compressor = Create(defaultType);
}
compressor->SetMetricRecordRef({{METRIC_LABEL_PROJECT, ctx.GetProjectName()},
{METRIC_LABEL_CONFIG_NAME, ctx.GetConfigName()},
{METRIC_LABEL_KEY_COMPONENT_NAME, "compressor"},
{METRIC_LABEL_KEY_FLUSHER_NODE_ID, flusherId}});
return compressor;
}

unique_ptr<Compressor> CompressorFactory::Create(CompressType type) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
#include <memory>
#include <string>

#include "pipeline/compression/CompressType.h"
#include "pipeline/compression/Compressor.h"
#include "pipeline/PipelineContext.h"
#include "common/compression/CompressType.h"
#include "common/compression/Compressor.h"

namespace logtail {

Expand All @@ -42,13 +42,13 @@ class CompressorFactory {
std::unique_ptr<Compressor> Create(const Json::Value& config,
const PipelineContext& ctx,
const std::string& pluginType,
const std::string& flusherId,
CompressType defaultType);
std::unique_ptr<Compressor> Create(CompressType type);

private:
CompressorFactory() = default;
~CompressorFactory() = default;

std::unique_ptr<Compressor> Create(CompressType defaultType);
};

} // namespace logtail
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "pipeline/compression/LZ4Compressor.h"
#include "common/compression/LZ4Compressor.h"

#include <lz4/lz4.h>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,20 @@

#pragma once

#include "pipeline/compression/Compressor.h"
#include "common/compression/Compressor.h"

namespace logtail {

class LZ4Compressor : public Compressor {
public:
LZ4Compressor(CompressType type) : Compressor(type){};
LZ4Compressor(CompressType type) : Compressor(type) {};

bool Compress(const std::string& input, std::string& output, std::string& errorMsg) override;

#ifdef APSARA_UNIT_TEST_MAIN
bool UnCompress(const std::string& input, std::string& output, std::string& errorMsg) override;
#endif

private:
bool Compress(const std::string& input, std::string& output, std::string& errorMsg) override;
};

} // namespace logtail
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "pipeline/compression/ZstdCompressor.h"
#include "common/compression/ZstdCompressor.h"

#include <zstd/zstd.h>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,21 +16,21 @@

#pragma once

#include "pipeline/compression/Compressor.h"
#include "common/compression/Compressor.h"

namespace logtail {

class ZstdCompressor : public Compressor {
public:
ZstdCompressor(CompressType type, int32_t level = 1) : Compressor(type), mCompressionLevel(level){};

bool Compress(const std::string& input, std::string& output, std::string& errorMsg) override;
ZstdCompressor(CompressType type, int32_t level = 1) : Compressor(type), mCompressionLevel(level) {};

#ifdef APSARA_UNIT_TEST_MAIN
bool UnCompress(const std::string& input, std::string& output, std::string& errorMsg) override;
#endif

private:
bool Compress(const std::string& input, std::string& output, std::string& errorMsg) override;

int32_t mCompressionLevel = 1;
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "config/provider/CommonConfigProvider.h"
#include "CommonConfigProvider.h"

#include <json/json.h>

Expand Down
Loading

0 comments on commit ddcab1d

Please sign in to comment.