Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Sep 13, 2024
1 parent a6556f3 commit a323173
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 60 deletions.
2 changes: 1 addition & 1 deletion core/pipeline/Pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ class Pipeline {
const std::unordered_map<std::string, std::unordered_map<std::string, uint32_t>>& GetPluginStatistics() const {
return mPluginCntMap;
}
bool LoadGoPipelines() const; // 应当放在private,过渡期间放在public

// only for input_observer_network for compatability
const std::vector<std::unique_ptr<InputInstance>>& GetInputs() const { return mInputs; }
Expand All @@ -81,6 +80,7 @@ class Pipeline {
std::string GetConfigNameOfGoPipelineWithoutInput() const { return mName + "/2"; }

private:
bool LoadGoPipelines() const;
void MergeGoPipeline(const Json::Value& src, Json::Value& dst);
void AddPluginToGoPipeline(const std::string& type,
const Json::Value& plugin,
Expand Down
2 changes: 1 addition & 1 deletion core/pipeline/plugin/interface/Flusher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ bool Flusher::Start() {

bool Flusher::Stop(bool isPipelineRemoving) {
if (HasContext()) {
unique_ptr<SenderQueueItem> tombStone = make_unique<SenderQueueItem>();
unique_ptr<SenderQueueItem> tombStone = make_unique<SenderQueueItem>("", 0, this, mQueueKey);
tombStone->mPipeline = PipelineManager::GetInstance()->FindConfigByName(mContext->GetConfigName());
if (!tombStone->mPipeline) {
LOG_ERROR(sLogger, ("failed to find pipeline when stop flusher", mContext->GetConfigName()));
Expand Down
1 change: 0 additions & 1 deletion core/pipeline/queue/SenderQueueItem.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include <memory>
#include <string>

#include "pipeline/PipelineManager.h"
#include "pipeline/queue/QueueKey.h"

namespace logtail {
Expand Down
4 changes: 2 additions & 2 deletions core/plugin/flusher/sls/SLSClientManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@
#include "common/LogtailCommonFlags.h"
#include "common/StringTools.h"
#include "common/TimeUtil.h"
#include "plugin/flusher/sls/FlusherSLS.h"
#include "plugin/flusher/sls/SendResult.h"
#include "logger/Logger.h"
#include "monitor/LogFileProfiler.h"
#include "plugin/flusher/sls/FlusherSLS.h"
#include "plugin/flusher/sls/SendResult.h"
#include "sdk/Exception.h"
#include "sls_control/SLSControl.h"

Expand Down
Binary file modified pkg/logtail/libPluginAdapter.so
Binary file not shown.
1 change: 0 additions & 1 deletion plugin_main/plugin_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,6 @@ func initPluginBase(cfgStr string) int {
rst := 0
initOnce.Do(func() {
logger.Init()
flags.OverrideByEnv()
if pluginmanager.StatisticsConfig != nil {
pluginmanager.StatisticsConfig.Start()
}
Expand Down
122 changes: 73 additions & 49 deletions pluginmanager/logstore_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,12 +323,15 @@ func hasDockerStdoutInput(plugins map[string]interface{}) bool {
if !valid {
continue
}
typeName, valid := cfg["type"]
pluginTypeWithID, valid := cfg["type"]
if !valid {
continue
}
if val, valid := typeName.(string); valid && val == input.ServiceDockerStdoutPluginName {
return true
if val, valid := pluginTypeWithID.(string); valid {
pluginType := getPluginType(val)
if pluginType == input.ServiceDockerStdoutPluginName {
return true
}
}
}
return false
Expand All @@ -350,6 +353,7 @@ func createLogstoreConfig(project string, logstore string, configName string, lo
contextImp.logstoreC = logstoreC

var plugins = make(map[string]interface{})
logger.Info(contextImp.GetRuntimeContext(), "load plugin config", jsonStr)
if err = json.Unmarshal([]byte(jsonStr), &plugins); err != nil {
return nil, err
}
Expand All @@ -372,11 +376,16 @@ func createLogstoreConfig(project string, logstore string, configName string, lo
if !valid {
continue
}
typeName, valid := cfg["type"]
pluginTypeWithID, valid := cfg["type"]
if !valid {
continue
}
val, valid := pluginTypeWithID.(string)
if !valid {
continue
}
if val, valid := typeName.(string); valid && (val == input.ServiceDockerStdoutPluginName || val == input.MetricDocierFilePluginName) {
pluginType := getPluginType(val)
if pluginType == input.ServiceDockerStdoutPluginName || pluginType == input.MetricDocierFilePluginName {
configDetail, valid := cfg["detail"]
if !valid {
continue
Expand Down Expand Up @@ -472,16 +481,19 @@ func createLogstoreConfig(project string, logstore string, configName string, lo
if !ok {
return nil, fmt.Errorf("invalid extension type")
}
typeName, ok := extension["type"].(string)
if !ok {
return nil, fmt.Errorf("invalid extension type")
}
logger.Debug(contextImp.GetRuntimeContext(), "add extension", typeName)
err = loadExtension(logstoreC.genPluginMeta(typeName, false, false), logstoreC, extension["detail"])
if err != nil {
return nil, err
if pluginTypeWithID, ok := extension["type"]; ok {
pluginTypeWithIDStr, ok := pluginTypeWithID.(string)
if !ok {
return nil, fmt.Errorf("invalid extension type")
}
pluginType := getPluginType(pluginTypeWithIDStr)
logger.Debug(contextImp.GetRuntimeContext(), "add extension", pluginType)
err = loadExtension(logstoreC.genPluginMeta(pluginTypeWithIDStr, false, false), logstoreC, extension["detail"])
if err != nil {
return nil, err
}
contextImp.AddPlugin(pluginType)
}
contextImp.AddPlugin(typeName)
}
}

Expand All @@ -492,20 +504,21 @@ func createLogstoreConfig(project string, logstore string, configName string, lo
for _, inputInterface := range inputs {
input, ok := inputInterface.(map[string]interface{})
if ok {
if typeName, ok := input["type"]; ok {
if typeNameStr, ok := typeName.(string); ok {
if _, isMetricInput := pipeline.MetricInputs[typeNameStr]; isMetricInput {
if pluginTypeWithID, ok := input["type"]; ok {
if pluginTypeWithIDStr, ok := pluginTypeWithID.(string); ok {
pluginType := getPluginType(pluginTypeWithIDStr)
if _, isMetricInput := pipeline.MetricInputs[pluginType]; isMetricInput {
// Load MetricInput plugin defined in pipeline.MetricInputs
// pipeline.MetricInputs will be renamed in a future version
err = loadMetric(logstoreC.genPluginMeta(typeNameStr, true, false), logstoreC, input["detail"])
} else if _, isServiceInput := pipeline.ServiceInputs[typeNameStr]; isServiceInput {
err = loadMetric(logstoreC.genPluginMeta(pluginTypeWithIDStr, true, false), logstoreC, input["detail"])
} else if _, isServiceInput := pipeline.ServiceInputs[pluginType]; isServiceInput {
// Load ServiceInput plugin defined in pipeline.ServiceInputs
err = loadService(logstoreC.genPluginMeta(typeNameStr, true, false), logstoreC, input["detail"])
err = loadService(logstoreC.genPluginMeta(pluginTypeWithIDStr, true, false), logstoreC, input["detail"])
}
if err != nil {
return nil, err
}
contextImp.AddPlugin(typeNameStr)
contextImp.AddPlugin(pluginType)
continue
}
}
Expand All @@ -523,14 +536,15 @@ func createLogstoreConfig(project string, logstore string, configName string, lo
for i, processorInterface := range processors {
processor, ok := processorInterface.(map[string]interface{})
if ok {
if typeName, ok := processor["type"]; ok {
if typeNameStr, ok := typeName.(string); ok {
logger.Debug(contextImp.GetRuntimeContext(), "add processor", typeNameStr)
err = loadProcessor(logstoreC.genPluginMeta(typeNameStr, true, false), i, logstoreC, processor["detail"])
if pluginTypeWithID, ok := processor["type"]; ok {
if pluginTypeWithIDStr, ok := pluginTypeWithID.(string); ok {
pluginType := getPluginType(pluginTypeWithIDStr)
logger.Debug(contextImp.GetRuntimeContext(), "add processor", pluginType)
err = loadProcessor(logstoreC.genPluginMeta(pluginTypeWithIDStr, true, false), i, logstoreC, processor["detail"])
if err != nil {
return nil, err
}
contextImp.AddPlugin(typeNameStr)
contextImp.AddPlugin(pluginType)
continue
}
}
Expand All @@ -549,14 +563,15 @@ func createLogstoreConfig(project string, logstore string, configName string, lo
for _, aggregatorInterface := range aggregators {
aggregator, ok := aggregatorInterface.(map[string]interface{})
if ok {
if typeName, ok := aggregator["type"]; ok {
if typeNameStr, ok := typeName.(string); ok {
logger.Debug(contextImp.GetRuntimeContext(), "add aggregator", typeNameStr)
err = loadAggregator(logstoreC.genPluginMeta(typeNameStr, true, false), logstoreC, aggregator["detail"])
if pluginTypeWithID, ok := aggregator["type"]; ok {
if pluginTypeWithIDStr, ok := pluginTypeWithID.(string); ok {
pluginType := getPluginType(pluginTypeWithIDStr)
logger.Debug(contextImp.GetRuntimeContext(), "add aggregator", pluginType)
err = loadAggregator(logstoreC.genPluginMeta(pluginTypeWithIDStr, true, false), logstoreC, aggregator["detail"])
if err != nil {
return nil, err
}
contextImp.AddPlugin(typeNameStr)
contextImp.AddPlugin(pluginType)
continue
}
}
Expand All @@ -579,18 +594,19 @@ func createLogstoreConfig(project string, logstore string, configName string, lo
for num, flusherInterface := range flushers {
flusher, ok := flusherInterface.(map[string]interface{})
if ok {
if typeName, ok := flusher["type"]; ok {
if typeNameStr, ok := typeName.(string); ok {
logger.Debug(contextImp.GetRuntimeContext(), "add flusher", typeNameStr)
if pluginTypeWithID, ok := flusher["type"]; ok {
if pluginTypeWithIDStr, ok := pluginTypeWithID.(string); ok {
pluginType := getPluginType(pluginTypeWithIDStr)
logger.Debug(contextImp.GetRuntimeContext(), "add flusher", pluginType)
lastOne := false
if num == flushersLen-1 {
lastOne = true
}
err = loadFlusher(logstoreC.genPluginMeta(typeNameStr, true, lastOne), logstoreC, flusher["detail"])
err = loadFlusher(logstoreC.genPluginMeta(pluginTypeWithIDStr, true, lastOne), logstoreC, flusher["detail"])
if err != nil {
return nil, err
}
contextImp.AddPlugin(typeNameStr)
contextImp.AddPlugin(pluginType)
continue
}
}
Expand Down Expand Up @@ -763,14 +779,21 @@ func applyPluginConfig(plugin interface{}, pluginConfig interface{}) error {
return err
}

// Rule: pluginName=pluginType/pluginID#pluginPriority.
func (lc *LogstoreConfig) genPluginMeta(pluginName string, genNodeID bool, lastOne bool) *pipeline.PluginMeta {
// Rule: pluginTypeWithID=pluginType/pluginID#pluginPriority.
func getPluginType(pluginTypeWithID string) string {
if ids := strings.IndexByte(pluginTypeWithID, '/'); ids != -1 {
return pluginTypeWithID[:ids]
}
return pluginTypeWithID
}

func (lc *LogstoreConfig) genPluginMeta(pluginTypeWithID string, genNodeID bool, lastOne bool) *pipeline.PluginMeta {
nodeID := ""
childNodeID := ""
if isPluginTypeWithID(pluginName) {
pluginTypeWithID := pluginName
if idx := strings.IndexByte(pluginName, '#'); idx != -1 {
pluginTypeWithID = pluginName[:idx]
if isPluginTypeWithID(pluginTypeWithID) {
pluginTypeWithID := pluginTypeWithID
if idx := strings.IndexByte(pluginTypeWithID, '#'); idx != -1 {
pluginTypeWithID = pluginTypeWithID[:idx]
}
if ids := strings.IndexByte(pluginTypeWithID, '/'); ids != -1 {
if genNodeID {
Expand All @@ -788,30 +811,31 @@ func (lc *LogstoreConfig) genPluginMeta(pluginName string, genNodeID bool, lastO
}
}
}
pluginType := pluginTypeWithID
pluginID := lc.genPluginID()
if genNodeID {
nodeID, childNodeID = lc.genNodeID(lastOne)
}
pluginTypeWithID := fmt.Sprintf("%s/%s", pluginName, pluginID)
pluginTypeWithID = fmt.Sprintf("%s/%s", pluginType, pluginID)
return &pipeline.PluginMeta{
PluginTypeWithID: pluginTypeWithID,
PluginType: pluginName,
PluginType: pluginType,
PluginID: pluginID,
NodeID: nodeID,
ChildNodeID: childNodeID,
}
}

func isPluginTypeWithID(pluginName string) bool {
if idx := strings.IndexByte(pluginName, '/'); idx != -1 {
func isPluginTypeWithID(pluginTypeWithID string) bool {
if idx := strings.IndexByte(pluginTypeWithID, '/'); idx != -1 {
return true
}
return false
}

func GetPluginPriority(pluginName string) int {
if idx := strings.IndexByte(pluginName, '#'); idx != -1 {
val, err := strconv.Atoi(pluginName[idx+1:])
func GetPluginPriority(pluginTypeWithID string) int {
if idx := strings.IndexByte(pluginTypeWithID, '#'); idx != -1 {
val, err := strconv.Atoi(pluginTypeWithID[idx+1:])
if err != nil {
return 0
}
Expand Down
6 changes: 4 additions & 2 deletions pluginmanager/metric_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,11 @@ func GetGoCppProvidedMetrics() []map[string]string {
// go 插件指标,直接输出
func GetGoPluginMetrics() []map[string]string {
metrics := make([]map[string]string, 0)
for _, config := range LogtailConfig {
LogtailConfig.Range(func(key, value interface{}) bool {
config := value.(*LogstoreConfig)
metrics = append(metrics, config.Context.ExportMetricRecords()...)
}
return true
})
return metrics
}

Expand Down
7 changes: 4 additions & 3 deletions pluginmanager/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,12 @@ func Stop(configName string, exitFlag bool) error {
func Start(configName string) error {
defer panicRecover("Run plugin")
if ToStartLogtailConfig == nil {
return fmt.Errorf("no pipeline for the config is created: %s", configName)
return fmt.Errorf("no pipeline loaded for the config: %s", configName)
}
if ToStartLogtailConfig.ConfigName != configName {
realConfigName := config.GetRealConfigName(configName)
if ToStartLogtailConfig.ConfigName != realConfigName {
// should never happen
return fmt.Errorf("config unmatch with to start pipeline: given %s, expect %s", configName, ToStartLogtailConfig.ConfigName)
return fmt.Errorf("config unmatch with the loaded pipeline: given %s, expect %s", configName, ToStartLogtailConfig.ConfigName)
}
ToStartLogtailConfig.Start()
LogtailConfig.Store(configName, ToStartLogtailConfig)
Expand Down

0 comments on commit a323173

Please sign in to comment.