Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Sep 24, 2024
1 parent 5d46f61 commit 7714655
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 40 deletions.
43 changes: 9 additions & 34 deletions core/pipeline/PipelineManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,37 +39,24 @@ namespace logtail {
void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
#ifndef APSARA_UNIT_TEST_MAIN
// 过渡使用
static bool isFileServerStarted = false, isInputObserverStarted = false;
bool isInputObserverChanged = false, isInputFileChanged = false, isInputStreamChanged = false,
isInputContainerStdioChanged = false;
static bool isFileServerStarted = false;
bool isInputFileChanged = false;
for (const auto& name : diff.mRemoved) {
CheckIfInputUpdated(mPipelineNameEntityMap[name]->GetConfig()["inputs"][0],
isInputObserverChanged,
isInputFileChanged,
isInputStreamChanged,
isInputContainerStdioChanged);
isInputFileChanged = CheckIfFileServerUpdated(mPipelineNameEntityMap[name]->GetConfig()["inputs"][0]);
}
for (const auto& config : diff.mModified) {
CheckIfInputUpdated(*config.mInputs[0],
isInputObserverChanged,
isInputFileChanged,
isInputStreamChanged,
isInputContainerStdioChanged);
isInputFileChanged = CheckIfFileServerUpdated(*config.mInputs[0]);
}
for (const auto& config : diff.mAdded) {
CheckIfInputUpdated(*config.mInputs[0],
isInputObserverChanged,
isInputFileChanged,
isInputStreamChanged,
isInputContainerStdioChanged);
isInputFileChanged = CheckIfFileServerUpdated(*config.mInputs[0]);
}

#if defined(__ENTERPRISE__) && defined(__linux__) && !defined(__ANDROID__)
if (AppConfig::GetInstance()->ShennongSocketEnabled()) {
ShennongManager::GetInstance()->Pause();
}
#endif
if (isFileServerStarted && (isInputFileChanged || isInputContainerStdioChanged)) {
if (isFileServerStarted && isInputFileChanged) {
FileServer::GetInstance()->Pause();
}
#endif
Expand Down Expand Up @@ -137,7 +124,7 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
// 在Flusher改造完成前,先不执行如下步骤,不会造成太大影响
// Sender::CleanUnusedAk();

if (isInputFileChanged || isInputContainerStdioChanged) {
if (isInputFileChanged) {
if (isFileServerStarted) {
FileServer::GetInstance()->Resume();
} else {
Expand Down Expand Up @@ -250,21 +237,9 @@ void PipelineManager::DecreasePluginUsageCnt(const unordered_map<string, unorder
}
}

void PipelineManager::CheckIfInputUpdated(const Json::Value& config,
bool& isInputObserverChanged,
bool& isInputFileChanged,
bool& isInputStreamChanged,
bool& isInputContainerStdioChanged) {
bool PipelineManager::CheckIfFileServerUpdated(const Json::Value& config) {
string inputType = config["Type"].asString();
if (inputType == "input_observer_network") {
isInputObserverChanged = true;
} else if (inputType == "input_file") {
isInputFileChanged = true;
} else if (inputType == "input_stream") {
isInputStreamChanged = true;
} else if (inputType == "input_container_stdio") {
isInputContainerStdioChanged = true;
}
return inputType == "input_file" || inputType == "input_container_stdio";
}

} // namespace logtail
7 changes: 1 addition & 6 deletions core/pipeline/PipelineManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,7 @@ class PipelineManager {
void DecreasePluginUsageCnt(
const std::unordered_map<std::string, std::unordered_map<std::string, uint32_t>>& statistics);
void FlushAllBatch();
// 过渡使用
void CheckIfInputUpdated(const Json::Value& config,
bool& isInputObserverChanged,
bool& isInputFileChanged,
bool& isInputStreamChanged,
bool& isInputContainerStdioChanged);
bool CheckIfFileServerUpdated(const Json::Value& config);

std::unordered_map<std::string, std::shared_ptr<Pipeline>> mPipelineNameEntityMap;
mutable SpinLock mPluginCntMapLock;
Expand Down

0 comments on commit 7714655

Please sign in to comment.