Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Sep 27, 2024
1 parent 7634196 commit 25b10d2
Show file tree
Hide file tree
Showing 10 changed files with 49 additions and 45 deletions.
12 changes: 6 additions & 6 deletions core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ LogtailPlugin::LogtailPlugin() {
mPluginBasePtr = NULL;
mLoadPipelineFun = NULL;
mUnloadPipelineFun = NULL;
mStopAllFun = NULL;
mStopAllPipelinesFun = NULL;
mStopFun = NULL;
mStartFun = NULL;
mLoadGlobalConfigFun = NULL;
Expand Down Expand Up @@ -124,11 +124,11 @@ bool LogtailPlugin::UnloadPipeline(const std::string& pipelineName) {
return false;
}

void LogtailPlugin::StopAll(bool withInputFlag) {
if (mPluginValid && mStopAllFun != NULL) {
void LogtailPlugin::StopAllPipelines(bool withInputFlag) {
if (mPluginValid && mStopAllPipelinesFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines stop all", "starts"));
auto stopAllStart = GetCurrentTimeInMilliSeconds();
mStopAllFun(withInputFlag ? 1 : 0);
mStopAllPipelinesFun(withInputFlag ? 1 : 0);
auto stopAllCost = GetCurrentTimeInMilliSeconds() - stopAllStart;
LOG_INFO(sLogger, ("Go pipelines stop all", "succeeded")("cost", ToString(stopAllCost) + "ms"));
if (stopAllCost >= 10 * 1000) {
Expand Down Expand Up @@ -386,9 +386,9 @@ bool LogtailPlugin::LoadPluginBase() {
return mPluginValid;
}
// 停止所有插件
mStopAllFun = (StopAllFun)loader.LoadMethod("StopAll", error);
mStopAllPipelinesFun = (StopAllPipelinesFun)loader.LoadMethod("StopAllPipelines", error);
if (!error.empty()) {
LOG_ERROR(sLogger, ("load StopAll error, Message", error));
LOG_ERROR(sLogger, ("load StopAllPipelines error, Message", error));
return mPluginValid;
}
// 停止单个插件
Expand Down
6 changes: 3 additions & 3 deletions core/go_pipeline/LogtailPlugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ struct K8sContainerMeta {
typedef GoInt (*LoadGlobalConfigFun)(GoString);
typedef GoInt (*LoadPipelineFun)(GoString p, GoString l, GoString c, GoInt64 k, GoString p2);
typedef GoInt (*UnloadPipelineFun)(GoString c);
typedef void (*StopAllFun)(GoInt);
typedef void (*StopAllPipelinesFun)(GoInt);
typedef void (*StopFun)(GoString, GoInt);
typedef void (*StopBuiltInModulesFun)();
typedef void (*StartFun)(GoString);
Expand Down Expand Up @@ -214,7 +214,7 @@ class LogtailPlugin {
const std::string& region = "",
logtail::QueueKey logstoreKey = 0);
bool UnloadPipeline(const std::string& pipelineName);
void StopAll(bool withInputFlag);
void StopAllPipelines(bool withInputFlag);
void Stop(const std::string& configName, bool removingFlag);
void StopBuiltInModules();
void Start(const std::string& configName);
Expand Down Expand Up @@ -264,7 +264,7 @@ class LogtailPlugin {
LoadGlobalConfigFun mLoadGlobalConfigFun;
LoadPipelineFun mLoadPipelineFun;
UnloadPipelineFun mUnloadPipelineFun;
StopAllFun mStopAllFun;
StopAllPipelinesFun mStopAllPipelinesFun;
StopFun mStopFun;
StopBuiltInModulesFun mStopBuiltInModulesFun;
StartFun mStartFun;
Expand Down
14 changes: 9 additions & 5 deletions core/pipeline/PipelineManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,12 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
}
#endif
#endif

for (auto& item : mInputRunners) {
if (!item->HasRegisteredPlugins()) {
item->Stop();
}
}
}

shared_ptr<Pipeline> PipelineManager::FindConfigByName(const string& configName) const {
Expand Down Expand Up @@ -180,19 +186,17 @@ string PipelineManager::GetPluginStatistics() const {
void PipelineManager::StopAllPipelines() {
LOG_INFO(sLogger, ("stop all pipelines", "starts"));
for (auto& item : mInputRunners) {
if (item->HasRegisteredPlugins()) {
item->Stop();
}
item->Stop();
}
FileServer::GetInstance()->Stop();

LogtailPlugin::GetInstance()->StopAll(true);
LogtailPlugin::GetInstance()->StopAllPipelines(true);

ProcessorRunner::GetInstance()->Stop();

FlushAllBatch();

LogtailPlugin::GetInstance()->StopAll(false);
LogtailPlugin::GetInstance()->StopAllPipelines(false);

// TODO: make it common
FlusherSLS::RecycleResourceIfNotUsed();
Expand Down
18 changes: 11 additions & 7 deletions plugin_main/plugin_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,16 +186,20 @@ func ProcessLogGroup(configName string, logBytes []byte, packID string) int {
return config.ProcessLogGroup(logBytes, util.StringDeepCopy(packID))
}

//export StopAll
func StopAll(withInputFlag int) {
logger.Info(context.Background(), "Stop all", "start")
err := pluginmanager.StopAll(withInputFlag != 0)
//export StopAllPipelines
func StopAllPipelines(withInputFlag int) {
logger.Info(context.Background(), "Stop all", "start", "with input", withInputFlag)
err := pluginmanager.StopAllPipelines(withInputFlag != 0)
if err != nil {
logger.Error(context.Background(), "PLUGIN_ALARM", "stop all error", err)
}
logger.Info(context.Background(), "Stop all", "success")
logger.Info(context.Background(), "logger", "close and recover")
logger.Close()
logger.Info(context.Background(), "Stop all", "success", "with input", withInputFlag)
// Stop with input first, without input last.
if withInputFlag == 0 {
logger.Info(context.Background(), "logger", "close and recover")
logger.Flush()
logger.Close()
}
}

//export Stop
Expand Down
6 changes: 2 additions & 4 deletions plugin_main/plugin_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,8 @@ func HandleLoadConfig(w http.ResponseWriter, r *http.Request) {
func HandleHoldOn(w http.ResponseWriter, r *http.Request) {
controlLock.Lock()
defer controlLock.Unlock()
StopAll(1)
StopAll(0)
// flush async logs when hold on with exit flag.
logger.Flush()
StopAllPipelines(1)
StopAllPipelines(0)
w.WriteHeader(http.StatusOK)
}

Expand Down
6 changes: 2 additions & 4 deletions plugin_main/plugin_main.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,10 +176,8 @@ func main() {
if !*flags.FileIOFlag {
<-signals.SetupSignalHandler()
}
logger.Info(context.Background(), "########################## exit process begin ##########################")
StopAll(1)
StopAll(0)
logger.Info(context.Background(), "########################## exit process done ##########################")
StopAllPipelines(1)
StopAllPipelines(0)
}

func generatePluginDoc() {
Expand Down
8 changes: 4 additions & 4 deletions plugin_main/plugin_main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ func TestHangConfigWhenStop(t *testing.T) {

// Quit.
time.Sleep(time.Second)
StopAll(1)
StopAll(0)
StopAllPipelines(1)
StopAllPipelines(0)
pluginmanager.LogtailConfigLock.RLock()
require.Empty(t, pluginmanager.LogtailConfig)
pluginmanager.LogtailConfigLock.RUnlock()
Expand Down Expand Up @@ -225,8 +225,8 @@ func TestSlowConfigWhenStop(t *testing.T) {

// Quit.
time.Sleep(time.Second)
StopAll(1)
StopAll(0)
StopAllPipelines(1)
StopAllPipelines(0)
time.Sleep(time.Second * 6)
require.Empty(t, pluginmanager.DisabledLogtailConfig)
}
16 changes: 8 additions & 8 deletions pluginmanager/config_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func (s *configUpdateTestSuite) BeforeTest(suiteName, testName string) {

func (s *configUpdateTestSuite) AfterTest(suiteName, testName string) {
logger.Infof(context.Background(), "========== %s %s test end ========================", suiteName, testName)
s.NoError(StopAll(false))
s.NoError(StopAll(true))
s.NoError(StopAllPipelines(false))
s.NoError(StopAllPipelines(true))
LogtailConfigLock.Lock()
LogtailConfig = make(map[string]*LogstoreConfig)
LogtailConfigLock.Unlock()
Expand Down Expand Up @@ -108,8 +108,8 @@ func (s *configUpdateTestSuite) TestConfigUpdateMany() {

// load normal config
for i := 0; i < 3; i++ {
s.NoError(StopAll(true))
s.NoError(StopAll(false))
s.NoError(StopAllPipelines(true))
s.NoError(StopAllPipelines(false))
s.NoError(LoadAndStartMockConfig(noblockUpdateConfigName, noblockUpdateConfigName, noblockUpdateConfigName, GetTestConfig(noblockUpdateConfigName)))
LogtailConfigLock.RLock()
s.NotNil(LogtailConfig[noblockUpdateConfigName])
Expand Down Expand Up @@ -163,8 +163,8 @@ func (s *configUpdateTestSuite) TestStopAllExit() {
s.True(ok)
checkFlusher.Block = false
time.Sleep(time.Second * time.Duration(5))
s.NoError(StopAll(true))
s.NoError(StopAll(false))
s.NoError(StopAllPipelines(true))
s.NoError(StopAllPipelines(false))
s.Equal(20000, checkFlusher.GetLogCount())
}

Expand All @@ -177,8 +177,8 @@ func (s *configUpdateTestSuite) TestStopAllExitTimeout() {
checkFlusher, ok := GetConfigFlushers(config.PluginRunner)[0].(*checker.FlusherChecker)
s.True(ok)
s.Equal(0, checkFlusher.GetLogCount())
s.NoError(StopAll(true))
s.NoError(StopAll(false))
s.NoError(StopAllPipelines(true))
s.NoError(StopAllPipelines(false))
time.Sleep(time.Second)
s.Equal(0, checkFlusher.GetLogCount())
checkFlusher.Block = false
Expand Down
4 changes: 2 additions & 2 deletions pluginmanager/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,10 +169,10 @@ func timeoutStop(config *LogstoreConfig, removedFlag bool) bool {
}
}

// StopAll stops all config instance so that it is ready
// StopAllPipelines stops all pipelines so that it is ready
// to quit.
// For user-defined config, timeoutStop is used to avoid hanging.
func StopAll(withInput bool) error {
func StopAllPipelines(withInput bool) error {
defer panicRecover("Run plugin")

LogtailConfigLock.Lock()
Expand Down
4 changes: 2 additions & 2 deletions plugins/test/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ func PluginStart() error {
}

func PluginStop() error {
if err := pluginmanager.StopAll(true); err != nil {
if err := pluginmanager.StopAllPipelines(true); err != nil {
return err
}
if err := pluginmanager.StopAll(false); err != nil {
if err := pluginmanager.StopAllPipelines(false); err != nil {
return err
}
return nil
Expand Down

0 comments on commit 25b10d2

Please sign in to comment.