diff --git a/pluginmanager/checkpoint_manager.go b/pluginmanager/checkpoint_manager.go index f8e7e318c8..2ba02fb1e8 100644 --- a/pluginmanager/checkpoint_manager.go +++ b/pluginmanager/checkpoint_manager.go @@ -34,12 +34,16 @@ var CheckPointFile = flag.String("CheckPointFile", "checkpoint", "checkpoint fil var CheckPointCleanInterval = flag.Int("CheckPointCleanInterval", 600, "checkpoint clean interval, second") var MaxCleanItemPerInterval = flag.Int("MaxCleanItemPerInterval", 1000, "max clean items per interval") +const DefaultCleanThreshold = 6 // one hour + type checkPointManager struct { - db *leveldb.DB - shutdown chan struct{} - waitgroup sync.WaitGroup - initFlag bool - configCounter map[string]int + db *leveldb.DB + shutdown chan struct{} + waitgroup sync.WaitGroup + initFlag bool + runningFlag bool + configCounter map[string]int + cleanThreshold int } var CheckPointManager checkPointManager @@ -81,6 +85,7 @@ func (p *checkPointManager) Init() error { } p.shutdown = make(chan struct{}, 1) p.configCounter = make(map[string]int) + p.cleanThreshold = DefaultCleanThreshold logtailConfigDir := config.LogtailGlobalConfig.LogtailSysConfDir pathExist, err := util.PathExists(logtailConfigDir) var dbPath string @@ -106,7 +111,11 @@ func (p *checkPointManager) Init() error { } func (p *checkPointManager) HoldOn() { + if !p.runningFlag { + return + } logger.Info(context.Background(), "checkpoint", "HoldOn") + p.runningFlag = false if p.db == nil { return } @@ -115,7 +124,11 @@ func (p *checkPointManager) HoldOn() { } func (p *checkPointManager) Resume() { + if p.runningFlag { + return + } logger.Info(context.Background(), "checkpoint", "Resume") + p.runningFlag = true if p.db == nil { return } @@ -174,7 +187,7 @@ func (p *checkPointManager) check() { } for _, key := range cleanItems { p.configCounter[key]++ - if p.configCounter[key] > 6 { // one hour + if p.configCounter[key] > p.cleanThreshold { _ = p.db.Delete([]byte(key), nil) logger.Info(context.Background(), "no config, delete checkpoint", key) delete(p.configCounter, key) diff --git a/pluginmanager/checkpoint_manager_test.go b/pluginmanager/checkpoint_manager_test.go index 196e34c18d..a9801a2da5 100644 --- a/pluginmanager/checkpoint_manager_test.go +++ b/pluginmanager/checkpoint_manager_test.go @@ -66,6 +66,7 @@ func Test_checkPointManager_run(t *testing.T) { CheckPointManager.SaveCheckpoint("1", "xx", []byte("xxxxx")) CheckPointManager.SaveCheckpoint("2", "yy", []byte("yyyyyy")) *CheckPointCleanInterval = 1 + CheckPointManager.cleanThreshold = 3 if data, err := CheckPointManager.GetCheckpoint("1", "xx"); err != nil || string(data) != "xxxxx" { t.Errorf("checkPointManager.GetCheckpoint() error, %v %v", err, string(data)) } @@ -74,6 +75,14 @@ func Test_checkPointManager_run(t *testing.T) { t.Errorf("checkPointManager.GetCheckpoint() error, %v %v", err, string(data)) } CheckPointManager.Resume() + time.Sleep(time.Second * time.Duration(1)) + if data, err := CheckPointManager.GetCheckpoint("1", "xx"); err != nil || string(data) != "xxxxx" { + t.Errorf("checkPointManager.GetCheckpoint() error, %v %v", err, string(data)) + } + + if data, err := CheckPointManager.GetCheckpoint("2", "yy"); err != nil || string(data) != "yyyyyy" { + t.Errorf("checkPointManager.GetCheckpoint() error, %v %v", err, string(data)) + } time.Sleep(time.Second * time.Duration(5)) if data, err := CheckPointManager.GetCheckpoint("1", "xx"); err == nil { t.Errorf("checkPointManager.GetCheckpoint() error, %v %v", err, string(data)) diff --git a/pluginmanager/config_update_test.go b/pluginmanager/config_update_test.go index 11f8766be8..9c0ec27666 100644 --- a/pluginmanager/config_update_test.go +++ b/pluginmanager/config_update_test.go @@ -19,12 +19,12 @@ package pluginmanager import ( "context" - "strings" "sync" "testing" "time" "github.com/alibaba/ilogtail/pkg/logger" + _ "github.com/alibaba/ilogtail/plugins/aggregator/baseagg" "github.com/alibaba/ilogtail/plugins/flusher/checker" "github.com/stretchr/testify/suite" @@ -51,8 +51,8 @@ func (s *configUpdateTestSuite) BeforeTest(suiteName, testName string) { func (s *configUpdateTestSuite) AfterTest(suiteName, testName string) { logger.Infof(context.Background(), "========== %s %s test end ========================", suiteName, testName) - s.NoError(StopAll(false, false)) - s.NoError(StopAll(false, true)) + s.NoError(StopAll(true, false)) + s.NoError(StopAll(true, true)) LogtailConfig = sync.Map{} } @@ -68,12 +68,13 @@ func (s *configUpdateTestSuite) TestConfigUpdate() { // update same hang config s.NoError(Stop(updateConfigName, false)) s.Equal(0, checkFlusher.GetLogCount(), "the hold on block flusher checker doesn't have any logs") - err := LoadMockConfig(updateConfigName, updateConfigName, updateConfigName, GetTestConfig(updateConfigName)) - s.True(strings.Contains(err.Error(), "failed to create config because timeout stop has happened on it")) + _ = LoadMockConfig(updateConfigName, updateConfigName, updateConfigName, GetTestConfig(updateConfigName)) + // Since independently load config, reload block config will be allowed s.NoError(LoadMockConfig(noblockUpdateConfigName, noblockUpdateConfigName, noblockUpdateConfigName, GetTestConfig(noblockUpdateConfigName))) s.NoError(Start(updateConfigName)) + s.NoError(Start(noblockUpdateConfigName)) _, exist := GetLogtailConfig(updateConfigName) - s.False(exist) + s.True(exist) _, exist = GetLogtailConfig(noblockUpdateConfigName) s.True(exist) @@ -88,23 +89,23 @@ func (s *configUpdateTestSuite) TestConfigUpdate() { } func (s *configUpdateTestSuite) TestConfigUpdateMany() { - config, ok := GetLogtailConfig(updateConfigName) - s.True(ok) - s.NotNil(config, "%s logstrore config should exist", updateConfigName) - checkFlusher, ok := GetConfigFlushers(config.PluginRunner)[0].(*checker.FlusherChecker) - s.True(ok) - - s.Equal(0, checkFlusher.GetLogCount(), "the hold on block flusher checker doesn't have any logs") // load block config + var checkFlusher *checker.FlusherChecker for i := 0; i < 5; i++ { + config, ok := GetLogtailConfig(updateConfigName) + s.True(ok) + s.NotNil(config, "%s logstrore config should exist", updateConfigName) + checkFlusher, ok = GetConfigFlushers(config.PluginRunner)[0].(*checker.FlusherChecker) + s.True(ok) + s.Equal(0, checkFlusher.GetLogCount(), "the hold on block flusher checker doesn't have any logs") + s.NoError(Stop(updateConfigName, false)) - err := LoadMockConfig(updateConfigName, updateConfigName, updateConfigName, GetTestConfig(updateConfigName)) - s.True(strings.Contains(err.Error(), "failed to create config because timeout stop has happened on it")) + checkFlusher.Block = false + _ = LoadMockConfig(updateConfigName, updateConfigName, updateConfigName, GetTestConfig(updateConfigName)) + // Since independently load config, reload block config will be allowed s.NoError(Start(updateConfigName)) - _, exist := GetLogtailConfig(updateConfigName) - s.False(exist) + time.Sleep(time.Millisecond) } - s.Equal(0, checkFlusher.GetLogCount(), "the hold on block flusher checker doesn't have any logs") checkFlusher.Block = false time.Sleep(time.Second * time.Duration(5)) s.Equal(checkFlusher.GetLogCount(), 10000) @@ -119,7 +120,7 @@ func (s *configUpdateTestSuite) TestConfigUpdateMany() { s.True(exist) time.Sleep(time.Millisecond) } - config, ok = GetLogtailConfig(noblockUpdateConfigName) + config, ok := GetLogtailConfig(noblockUpdateConfigName) s.True(ok) checkFlusher, ok = GetConfigFlushers(config.PluginRunner)[0].(*checker.FlusherChecker) s.True(ok) @@ -135,18 +136,17 @@ func (s *configUpdateTestSuite) TestConfigUpdateName() { defer func() { checkFlusher.Block = false time.Sleep(time.Second * 5) - s.Equal(checkFlusher.GetLogCount(), 10000) + s.Equal(checkFlusher.GetLogCount(), 20000) }() s.True(ok) - s.NoError(Stop(updateConfigName, false)) s.Equal(0, checkFlusher.GetLogCount(), "the hold on blocking flusher checker doesn't have any logs") s.NoError(LoadMockConfig(updateConfigName+"_", updateConfigName+"_", updateConfigName+"_", GetTestConfig(updateConfigName))) - s.NoError(Start(updateConfigName)) + s.NoError(Start(updateConfigName + "_")) { _, exist := GetLogtailConfig(updateConfigName) - s.False(exist) + s.True(exist) config, exist := GetLogtailConfig(updateConfigName + "_") s.True(exist) checkFlusher, ok := GetConfigFlushers(config.PluginRunner)[0].(*checker.FlusherChecker) @@ -168,10 +168,9 @@ func (s *configUpdateTestSuite) TestStopAllExit() { s.NoError(StopAll(true, true)) s.NoError(StopAll(true, false)) s.Equal(20000, checkFlusher.GetLogCount()) - s.NoError(Start(updateConfigName)) } -func (s *configUpdateTestSuite) TestHoldOnExitTimeout() { +func (s *configUpdateTestSuite) TestStopAllExitTimeout() { time.Sleep(time.Second * time.Duration(1)) config, ok := GetLogtailConfig(updateConfigName) s.True(ok) @@ -185,6 +184,4 @@ func (s *configUpdateTestSuite) TestHoldOnExitTimeout() { checkFlusher.Block = false time.Sleep(time.Second * time.Duration(5)) s.Equal(10000, checkFlusher.GetLogCount()) - time.Sleep(time.Second * 10) - s.NoError(Start(updateConfigName)) } diff --git a/pluginmanager/logstore_config.go b/pluginmanager/logstore_config.go index 8845cfff61..70c38eac51 100644 --- a/pluginmanager/logstore_config.go +++ b/pluginmanager/logstore_config.go @@ -103,12 +103,6 @@ type LogstoreConfig struct { PluginRunner PluginRunner // private fields configDetailHash string - // processShutdown chan struct{} - // flushShutdown chan struct{} - pauseChan chan struct{} - // processWaitSema sync.WaitGroup - // flushWaitSema sync.WaitGroup - pauseOrResumeWg sync.WaitGroup K8sLabelSet map[string]struct{} ContainerLabelSet map[string]struct{} @@ -142,8 +136,6 @@ func (lc *LogstoreConfig) Start() { lc.FlushOutFlag = false logger.Info(lc.Context.GetRuntimeContext(), "config start", "begin") - lc.pauseChan = make(chan struct{}, 1) - lc.PluginRunner.Run() logger.Info(lc.Context.GetRuntimeContext(), "config start", "success") @@ -169,12 +161,6 @@ func (lc *LogstoreConfig) Stop(exitFlag bool) error { return nil } -func (lc *LogstoreConfig) pause() { - lc.pauseOrResumeWg.Add(1) - lc.pauseChan <- struct{}{} - lc.pauseOrResumeWg.Wait() -} - const ( rawStringKey = "content" defaultTagPrefix = "__tag__:__prefix__" diff --git a/pluginmanager/plugin_manager.go b/pluginmanager/plugin_manager.go index a92b92c407..22858f29a1 100644 --- a/pluginmanager/plugin_manager.go +++ b/pluginmanager/plugin_manager.go @@ -134,12 +134,6 @@ func Init() (err error) { // timeoutStop wrappers LogstoreConfig.Stop with timeout (5s by default). // @return true if Stop returns before timeout, otherwise false. func timeoutStop(config *LogstoreConfig, exitFlag bool) bool { - if !exitFlag { - config.pause() - logger.Info(config.Context.GetRuntimeContext(), "Pause config", "done") - return true - } - done := make(chan int) go func() { logger.Info(config.Context.GetRuntimeContext(), "Stop config in goroutine", "begin") @@ -161,50 +155,66 @@ func timeoutStop(config *LogstoreConfig, exitFlag bool) bool { func StopAll(exitFlag, withInput bool) error { defer panicRecover("Run plugin") + configNames := make([]string, 0) LogtailConfig.Range(func(key, value interface{}) bool { - if logstoreConfig, ok := value.(*LogstoreConfig); ok { - if (withInput && logstoreConfig.PluginRunner.IsWithInputPlugin()) || (!withInput && !logstoreConfig.PluginRunner.IsWithInputPlugin()) { + configNames = append(configNames, key.(string)) + return true + }) + for _, configName := range configNames { + if logstoreConfig, ok := GetLogtailConfig(configName); ok { + matchFlag := false + if withInput { + if logstoreConfig.PluginRunner.IsWithInputPlugin() { + matchFlag = true + } + } else { + if !logstoreConfig.PluginRunner.IsWithInputPlugin() { + matchFlag = true + } + } + if matchFlag { if hasStopped := timeoutStop(logstoreConfig, exitFlag); !hasStopped { // TODO: This alarm can not be sent to server in current alarm design. logger.Error(logstoreConfig.Context.GetRuntimeContext(), "CONFIG_STOP_TIMEOUT_ALARM", "timeout when stop config, goroutine might leak") } - LogtailConfig.Delete(key) - } else { - // should never happen - logger.Error(logstoreConfig.Context.GetRuntimeContext(), "CONFIG_STOP_ALARM", "stop config not match withInput", withInput, "configName", key) + LogtailConfig.Delete(configName) } } - return true - }) - if StatisticsConfig != nil { - if *flags.ForceSelfCollect { - logger.Info(context.Background(), "force collect the static metrics") - control := pipeline.NewAsyncControl() - StatisticsConfig.PluginRunner.RunPlugins(pluginMetricInput, control) - control.WaitCancel() - } - _ = StatisticsConfig.Stop(exitFlag) } - if AlarmConfig != nil { - if *flags.ForceSelfCollect { - logger.Info(context.Background(), "force collect the alarm metrics") - control := pipeline.NewAsyncControl() - AlarmConfig.PluginRunner.RunPlugins(pluginMetricInput, control) - control.WaitCancel() + if exitFlag { + if StatisticsConfig != nil { + if *flags.ForceSelfCollect { + logger.Info(context.Background(), "force collect the static metrics") + control := pipeline.NewAsyncControl() + StatisticsConfig.PluginRunner.RunPlugins(pluginMetricInput, control) + control.WaitCancel() + } + _ = StatisticsConfig.Stop(exitFlag) + StatisticsConfig = nil } - _ = AlarmConfig.Stop(exitFlag) - } - if ContainerConfig != nil { - if *flags.ForceSelfCollect { - logger.Info(context.Background(), "force collect the container metrics") - control := pipeline.NewAsyncControl() - ContainerConfig.PluginRunner.RunPlugins(pluginMetricInput, control) - control.WaitCancel() + if AlarmConfig != nil { + if *flags.ForceSelfCollect { + logger.Info(context.Background(), "force collect the alarm metrics") + control := pipeline.NewAsyncControl() + AlarmConfig.PluginRunner.RunPlugins(pluginMetricInput, control) + control.WaitCancel() + } + _ = AlarmConfig.Stop(exitFlag) + AlarmConfig = nil + } + if ContainerConfig != nil { + if *flags.ForceSelfCollect { + logger.Info(context.Background(), "force collect the container metrics") + control := pipeline.NewAsyncControl() + ContainerConfig.PluginRunner.RunPlugins(pluginMetricInput, control) + control.WaitCancel() + } + _ = ContainerConfig.Stop(exitFlag) + ContainerConfig = nil } - _ = ContainerConfig.Stop(exitFlag) + CheckPointManager.HoldOn() } - CheckPointManager.HoldOn() return nil }