Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add K8s meta self metrics #1765

Merged
merged 29 commits into from
Oct 12, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,24 @@
#include "common/version.h"
#include "config/ConfigDiff.h"
#include "config/watcher/ConfigWatcher.h"
#include "file_server/EventDispatcher.h"
#include "file_server/event_handler/LogInput.h"
#include "file_server/ConfigManager.h"
#include "file_server/EventDispatcher.h"
#include "file_server/FileServer.h"
#include "plugin/flusher/sls/DiskBufferWriter.h"
#include "file_server/event_handler/LogInput.h"
#include "go_pipeline/LogtailPlugin.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "logger/Logger.h"
#include "monitor/LogFileProfiler.h"
#include "monitor/MetricExportor.h"
#include "monitor/Monitor.h"
#include "pipeline/InstanceConfigManager.h"
#include "pipeline/PipelineManager.h"
#include "pipeline/plugin/PluginRegistry.h"
#include "runner/LogProcess.h"
#include "pipeline/queue/ExactlyOnceQueueManager.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "plugin/flusher/sls/DiskBufferWriter.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "runner/FlusherRunner.h"
#include "runner/LogProcess.h"
#include "runner/sink/http/HttpSink.h"
#ifdef __ENTERPRISE__
#include "config/provider/EnterpriseConfigProvider.h"
Expand Down Expand Up @@ -269,6 +269,13 @@ void Application::Start() { // GCOVR_EXCL_START
LogtailPlugin::GetInstance()->LoadPluginBase();
}

const char* deployMode = getenv("DEPLOY_MODE");
const char* enableK8sMeta = getenv("ENABLE_KUBERNETES_META");
if (deployMode != NULL && strlen(deployMode) > 0 && strcmp(deployMode, "singleton") == 0
&& strcmp(enableK8sMeta, "true") == 0) {
LogtailPlugin::GetInstance()->LoadPluginBase();
}

LogProcess::GetInstance()->Start();

time_t curTime = 0, lastProfilingCheckTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ require (
github.com/valyala/fastjson v1.6.3 // indirect
github.com/valyala/fastrand v1.1.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
github.com/valyala/gozstd v1.21.1 // indirect
github.com/valyala/gozstd v1.17.0 // indirect
github.com/valyala/histogram v1.2.0 // indirect
github.com/valyala/quicktemplate v1.7.0 // indirect
github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1534,8 +1534,8 @@ github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G
github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo=
github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/valyala/gozstd v1.21.1 h1:TQFZVTk5zo7iJcX3o4XYBJujPdO31LFb4fVImwK873A=
github.com/valyala/gozstd v1.21.1/go.mod h1:y5Ew47GLlP37EkTB+B4s7r6A5rdaeB7ftbl9zoYiIPQ=
github.com/valyala/gozstd v1.17.0 h1:M4Ds4MIrw+pD+s6vYtuFZ8D3iEw9htzfdytOV3C3iQU=
github.com/valyala/gozstd v1.17.0/go.mod h1:y5Ew47GLlP37EkTB+B4s7r6A5rdaeB7ftbl9zoYiIPQ=
github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ=
github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY=
github.com/valyala/quicktemplate v1.7.0 h1:LUPTJmlVcb46OOUY3IeD9DojFpAVbsG+5WFTcjMJzCM=
Expand Down
2 changes: 2 additions & 0 deletions pkg/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ var (
DeployMode = flag.String("DEPLOY_MODE", DeployDaemonset, "alibaba log deploy mode, daemonset or statefulset or singleton")
EnableKubernetesMeta = flag.Bool("ENABLE_KUBERNETES_META", false, "enable kubernetes meta")
ClusterID = flag.String("GLOBAL_CLUSTER_ID", "", "cluster id")
ClusterType = flag.String("GLOBAL_CLUSTER_TYPE", "", "cluster type, supporting ack, one, asi and k8s")
)

func init() {
Expand Down Expand Up @@ -144,6 +145,7 @@ func init() {
_ = util.InitFromEnvString("DEPLOY_MODE", DeployMode, *DeployMode)
_ = util.InitFromEnvBool("ENABLE_KUBERNETES_META", EnableKubernetesMeta, *EnableKubernetesMeta)
_ = util.InitFromEnvString("GLOBAL_CLUSTER_ID", ClusterID, *ClusterID)
_ = util.InitFromEnvString("GLOBAL_CLUSTER_TYPE", ClusterType, *ClusterType)

if len(*DefaultRegion) == 0 {
*DefaultRegion = util.GuessRegionByEndpoint(*LogServiceEndpoint, "cn-hangzhou")
Expand Down
11 changes: 11 additions & 0 deletions pkg/helper/k8smeta/k8s_meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ func (m *k8sMetaCache) Get(key []string) map[string][]*ObjectWrapper {
return m.metaStore.Get(key)
}

func (m *k8sMetaCache) GetSize() int {
return len(m.metaStore.Items)
}

func (m *k8sMetaCache) GetQueueSize() int {
return len(m.eventCh)
}

func (m *k8sMetaCache) List() []*ObjectWrapper {
return m.metaStore.List()
}
Expand Down Expand Up @@ -86,6 +94,7 @@ func (m *k8sMetaCache) watch(stopCh <-chan struct{}) {
LastObservedTime: nowTime,
},
}
metaManager.addEventCount.Add(1)
},
UpdateFunc: func(oldObj interface{}, obj interface{}) {
nowTime := time.Now().Unix()
Expand All @@ -98,6 +107,7 @@ func (m *k8sMetaCache) watch(stopCh <-chan struct{}) {
LastObservedTime: nowTime,
},
}
metaManager.updateEventCount.Add(1)
},
DeleteFunc: func(obj interface{}) {
m.eventCh <- &K8sMetaEvent{
Expand All @@ -108,6 +118,7 @@ func (m *k8sMetaCache) watch(stopCh <-chan struct{}) {
LastObservedTime: time.Now().Unix(),
},
}
metaManager.deleteEventCount.Add(1)
},
})
go factory.Start(stopCh)
Expand Down
32 changes: 20 additions & 12 deletions pkg/helper/k8smeta/k8s_meta_http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ type metadataHandler struct {
metaManager *MetaManager
}

func newMetadataHandler() *metadataHandler {
func newMetadataHandler(metaManager *MetaManager) *metadataHandler {
metadataHandler := &metadataHandler{
metaManager: GetMetaManagerInstance(),
metaManager: metaManager,
}
return metadataHandler
}
Expand All @@ -46,17 +46,10 @@ func (m *metadataHandler) K8sServerRun(stopCh <-chan struct{}) error {
mux := http.NewServeMux()

// TODO: add port in ip endpoint
mux.HandleFunc("/metadata/ip", m.handlePodMetaByUniqueID)
mux.HandleFunc("/metadata/containerid", m.handlePodMetaByUniqueID)
mux.HandleFunc("/metadata/host", m.handlePodMetaByHostIP)
mux.HandleFunc("/metadata/ip", m.handler(m.handlePodMetaByUniqueID))
mux.HandleFunc("/metadata/containerid", m.handler(m.handlePodMetaByUniqueID))
mux.HandleFunc("/metadata/host", m.handler(m.handlePodMetaByHostIP))
server.Handler = mux
for {
if m.metaManager.IsReady() {
break
}
time.Sleep(1 * time.Second)
logger.Warning(context.Background(), "K8S_META_SERVER_WAIT", "waiting for k8s meta manager to be ready")
}
logger.Info(context.Background(), "k8s meta server", "started", "port", port)
go func() {
defer panicRecover()
Expand All @@ -66,6 +59,21 @@ func (m *metadataHandler) K8sServerRun(stopCh <-chan struct{}) error {
return nil
}

func (m *metadataHandler) handler(handleFunc func(w http.ResponseWriter, r *http.Request)) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if !m.metaManager.IsReady() {
w.WriteHeader(http.StatusServiceUnavailable)
return
}
startTime := time.Now()
m.metaManager.httpRequestCount.Add(1)
handleFunc(w, r)
latency := time.Since(startTime).Milliseconds()
m.metaManager.httpAvgDelayMs.Add(latency)
m.metaManager.httpMaxDelayMs.Set(float64(latency))
}
}

func (m *metadataHandler) handlePodMetaByUniqueID(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
var rBody requestBody
Expand Down
55 changes: 45 additions & 10 deletions pkg/helper/k8smeta/k8s_meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ var onceManager sync.Once

type MetaCache interface {
Get(key []string) map[string][]*ObjectWrapper
GetSize() int
GetQueueSize() int
List() []*ObjectWrapper
RegisterSendFunc(key string, sendFunc SendFunc, interval int)
UnRegisterSendFunc(key string)
Expand All @@ -40,23 +42,32 @@ type MetaManager struct {
clientset *kubernetes.Clientset
stopCh chan struct{}

eventCh chan *K8sMetaEvent
ready atomic.Bool
ready atomic.Bool

metadataHandler *metadataHandler
cacheMap map[string]MetaCache
linkGenerator *LinkGenerator
linkRegisterMap map[string][]string
linkRegisterLock sync.RWMutex

metricContext pipeline.Context
// self metrics
metricRecord pipeline.MetricsRecord
addEventCount pipeline.CounterMetric
updateEventCount pipeline.CounterMetric
deleteEventCount pipeline.CounterMetric
cacheResourceGauge pipeline.GaugeMetric
queueSizeGauge pipeline.GaugeMetric
httpRequestCount pipeline.CounterMetric
httpAvgDelayMs pipeline.CounterMetric
httpMaxDelayMs pipeline.GaugeMetric
}

func GetMetaManagerInstance() *MetaManager {
onceManager.Do(func() {
metaManager = &MetaManager{
stopCh: make(chan struct{}),
eventCh: make(chan *K8sMetaEvent, 1000),
stopCh: make(chan struct{}),
}
metaManager.metadataHandler = newMetadataHandler(metaManager)
metaManager.cacheMap = make(map[string]MetaCache)
for _, resource := range AllResources {
metaManager.cacheMap[resource] = newK8sMetaCache(metaManager.stopCh, resource)
Expand Down Expand Up @@ -84,7 +95,16 @@ func (m *MetaManager) Init(configPath string) (err error) {
return err
}
m.clientset = clientset
m.metricContext = &helper.LocalContext{}

m.metricRecord = pipeline.MetricsRecord{}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里直接初始化?label是什么?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这个是runner级别的,在导出的时候,会填入cluster id,runner name和project。

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

默认的project要带上

m.addEventCount = helper.NewCounterMetricAndRegister(&m.metricRecord, helper.MetricComponentK8sMetaAddEventTotal)
m.updateEventCount = helper.NewCounterMetricAndRegister(&m.metricRecord, helper.MetricComponentK8sMetaUpdateEventTotal)
m.deleteEventCount = helper.NewCounterMetricAndRegister(&m.metricRecord, helper.MetricComponentK8sMetaDeleteEventTotal)
m.cacheResourceGauge = helper.NewGaugeMetricAndRegister(&m.metricRecord, helper.MetricComponentK8sMetaCacheSize)
m.queueSizeGauge = helper.NewGaugeMetricAndRegister(&m.metricRecord, helper.MetricComponentK8sMetaQueueSize)
m.httpRequestCount = helper.NewCounterMetricAndRegister(&m.metricRecord, helper.MetricComponentK8sMetaHTTPRequestTotal)
m.httpAvgDelayMs = helper.NewAverageMetricAndRegister(&m.metricRecord, helper.MetricComponentK8sMetaHTTPAvgDelayMs)
m.httpMaxDelayMs = helper.NewMaxMetricAndRegister(&m.metricRecord, helper.MetricComponentK8sMetaHTTPMaxDelayMs)

go func() {
startTime := time.Now()
Expand Down Expand Up @@ -145,13 +165,28 @@ func (m *MetaManager) UnRegisterSendFunc(configName string, resourceType string)
}
}

func (m *MetaManager) GetMetricContext() pipeline.Context {
return m.metricContext
func GetMetaManagerMetrics() []map[string]string {
manager := GetMetaManagerInstance()
if manager == nil || !manager.IsReady() {
return nil
}
// cache
queueSize := 0
cacheSize := 0
for _, cache := range manager.cacheMap {
queueSize += cache.GetQueueSize()
cacheSize += cache.GetSize()

}
manager.queueSizeGauge.Set(float64(queueSize))
manager.cacheResourceGauge.Set(float64(cacheSize))
return []map[string]string{
manager.metricRecord.ExportMetricRecords(),
}
}

func (m *MetaManager) runServer() {
metadataHandler := newMetadataHandler()
go metadataHandler.K8sServerRun(m.stopCh)
go m.metadataHandler.K8sServerRun(m.stopCh)
}

func isEntity(resourceType string) bool {
Expand Down
44 changes: 44 additions & 0 deletions pkg/helper/self_metrics_agent_constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2024 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package helper

import (
"github.com/alibaba/ilogtail/pkg/pipeline"
)

//////////////////////////////////////////////////////////////////////////
// agent
//////////////////////////////////////////////////////////////////////////

// metric keys
const (
MetricAgentMemoryGo = "agent_go_memory_used_mb"
MetricAgentGoRoutinesTotal = "agent_go_routines_total"
)

func GetCommonLabels(context pipeline.Context, pluginMeta *pipeline.PluginMeta) []pipeline.LabelPair {
labels := make([]pipeline.LabelPair, 0)
labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyProject, Value: context.GetProject()})
labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyLogstore, Value: context.GetLogstore()})
labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyPipelineName, Value: context.GetConfigName()})

if len(pluginMeta.PluginID) > 0 {
labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyPluginID, Value: pluginMeta.PluginID})
}
if len(pluginMeta.PluginType) > 0 {
labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyPluginType, Value: pluginMeta.PluginType})
}
return labels
}
33 changes: 33 additions & 0 deletions pkg/helper/self_metrics_component_constants.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package helper

//////////////////////////////////////////////////////////////////////////
// component
//////////////////////////////////////////////////////////////////////////

/**********************************************************
* k8s meta
**********************************************************/
const (
MetricComponentK8sMetaAddEventTotal = "component_k8s_meta_add_event_total"
MetricComponentK8sMetaUpdateEventTotal = "component_k8s_meta_update_event_total"
MetricComponentK8sMetaDeleteEventTotal = "component_k8s_meta_delete_event_total"
MetricComponentK8sMetaCacheSize = "component_k8s_meta_cache_size"
MetricComponentK8sMetaQueueSize = "component_k8s_meta_queue_size"
MetricComponentK8sMetaHTTPRequestTotal = "component_k8s_meta_http_request_total"
MetricComponentK8sMetaHTTPAvgDelayMs = "component_k8s_meta_avg_delay_ms"
MetricComponentK8sMetaHTTPMaxDelayMs = "component_k8s_meta_max_delay_ms"
)
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,6 @@

package helper

import (
"github.com/alibaba/ilogtail/pkg/pipeline"
)

//////////////////////////////////////////////////////////////////////////
// agent
//////////////////////////////////////////////////////////////////////////

// metric keys
const (
MetricAgentMemoryGo = "agent_go_memory_used_mb"
MetricAgentGoRoutinesTotal = "agent_go_routines_total"
)

//////////////////////////////////////////////////////////////////////////
// plugin
//////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -81,6 +67,7 @@ const (
/**********************************************************
* service_mysql
* service_rdb
* service_kubernetes_meta
**********************************************************/
const (
MetricPluginCollectAvgCostTimeMs = "plugin_collect_avg_cost_time_ms"
Expand All @@ -105,18 +92,3 @@ const (
const (
PluginPairsPerLogTotal = "plugin_pairs_per_log_total"
)

func GetCommonLabels(context pipeline.Context, pluginMeta *pipeline.PluginMeta) []pipeline.LabelPair {
labels := make([]pipeline.LabelPair, 0)
labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyProject, Value: context.GetProject()})
labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyLogstore, Value: context.GetLogstore()})
labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyPipelineName, Value: context.GetConfigName()})

if len(pluginMeta.PluginID) > 0 {
labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyPluginID, Value: pluginMeta.PluginID})
}
if len(pluginMeta.PluginType) > 0 {
labels = append(labels, pipeline.LabelPair{Key: MetricLabelKeyPluginType, Value: pluginMeta.PluginType})
}
return labels
}
Loading
Loading