Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Aug 25, 2024
1 parent a1e971a commit 692ba6a
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 85 deletions.
25 changes: 19 additions & 6 deletions pluginmanager/checkpoint_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,16 @@ var CheckPointFile = flag.String("CheckPointFile", "checkpoint", "checkpoint fil
var CheckPointCleanInterval = flag.Int("CheckPointCleanInterval", 600, "checkpoint clean interval, second")
var MaxCleanItemPerInterval = flag.Int("MaxCleanItemPerInterval", 1000, "max clean items per interval")

const DefaultCleanThreshold = 6 // one hour

type checkPointManager struct {
db *leveldb.DB
shutdown chan struct{}
waitgroup sync.WaitGroup
initFlag bool
configCounter map[string]int
db *leveldb.DB
shutdown chan struct{}
waitgroup sync.WaitGroup
initFlag bool
runningFlag bool
configCounter map[string]int
cleanThreshold int
}

var CheckPointManager checkPointManager
Expand Down Expand Up @@ -81,6 +85,7 @@ func (p *checkPointManager) Init() error {
}
p.shutdown = make(chan struct{}, 1)
p.configCounter = make(map[string]int)
p.cleanThreshold = DefaultCleanThreshold
logtailConfigDir := config.LogtailGlobalConfig.LogtailSysConfDir
pathExist, err := util.PathExists(logtailConfigDir)
var dbPath string
Expand All @@ -106,7 +111,11 @@ func (p *checkPointManager) Init() error {
}

func (p *checkPointManager) HoldOn() {
if !p.runningFlag {
return
}
logger.Info(context.Background(), "checkpoint", "HoldOn")
p.runningFlag = false
if p.db == nil {
return
}
Expand All @@ -115,7 +124,11 @@ func (p *checkPointManager) HoldOn() {
}

func (p *checkPointManager) Resume() {
if p.runningFlag {
return
}
logger.Info(context.Background(), "checkpoint", "Resume")
p.runningFlag = true
if p.db == nil {
return
}
Expand Down Expand Up @@ -174,7 +187,7 @@ func (p *checkPointManager) check() {
}
for _, key := range cleanItems {
p.configCounter[key]++
if p.configCounter[key] > 6 { // one hour
if p.configCounter[key] > p.cleanThreshold {
_ = p.db.Delete([]byte(key), nil)
logger.Info(context.Background(), "no config, delete checkpoint", key)
delete(p.configCounter, key)
Expand Down
9 changes: 9 additions & 0 deletions pluginmanager/checkpoint_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func Test_checkPointManager_run(t *testing.T) {
CheckPointManager.SaveCheckpoint("1", "xx", []byte("xxxxx"))
CheckPointManager.SaveCheckpoint("2", "yy", []byte("yyyyyy"))
*CheckPointCleanInterval = 1
CheckPointManager.cleanThreshold = 3
if data, err := CheckPointManager.GetCheckpoint("1", "xx"); err != nil || string(data) != "xxxxx" {
t.Errorf("checkPointManager.GetCheckpoint() error, %v %v", err, string(data))
}
Expand All @@ -74,6 +75,14 @@ func Test_checkPointManager_run(t *testing.T) {
t.Errorf("checkPointManager.GetCheckpoint() error, %v %v", err, string(data))
}
CheckPointManager.Resume()
time.Sleep(time.Second * time.Duration(1))
if data, err := CheckPointManager.GetCheckpoint("1", "xx"); err != nil || string(data) != "xxxxx" {
t.Errorf("checkPointManager.GetCheckpoint() error, %v %v", err, string(data))
}

if data, err := CheckPointManager.GetCheckpoint("2", "yy"); err != nil || string(data) != "yyyyyy" {
t.Errorf("checkPointManager.GetCheckpoint() error, %v %v", err, string(data))
}
time.Sleep(time.Second * time.Duration(5))
if data, err := CheckPointManager.GetCheckpoint("1", "xx"); err == nil {
t.Errorf("checkPointManager.GetCheckpoint() error, %v %v", err, string(data))
Expand Down
51 changes: 24 additions & 27 deletions pluginmanager/config_update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@ package pluginmanager

import (
"context"
"strings"
"sync"
"testing"
"time"

"github.com/alibaba/ilogtail/pkg/logger"
_ "github.com/alibaba/ilogtail/plugins/aggregator/baseagg"
"github.com/alibaba/ilogtail/plugins/flusher/checker"

"github.com/stretchr/testify/suite"
Expand All @@ -51,8 +51,8 @@ func (s *configUpdateTestSuite) BeforeTest(suiteName, testName string) {

func (s *configUpdateTestSuite) AfterTest(suiteName, testName string) {
logger.Infof(context.Background(), "========== %s %s test end ========================", suiteName, testName)
s.NoError(StopAll(false, false))
s.NoError(StopAll(false, true))
s.NoError(StopAll(true, false))
s.NoError(StopAll(true, true))
LogtailConfig = sync.Map{}
}

Expand All @@ -68,12 +68,13 @@ func (s *configUpdateTestSuite) TestConfigUpdate() {
// update same hang config
s.NoError(Stop(updateConfigName, false))
s.Equal(0, checkFlusher.GetLogCount(), "the hold on block flusher checker doesn't have any logs")
err := LoadMockConfig(updateConfigName, updateConfigName, updateConfigName, GetTestConfig(updateConfigName))
s.True(strings.Contains(err.Error(), "failed to create config because timeout stop has happened on it"))
_ = LoadMockConfig(updateConfigName, updateConfigName, updateConfigName, GetTestConfig(updateConfigName))
// Since configs are loaded independently, reloading the blocked config is allowed
s.NoError(LoadMockConfig(noblockUpdateConfigName, noblockUpdateConfigName, noblockUpdateConfigName, GetTestConfig(noblockUpdateConfigName)))
s.NoError(Start(updateConfigName))
s.NoError(Start(noblockUpdateConfigName))
_, exist := GetLogtailConfig(updateConfigName)
s.False(exist)
s.True(exist)
_, exist = GetLogtailConfig(noblockUpdateConfigName)
s.True(exist)

Expand All @@ -88,23 +89,23 @@ func (s *configUpdateTestSuite) TestConfigUpdate() {
}

func (s *configUpdateTestSuite) TestConfigUpdateMany() {
config, ok := GetLogtailConfig(updateConfigName)
s.True(ok)
s.NotNil(config, "%s logstrore config should exist", updateConfigName)
checkFlusher, ok := GetConfigFlushers(config.PluginRunner)[0].(*checker.FlusherChecker)
s.True(ok)

s.Equal(0, checkFlusher.GetLogCount(), "the hold on block flusher checker doesn't have any logs")
// load block config
var checkFlusher *checker.FlusherChecker
for i := 0; i < 5; i++ {
config, ok := GetLogtailConfig(updateConfigName)
s.True(ok)
s.NotNil(config, "%s logstrore config should exist", updateConfigName)
checkFlusher, ok = GetConfigFlushers(config.PluginRunner)[0].(*checker.FlusherChecker)
s.True(ok)
s.Equal(0, checkFlusher.GetLogCount(), "the hold on block flusher checker doesn't have any logs")

s.NoError(Stop(updateConfigName, false))
err := LoadMockConfig(updateConfigName, updateConfigName, updateConfigName, GetTestConfig(updateConfigName))
s.True(strings.Contains(err.Error(), "failed to create config because timeout stop has happened on it"))
checkFlusher.Block = false
_ = LoadMockConfig(updateConfigName, updateConfigName, updateConfigName, GetTestConfig(updateConfigName))
// Since configs are loaded independently, reloading the blocked config is allowed
s.NoError(Start(updateConfigName))
_, exist := GetLogtailConfig(updateConfigName)
s.False(exist)
time.Sleep(time.Millisecond)
}
s.Equal(0, checkFlusher.GetLogCount(), "the hold on block flusher checker doesn't have any logs")
checkFlusher.Block = false
time.Sleep(time.Second * time.Duration(5))
s.Equal(checkFlusher.GetLogCount(), 10000)
Expand All @@ -119,7 +120,7 @@ func (s *configUpdateTestSuite) TestConfigUpdateMany() {
s.True(exist)
time.Sleep(time.Millisecond)
}
config, ok = GetLogtailConfig(noblockUpdateConfigName)
config, ok := GetLogtailConfig(noblockUpdateConfigName)
s.True(ok)
checkFlusher, ok = GetConfigFlushers(config.PluginRunner)[0].(*checker.FlusherChecker)
s.True(ok)
Expand All @@ -135,18 +136,17 @@ func (s *configUpdateTestSuite) TestConfigUpdateName() {
defer func() {
checkFlusher.Block = false
time.Sleep(time.Second * 5)
s.Equal(checkFlusher.GetLogCount(), 10000)
s.Equal(checkFlusher.GetLogCount(), 20000)
}()
s.True(ok)

s.NoError(Stop(updateConfigName, false))
s.Equal(0, checkFlusher.GetLogCount(), "the hold on blocking flusher checker doesn't have any logs")
s.NoError(LoadMockConfig(updateConfigName+"_", updateConfigName+"_", updateConfigName+"_", GetTestConfig(updateConfigName)))
s.NoError(Start(updateConfigName))
s.NoError(Start(updateConfigName + "_"))

{
_, exist := GetLogtailConfig(updateConfigName)
s.False(exist)
s.True(exist)
config, exist := GetLogtailConfig(updateConfigName + "_")
s.True(exist)
checkFlusher, ok := GetConfigFlushers(config.PluginRunner)[0].(*checker.FlusherChecker)
Expand All @@ -168,10 +168,9 @@ func (s *configUpdateTestSuite) TestStopAllExit() {
s.NoError(StopAll(true, true))
s.NoError(StopAll(true, false))
s.Equal(20000, checkFlusher.GetLogCount())
s.NoError(Start(updateConfigName))
}

func (s *configUpdateTestSuite) TestHoldOnExitTimeout() {
func (s *configUpdateTestSuite) TestStopAllExitTimeout() {
time.Sleep(time.Second * time.Duration(1))
config, ok := GetLogtailConfig(updateConfigName)
s.True(ok)
Expand All @@ -185,6 +184,4 @@ func (s *configUpdateTestSuite) TestHoldOnExitTimeout() {
checkFlusher.Block = false
time.Sleep(time.Second * time.Duration(5))
s.Equal(10000, checkFlusher.GetLogCount())
time.Sleep(time.Second * 10)
s.NoError(Start(updateConfigName))
}
14 changes: 0 additions & 14 deletions pluginmanager/logstore_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,6 @@ type LogstoreConfig struct {
PluginRunner PluginRunner
// private fields
configDetailHash string
// processShutdown chan struct{}
// flushShutdown chan struct{}
pauseChan chan struct{}
// processWaitSema sync.WaitGroup
// flushWaitSema sync.WaitGroup
pauseOrResumeWg sync.WaitGroup

K8sLabelSet map[string]struct{}
ContainerLabelSet map[string]struct{}
Expand Down Expand Up @@ -142,8 +136,6 @@ func (lc *LogstoreConfig) Start() {
lc.FlushOutFlag = false
logger.Info(lc.Context.GetRuntimeContext(), "config start", "begin")

lc.pauseChan = make(chan struct{}, 1)

lc.PluginRunner.Run()

logger.Info(lc.Context.GetRuntimeContext(), "config start", "success")
Expand All @@ -169,12 +161,6 @@ func (lc *LogstoreConfig) Stop(exitFlag bool) error {
return nil
}

func (lc *LogstoreConfig) pause() {
lc.pauseOrResumeWg.Add(1)
lc.pauseChan <- struct{}{}
lc.pauseOrResumeWg.Wait()
}

const (
rawStringKey = "content"
defaultTagPrefix = "__tag__:__prefix__"
Expand Down
86 changes: 48 additions & 38 deletions pluginmanager/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,12 +134,6 @@ func Init() (err error) {
// timeoutStop wraps LogstoreConfig.Stop with a timeout (5s by default).
// @return true if Stop returns before timeout, otherwise false.
func timeoutStop(config *LogstoreConfig, exitFlag bool) bool {
if !exitFlag {
config.pause()
logger.Info(config.Context.GetRuntimeContext(), "Pause config", "done")
return true
}

done := make(chan int)
go func() {
logger.Info(config.Context.GetRuntimeContext(), "Stop config in goroutine", "begin")
Expand All @@ -161,50 +155,66 @@ func timeoutStop(config *LogstoreConfig, exitFlag bool) bool {
func StopAll(exitFlag, withInput bool) error {
defer panicRecover("Run plugin")

configNames := make([]string, 0)
LogtailConfig.Range(func(key, value interface{}) bool {
if logstoreConfig, ok := value.(*LogstoreConfig); ok {
if (withInput && logstoreConfig.PluginRunner.IsWithInputPlugin()) || (!withInput && !logstoreConfig.PluginRunner.IsWithInputPlugin()) {
configNames = append(configNames, key.(string))
return true
})
for _, configName := range configNames {
if logstoreConfig, ok := GetLogtailConfig(configName); ok {
matchFlag := false
if withInput {
if logstoreConfig.PluginRunner.IsWithInputPlugin() {
matchFlag = true
}
} else {
if !logstoreConfig.PluginRunner.IsWithInputPlugin() {
matchFlag = true
}
}
if matchFlag {
if hasStopped := timeoutStop(logstoreConfig, exitFlag); !hasStopped {
// TODO: This alarm can not be sent to server in current alarm design.
logger.Error(logstoreConfig.Context.GetRuntimeContext(), "CONFIG_STOP_TIMEOUT_ALARM",
"timeout when stop config, goroutine might leak")
}
LogtailConfig.Delete(key)
} else {
// should never happen
logger.Error(logstoreConfig.Context.GetRuntimeContext(), "CONFIG_STOP_ALARM", "stop config not match withInput", withInput, "configName", key)
LogtailConfig.Delete(configName)
}
}
return true
})
if StatisticsConfig != nil {
if *flags.ForceSelfCollect {
logger.Info(context.Background(), "force collect the static metrics")
control := pipeline.NewAsyncControl()
StatisticsConfig.PluginRunner.RunPlugins(pluginMetricInput, control)
control.WaitCancel()
}
_ = StatisticsConfig.Stop(exitFlag)
}
if AlarmConfig != nil {
if *flags.ForceSelfCollect {
logger.Info(context.Background(), "force collect the alarm metrics")
control := pipeline.NewAsyncControl()
AlarmConfig.PluginRunner.RunPlugins(pluginMetricInput, control)
control.WaitCancel()
if exitFlag {
if StatisticsConfig != nil {
if *flags.ForceSelfCollect {
logger.Info(context.Background(), "force collect the static metrics")
control := pipeline.NewAsyncControl()
StatisticsConfig.PluginRunner.RunPlugins(pluginMetricInput, control)
control.WaitCancel()
}
_ = StatisticsConfig.Stop(exitFlag)
StatisticsConfig = nil
}
_ = AlarmConfig.Stop(exitFlag)
}
if ContainerConfig != nil {
if *flags.ForceSelfCollect {
logger.Info(context.Background(), "force collect the container metrics")
control := pipeline.NewAsyncControl()
ContainerConfig.PluginRunner.RunPlugins(pluginMetricInput, control)
control.WaitCancel()
if AlarmConfig != nil {
if *flags.ForceSelfCollect {
logger.Info(context.Background(), "force collect the alarm metrics")
control := pipeline.NewAsyncControl()
AlarmConfig.PluginRunner.RunPlugins(pluginMetricInput, control)
control.WaitCancel()
}
_ = AlarmConfig.Stop(exitFlag)
AlarmConfig = nil
}
if ContainerConfig != nil {
if *flags.ForceSelfCollect {
logger.Info(context.Background(), "force collect the container metrics")
control := pipeline.NewAsyncControl()
ContainerConfig.PluginRunner.RunPlugins(pluginMetricInput, control)
control.WaitCancel()
}
_ = ContainerConfig.Stop(exitFlag)
ContainerConfig = nil
}
_ = ContainerConfig.Stop(exitFlag)
CheckPointManager.HoldOn()
}
CheckPointManager.HoldOn()
return nil
}

Expand Down

0 comments on commit 692ba6a

Please sign in to comment.