Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Sep 17, 2024
1 parent fd478b7 commit 9ddb08d
Show file tree
Hide file tree
Showing 19 changed files with 338 additions and 308 deletions.
4 changes: 2 additions & 2 deletions core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,12 @@ void LogtailPlugin::Stop(const std::string& configName, bool removingFlag) {

void LogtailPlugin::Start(const std::string& configName) {
if (mPluginValid && mStartFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines start", "starts"));
LOG_INFO(sLogger, ("Go pipelines start", "starts")("config name", configName));
GoString goConfigName;
goConfigName.n = configName.size();
goConfigName.p = configName.c_str();
mStartFun(goConfigName);
LOG_INFO(sLogger, ("Go pipelines start", "succeeded"));
LOG_INFO(sLogger, ("Go pipelines start", "succeeded")("config name", configName));
}
}

Expand Down
2 changes: 2 additions & 0 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ bool Pipeline::LoadGoPipelines() const {
// 目前按照从后往前顺序加载,即便without成功with失败导致without残留在插件系统中,也不会有太大的问题,但最好改成原子的。
if (!mGoPipelineWithoutInput.isNull()) {
string content = mGoPipelineWithoutInput.toStyledString();
LOG_INFO(sLogger, ("load go pipeline", "without input")("content", content)("config", mName));
string goConfigName = GetConfigNameOfGoPipelineWithoutInput();
if (!LogtailPlugin::GetInstance()->LoadPipeline(goConfigName,
content,
Expand All @@ -497,6 +498,7 @@ bool Pipeline::LoadGoPipelines() const {
}
if (!mGoPipelineWithInput.isNull()) {
string content = mGoPipelineWithInput.toStyledString();
LOG_INFO(sLogger, ("load go pipeline", "with input")("content", content)("config", mName));
string goConfigName = GetConfigNameOfGoPipelineWithInput();
if (!LogtailPlugin::GetInstance()->LoadPipeline(goConfigName,
content,
Expand Down
24 changes: 16 additions & 8 deletions plugin_main/plugin_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,10 +161,12 @@ func UnloadConfig(project string, logstore string, configName string) int {

//export ProcessRawLog
func ProcessRawLog(configName string, rawLog []byte, packID string, topic string) int {
plugin, ok := pluginmanager.GetLogtailConfig(configName)
if !ok {
pluginmanager.LogtailConfigLock.RLock()
plugin, flag := pluginmanager.LogtailConfig[configName]
if !flag {
return -1
}
pluginmanager.LogtailConfigLock.RUnlock()

// rawLog will be copied when it is converted to string, packID and topic
// are unused now, so deep copy is unnecessary.
Expand All @@ -173,28 +175,34 @@ func ProcessRawLog(configName string, rawLog []byte, packID string, topic string

//export ProcessRawLogV2
func ProcessRawLogV2(configName string, rawLog []byte, packID string, topic string, tags []byte) int {
config, ok := pluginmanager.GetLogtailConfig(configName)
if !ok {
pluginmanager.LogtailConfigLock.RLock()
config, flag := pluginmanager.LogtailConfig[configName]
if !flag {
return -1
}
pluginmanager.LogtailConfigLock.RUnlock()
return config.ProcessRawLogV2(rawLog, util.StringDeepCopy(packID), util.StringDeepCopy(topic), tags)
}

//export ProcessLog
func ProcessLog(configName string, logBytes []byte, packID string, topic string, tags []byte) int {
config, ok := pluginmanager.GetLogtailConfig(configName)
if !ok {
pluginmanager.LogtailConfigLock.RLock()
config, flag := pluginmanager.LogtailConfig[configName]
if !flag {
return -1
}
pluginmanager.LogtailConfigLock.RUnlock()
return config.ProcessLog(logBytes, util.StringDeepCopy(packID), util.StringDeepCopy(topic), tags)
}

//export ProcessLogGroup
func ProcessLogGroup(configName string, logBytes []byte, packID string) int {
config, ok := pluginmanager.GetLogtailConfig(configName)
if !ok {
pluginmanager.LogtailConfigLock.RLock()
config, flag := pluginmanager.LogtailConfig[configName]
if !flag {
return -1
}
pluginmanager.LogtailConfigLock.RUnlock()
return config.ProcessLogGroup(logBytes, util.StringDeepCopy(packID))
}

Expand Down
95 changes: 67 additions & 28 deletions plugin_main/plugin_main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,61 +99,80 @@ func TestHangConfigWhenStop(t *testing.T) {
// Initialize plugin and run config.
require.Equal(t, 0, InitPluginBase())
require.Equal(t, 0, LoadConfig("project", "logstore", configName, 0, badConfigStr))
require.NotNil(t, pluginmanager.ToStartLogtailConfig)
require.Equal(t, configName, pluginmanager.ToStartLogtailConfig.ConfigName)
Start(configName)
time.Sleep(time.Second * 2)
config, ok := pluginmanager.GetLogtailConfig(configName)
require.True(t, ok)
config := pluginmanager.ToStartLogtailConfigWithoutInput
require.NotNil(t, config)
require.Equal(t, configName, config.ConfigName)
flusher, _ := pluginmanager.GetConfigFlushers(config.PluginRunner)[0].(*BadFlusher)
flusher.Shutdown = shutdown
Start(configName)
time.Sleep(time.Second * 2)
require.Nil(t, pluginmanager.ToStartLogtailConfigWithoutInput)
pluginmanager.LogtailConfigLock.RLock()
_, exists := pluginmanager.LogtailConfig[configName]
require.True(t, exists)
pluginmanager.LogtailConfigLock.RUnlock()

// Stop config, it will hang.
Stop(configName, 0)
time.Sleep(time.Second * 2)
// Load again. It will success since independent reload.
config, exists = pluginmanager.DisabledLogtailConfig[configName]
require.Equal(t, configName, config.ConfigName)
require.True(t, exists)
// Load again, fail.
time.Sleep(time.Second)
require.Equal(t, 0, LoadConfig("project", "logstore", configName, 0, badConfigStr))
require.Equal(t, 1, LoadConfig("project", "logstore", configName, 0, badConfigStr))

// Notify the config to quit so that it can be enabled again.
close(shutdown)
time.Sleep(time.Second * 2)
require.Empty(t, pluginmanager.DisabledLogtailConfig)

// Load again, succeed.
require.Equal(t, 0, LoadConfig("project", "logstore", configName, 0, badConfigStr))
require.NotNil(t, pluginmanager.ToStartLogtailConfig)
require.Equal(t, configName, pluginmanager.ToStartLogtailConfig.ConfigName)
config = pluginmanager.ToStartLogtailConfigWithoutInput
require.NotNil(t, config)
require.Equal(t, configName, config.ConfigName)
Start(configName)
time.Sleep(time.Second)
config, ok = pluginmanager.GetLogtailConfig(configName)
require.True(t, ok)
require.Equal(t, configName, config.ConfigName)
pluginmanager.LogtailConfigLock.RLock()
_, exists = pluginmanager.LogtailConfig[configName]
pluginmanager.LogtailConfigLock.RUnlock()
require.True(t, exists)
flusher, _ = pluginmanager.GetConfigFlushers(config.PluginRunner)[0].(*BadFlusher)
shutdown = make(chan int)
flusher.Shutdown = shutdown

// Stop config, hang again.
Stop(configName, 0)
Stop(config.ConfigNameWithSuffix, 0)
time.Sleep(time.Second * 2)
// Load again. It will success since independent reload.
config, exists = pluginmanager.DisabledLogtailConfig[config.ConfigNameWithSuffix]
require.True(t, exists)
require.Equal(t, configName, config.ConfigName)
// Load again, fail.
time.Sleep(time.Second)
require.Equal(t, 0, LoadConfig("project", "logstore", configName, 0, badConfigStr))
require.Equal(t, 1, LoadConfig("project", "logstore", configName, 0, badConfigStr))
require.Nil(t, pluginmanager.ToStartLogtailConfigWithInput)

// Change config detail so that it can be loaded again.
validConfigStr := fmt.Sprintf(configTemplateJSONStr, 4)
require.Equal(t, 0, LoadConfig("project", "logstore", configName, 0, validConfigStr))
Start(configName)
time.Sleep(time.Second * 2)
config, ok = pluginmanager.GetLogtailConfig(configName)
require.True(t, ok)
pluginmanager.LogtailConfigLock.RLock()
config, exists = pluginmanager.LogtailConfig[configName]
pluginmanager.LogtailConfigLock.RUnlock()
require.True(t, exists)
require.Equal(t, configName, config.ConfigName)
require.Empty(t, pluginmanager.DisabledLogtailConfig)

// Quit.
time.Sleep(time.Second)
StopAll(1, 1)
StopAll(1, 0)
time.Sleep(time.Second)
require.Equal(t, 0, pluginmanager.GetLogtailConfigSize())
require.Empty(t, pluginmanager.DisabledLogtailConfig)
pluginmanager.LogtailConfigLock.RLock()
require.Empty(t, pluginmanager.LogtailConfig)
pluginmanager.LogtailConfigLock.RUnlock()

// Close hanged goroutine.
close(shutdown)
Expand All @@ -167,33 +186,53 @@ func TestSlowConfigWhenStop(t *testing.T) {
// Initialize plugin and run config.
require.Equal(t, 0, InitPluginBase())
require.Equal(t, 0, LoadConfig("project", "logstore", configName, 0, badConfigStr))
require.NotNil(t, pluginmanager.ToStartLogtailConfig)
require.Equal(t, configName, pluginmanager.ToStartLogtailConfig.ConfigName)
config := pluginmanager.ToStartLogtailConfigWithoutInput
require.NotNil(t, config)
require.Equal(t, configName, config.ConfigName)
Start(configName)
config, exists := pluginmanager.LogtailConfig[configName]
require.True(t, exists)
require.Equal(t, configName, config.ConfigName)
time.Sleep(time.Second * 2)
config, ok := pluginmanager.GetLogtailConfig(configName)
pluginmanager.LogtailConfigLock.RLock()
config, ok := pluginmanager.LogtailConfig[configName]
pluginmanager.LogtailConfigLock.RUnlock()
require.True(t, ok)
require.Equal(t, configName, config.ConfigName)

// Stop config, it will hang.
Stop(configName, 0)
// Load again. It will success since independent reload.
config, exists = pluginmanager.DisabledLogtailConfig[configName]
require.True(t, exists)
require.Equal(t, configName, config.ConfigName)
// Load again, fail.
time.Sleep(time.Second)
require.Equal(t, 0, LoadConfig("project", "logstore", configName, 0, badConfigStr))
require.NotNil(t, pluginmanager.ToStartLogtailConfig)
require.Equal(t, 1, LoadConfig("project", "logstore", configName, 0, badConfigStr))
require.Empty(t, pluginmanager.LogtailConfig)
require.Nil(t, pluginmanager.ToStartLogtailConfigWithInput)

// Wait more time, so that the config can finish stopping.
time.Sleep(time.Second * 5)
// Load again, succeed.
require.Equal(t, 0, LoadConfig("project", "logstore", configName, 0, badConfigStr))
config = pluginmanager.ToStartLogtailConfigWithoutInput
require.NotNil(t, config)
require.Equal(t, configName, config.ConfigName)
Start(configName)
config, exists = pluginmanager.LogtailConfig[configName]
require.True(t, exists)
require.Equal(t, configName, config.ConfigName)
time.Sleep(time.Second)
config, ok = pluginmanager.GetLogtailConfig(configName)
pluginmanager.LogtailConfigLock.RLock()
config, ok = pluginmanager.LogtailConfig[configName]
pluginmanager.LogtailConfigLock.RUnlock()
require.True(t, ok)
require.Equal(t, configName, config.ConfigName)

// Quit.
time.Sleep(time.Second)
StopAll(1, 0)
StopAll(1, 1)
StopAll(1, 0)
time.Sleep(time.Second * 6)
require.Empty(t, pluginmanager.DisabledLogtailConfig)
}
4 changes: 3 additions & 1 deletion pluginmanager/checkpoint_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,9 @@ func (p *checkPointManager) keyMatch(key []byte) bool {
// configName in checkpoint is real config Name, while configName in LogtailConfig has suffix '/1' or '/2'
// since checkpoint is only used in input, so configName can only be 'realConfigName/1', meaning go pipeline with input
configName += "/1"
_, existFlag := LogtailConfig.Load(configName)
LogtailConfigLock.RLock()
_, existFlag := LogtailConfig[configName]
LogtailConfigLock.RUnlock()
return existFlag
}

Expand Down
12 changes: 8 additions & 4 deletions pluginmanager/checkpoint_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,10 @@ func Test_checkPointManager_run(t *testing.T) {
func Test_checkPointManager_keyMatch(t *testing.T) {
CheckPointManager.Init()
t.Run("key match", func(t *testing.T) {
LogtailConfig.Store("test_1/1", nil)
LogtailConfig.Store("test_2/1", nil)
LogtailConfigLock.Lock()
LogtailConfig["test_1/1"] = nil
LogtailConfig["test_2/1"] = nil
LogtailConfigLock.Unlock()
if got := CheckPointManager.keyMatch([]byte("test_1")); got {
t.Errorf("checkPointManager.Test_checkPointManager_keyMatch()")
}
Expand All @@ -121,7 +123,9 @@ func Test_checkPointManager_keyMatch(t *testing.T) {
if got := CheckPointManager.keyMatch([]byte("texst_1^xxx")); got {
t.Errorf("checkPointManager.Test_checkPointManager_keyMatch()")
}
LogtailConfig.Delete("test_1/1")
LogtailConfig.Delete("test_2/1")
LogtailConfigLock.Lock()
delete(LogtailConfig, "test_1/1")
delete(LogtailConfig, "test_2/1")
LogtailConfigLock.Unlock()
})
}
Loading

0 comments on commit 9ddb08d

Please sign in to comment.