Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add K8s meta self metrics #1765

Merged
merged 29 commits into from
Oct 12, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,24 @@
#include "common/version.h"
#include "config/ConfigDiff.h"
#include "config/watcher/ConfigWatcher.h"
#include "file_server/EventDispatcher.h"
#include "file_server/event_handler/LogInput.h"
#include "file_server/ConfigManager.h"
#include "file_server/EventDispatcher.h"
#include "file_server/FileServer.h"
#include "plugin/flusher/sls/DiskBufferWriter.h"
#include "file_server/event_handler/LogInput.h"
#include "go_pipeline/LogtailPlugin.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "logger/Logger.h"
#include "monitor/LogFileProfiler.h"
#include "monitor/MetricExportor.h"
#include "monitor/Monitor.h"
#include "pipeline/InstanceConfigManager.h"
#include "pipeline/PipelineManager.h"
#include "pipeline/plugin/PluginRegistry.h"
#include "runner/LogProcess.h"
#include "pipeline/queue/ExactlyOnceQueueManager.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "plugin/flusher/sls/DiskBufferWriter.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "runner/FlusherRunner.h"
#include "runner/LogProcess.h"
#include "runner/sink/http/HttpSink.h"
#ifdef __ENTERPRISE__
#include "config/provider/EnterpriseConfigProvider.h"
Expand Down Expand Up @@ -272,6 +272,13 @@ void Application::Start() { // GCOVR_EXCL_START
LogtailPlugin::GetInstance()->LoadPluginBase();
}

const char* deployMode = getenv("DEPLOY_MODE");
const char* enableK8sMeta = getenv("ENABLE_KUBERNETES_META");
if (deployMode != NULL && strlen(deployMode) > 0 && strcmp(deployMode, "singleton") == 0
&& strcmp(enableK8sMeta, "true") == 0) {
LogtailPlugin::GetInstance()->LoadPluginBase();
}

LogProcess::GetInstance()->Start();

time_t curTime = 0, lastProfilingCheckTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0,
Expand Down
3 changes: 3 additions & 0 deletions pkg/helper/k8smeta/k8s_meta_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ func (m *k8sMetaCache) watch(stopCh <-chan struct{}) {
LastObservedTime: nowTime,
},
}
metaManager.AddEventCount()
},
UpdateFunc: func(oldObj interface{}, obj interface{}) {
nowTime := time.Now().Unix()
Expand All @@ -98,6 +99,7 @@ func (m *k8sMetaCache) watch(stopCh <-chan struct{}) {
LastObservedTime: nowTime,
},
}
metaManager.UpdateEventCount()
},
DeleteFunc: func(obj interface{}) {
m.eventCh <- &K8sMetaEvent{
Expand All @@ -108,6 +110,7 @@ func (m *k8sMetaCache) watch(stopCh <-chan struct{}) {
LastObservedTime: time.Now().Unix(),
},
}
metaManager.DeleteEventCount()
},
})
go factory.Start(stopCh)
Expand Down
45 changes: 40 additions & 5 deletions pkg/helper/k8smeta/k8s_meta_http_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"
"strconv"
"strings"
"sync/atomic"
"time"

app "k8s.io/api/apps/v1"
Expand All @@ -21,11 +22,16 @@ type requestBody struct {

type metadataHandler struct {
metaManager *MetaManager

// self metrics
requestCount atomic.Int64
totalLatency atomic.Int64
maxLatency atomic.Int64
}

func newMetadataHandler() *metadataHandler {
func newMetadataHandler(metaManager *MetaManager) *metadataHandler {
metadataHandler := &metadataHandler{
metaManager: GetMetaManagerInstance(),
metaManager: metaManager,
}
return metadataHandler
}
Expand All @@ -46,9 +52,9 @@ func (m *metadataHandler) K8sServerRun(stopCh <-chan struct{}) error {
mux := http.NewServeMux()

// TODO: add port in ip endpoint
mux.HandleFunc("/metadata/ip", m.handlePodMetaByUniqueID)
mux.HandleFunc("/metadata/containerid", m.handlePodMetaByUniqueID)
mux.HandleFunc("/metadata/host", m.handlePodMetaByHostIP)
mux.HandleFunc("/metadata/ip", m.handler(m.handlePodMetaByUniqueID))
mux.HandleFunc("/metadata/containerid", m.handler(m.handlePodMetaByUniqueID))
mux.HandleFunc("/metadata/host", m.handler(m.handlePodMetaByHostIP))
server.Handler = mux
for {
if m.metaManager.IsReady() {
Expand All @@ -66,6 +72,35 @@ func (m *metadataHandler) K8sServerRun(stopCh <-chan struct{}) error {
return nil
}

func (m *metadataHandler) GetMetrics() map[string]string {
avgLatency := "0"
if m.requestCount.Load() != 0 {
avgLatency = strconv.FormatFloat(float64(m.totalLatency.Load())/float64(m.requestCount.Load()), 'f', -1, 64)
}
metrics := map[string]string{
"value.k8s_meta_http_request_count": strconv.FormatInt(m.requestCount.Load(), 10),
"value.k8s_meta_http_avg_latency": avgLatency,
"value.k8s_meta_http_max_latency": strconv.FormatInt(m.maxLatency.Load(), 10),
}
m.requestCount.Add(-m.requestCount.Load())
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
m.totalLatency.Add(-m.totalLatency.Load())
m.maxLatency.Store(0)
return metrics
}

func (m *metadataHandler) handler(handleFunc func(w http.ResponseWriter, r *http.Request)) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
m.requestCount.Add(1)
handleFunc(w, r)
latency := time.Since(startTime).Milliseconds()
m.totalLatency.Add(latency)
if latency > m.maxLatency.Load() {
m.maxLatency.Store(latency)
}
}
}

func (m *metadataHandler) handlePodMetaByUniqueID(w http.ResponseWriter, r *http.Request) {
defer r.Body.Close()
var rBody requestBody
Expand Down
62 changes: 50 additions & 12 deletions pkg/helper/k8smeta/k8s_meta_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@ import (
"k8s.io/client-go/tools/clientcmd"
controllerConfig "sigs.k8s.io/controller-runtime/pkg/client/config"

"github.com/alibaba/ilogtail/pkg/helper"
"github.com/alibaba/ilogtail/pkg/logger"
"github.com/alibaba/ilogtail/pkg/pipeline"
)

var metaManager *MetaManager
Expand All @@ -40,23 +38,27 @@ type MetaManager struct {
clientset *kubernetes.Clientset
stopCh chan struct{}

eventCh chan *K8sMetaEvent
ready atomic.Bool
ready atomic.Bool

metadataHandler *metadataHandler
cacheMap map[string]MetaCache
linkGenerator *LinkGenerator
linkRegisterMap map[string][]string
linkRegisterLock sync.RWMutex

metricContext pipeline.Context
// self metrics
addEventCount atomic.Int64
updateEventCount atomic.Int64
deleteEventCount atomic.Int64
cacheResourceCount atomic.Int64
}

func GetMetaManagerInstance() *MetaManager {
onceManager.Do(func() {
metaManager = &MetaManager{
stopCh: make(chan struct{}),
eventCh: make(chan *K8sMetaEvent, 1000),
stopCh: make(chan struct{}),
}
metaManager.metadataHandler = newMetadataHandler(metaManager)
metaManager.cacheMap = make(map[string]MetaCache)
for _, resource := range AllResources {
metaManager.cacheMap[resource] = newK8sMetaCache(metaManager.stopCh, resource)
Expand Down Expand Up @@ -84,7 +86,6 @@ func (m *MetaManager) Init(configPath string) (err error) {
return err
}
m.clientset = clientset
m.metricContext = &helper.LocalContext{}

go func() {
startTime := time.Now()
Expand Down Expand Up @@ -145,13 +146,50 @@ func (m *MetaManager) UnRegisterSendFunc(configName string, resourceType string)
}
}

func (m *MetaManager) GetMetricContext() pipeline.Context {
return m.metricContext
func GetMetaManagerMetrics() []map[string]string {
manager := GetMetaManagerInstance()
if manager == nil || !manager.IsReady() {
return nil
}
metrics := make([]map[string]string, 0)
// cache
queueLen := 0
for _, cache := range manager.cacheMap {
queueLen += len(cache.(*k8sMetaCache).eventCh)
}
metrics = append(metrics, map[string]string{
"value.k8s_meta_add_event_count": fmt.Sprintf("%d", manager.addEventCount.Load()),
"value.k8s_meta_update_event_count": fmt.Sprintf("%d", manager.updateEventCount.Load()),
"value.k8s_meta_delete_event_count": fmt.Sprintf("%d", manager.deleteEventCount.Load()),
"value.k8s_meta_cache_resource_count": fmt.Sprintf("%d", manager.cacheResourceCount.Load()),
"value.k8s_meta_event_queue_len_total": fmt.Sprintf("%d", queueLen),
})
manager.addEventCount.Add(-manager.addEventCount.Load())
manager.updateEventCount.Add(-manager.updateEventCount.Load())
manager.deleteEventCount.Add(-manager.deleteEventCount.Load())

// http server
httpServerMetrics := manager.metadataHandler.GetMetrics()
metrics = append(metrics, httpServerMetrics)
return metrics
}

func (m *MetaManager) AddEventCount() {
m.addEventCount.Add(1)
m.cacheResourceCount.Add(1)
}

func (m *MetaManager) UpdateEventCount() {
m.updateEventCount.Add(1)
}

func (m *MetaManager) DeleteEventCount() {
m.deleteEventCount.Add(1)
m.cacheResourceCount.Add(-1)
}

func (m *MetaManager) runServer() {
metadataHandler := newMetadataHandler()
go metadataHandler.K8sServerRun(m.stopCh)
go m.metadataHandler.K8sServerRun(m.stopCh)
}

func isEntity(resourceType string) bool {
Expand Down
4 changes: 4 additions & 0 deletions pluginmanager/metric_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
goruntimemetrics "runtime/metrics"
"strconv"
"strings"

"github.com/alibaba/ilogtail/pkg/helper/k8smeta"
)

const (
Expand Down Expand Up @@ -50,6 +52,8 @@ func GetGoDirectMetrics() []map[string]string {
metrics := make([]map[string]string, 0)
// go plugin metrics
metrics = append(metrics, GetGoPluginMetrics()...)
// k8s meta metrics
metrics = append(metrics, k8smeta.GetMetaManagerMetrics()...)
return metrics
}

Expand Down
37 changes: 22 additions & 15 deletions plugins/input/kubernetesmetav2/meta_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ func (m *metaCollector) processEntityCommonPart(logContents models.LogContents,
// entity reserved fields
logContents.Add(entityDomainFieldName, m.serviceK8sMeta.Domain)
logContents.Add(entityTypeFieldName, m.genEntityTypeKey(kind))
logContents.Add(entityIDFieldName, m.genKey(namespace, name))
logContents.Add(entityIDFieldName, m.genKey(kind, namespace, name))
logContents.Add(entityMethodFieldName, method)

logContents.Add(entityFirstObservedTimeFieldName, strconv.FormatInt(firstObservedTime, 10))
Expand All @@ -264,19 +264,18 @@ func (m *metaCollector) processEntityCommonPart(logContents models.LogContents,
func (m *metaCollector) processEntityLinkCommonPart(logContents models.LogContents, srcKind, srcNamespace, srcName, destKind, destNamespace, destName, method string, firstObservedTime, lastObservedTime int64) {
logContents.Add(entityLinkSrcDomainFieldName, m.serviceK8sMeta.Domain)
logContents.Add(entityLinkSrcEntityTypeFieldName, m.genEntityTypeKey(srcKind))
logContents.Add(entityLinkSrcEntityIDFieldName, m.genKey(srcNamespace, srcName))
logContents.Add(entityLinkSrcEntityIDFieldName, m.genKey(srcKind, srcNamespace, srcName))

logContents.Add(entityLinkDestDomainFieldName, m.serviceK8sMeta.Domain)
logContents.Add(entityLinkDestEntityTypeFieldName, m.genEntityTypeKey(destKind))
logContents.Add(entityLinkDestEntityIDFieldName, m.genKey(destNamespace, destName))
logContents.Add(entityLinkDestEntityIDFieldName, m.genKey(destKind, destNamespace, destName))

logContents.Add(entityMethodFieldName, method)

logContents.Add(entityFirstObservedTimeFieldName, strconv.FormatInt(firstObservedTime, 10))
logContents.Add(entityLastObservedTimeFieldName, strconv.FormatInt(lastObservedTime, 10))
logContents.Add(entityKeepAliveSecondsFieldName, strconv.FormatInt(int64(m.serviceK8sMeta.Interval*2), 10))
logContents.Add(entityCategoryFieldName, defaultEntityLinkCategory)
logContents.Add(entityClusterIDFieldName, m.serviceK8sMeta.clusterID)
}

func (m *metaCollector) processEntityJSONObject(obj map[string]string) string {
Expand Down Expand Up @@ -331,25 +330,29 @@ func (m *metaCollector) sendInBackground() {
entityGroup.Events = append(entityGroup.Events, e)
if len(entityGroup.Events) >= 100 {
sendFunc(entityGroup)
m.serviceK8sMeta.entityCount.Add(int64(len(entityGroup.Events)))
}
case e := <-m.entityLinkBuffer:
entityLinkGroup.Events = append(entityLinkGroup.Events, e)
if len(entityLinkGroup.Events) >= 100 {
sendFunc(entityLinkGroup)
m.serviceK8sMeta.linkCount.Add(int64(len(entityLinkGroup.Events)))
}
case <-time.After(3 * time.Second):
if len(entityGroup.Events) > 0 {
sendFunc(entityGroup)
m.serviceK8sMeta.entityCount.Add(int64(len(entityGroup.Events)))
}
if len(entityLinkGroup.Events) > 0 {
sendFunc(entityLinkGroup)
m.serviceK8sMeta.linkCount.Add(int64(len(entityLinkGroup.Events)))
}
case <-m.stopCh:
return
}
if time.Since(lastSendClusterTime) > time.Duration(m.serviceK8sMeta.Interval)*time.Second {
// send cluster entity if in infra domain
if m.serviceK8sMeta.Domain == "infra" {
if m.serviceK8sMeta.Domain == infraDomain {
clusterEntity := m.generateClusterEntity()
m.collector.AddRawLog(convertPipelineEvent2Log(clusterEntity))
lastSendClusterTime = time.Now()
Expand All @@ -358,8 +361,8 @@ func (m *metaCollector) sendInBackground() {
}
}

func (m *metaCollector) genKey(namespace, name string) string {
key := m.serviceK8sMeta.clusterID + namespace + name
func (m *metaCollector) genKey(kind, namespace, name string) string {
key := m.serviceK8sMeta.clusterID + kind + namespace + name
// #nosec G401
return fmt.Sprintf("%x", md5.Sum([]byte(key)))
}
Expand All @@ -369,8 +372,8 @@ func (m *metaCollector) generateClusterEntity() models.PipelineEvent {
log.Contents = models.NewLogContents()
log.Timestamp = uint64(time.Now().Unix())
log.Contents.Add(entityDomainFieldName, m.serviceK8sMeta.Domain)
log.Contents.Add(entityTypeFieldName, "infra.k8s.cluster")
log.Contents.Add(entityIDFieldName, m.genKey("", ""))
log.Contents.Add(entityTypeFieldName, m.genEntityTypeKey("cluster"))
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
log.Contents.Add(entityIDFieldName, m.genKey("", "", ""))
log.Contents.Add(entityMethodFieldName, "Update")
log.Contents.Add(entityFirstObservedTimeFieldName, strconv.FormatInt(time.Now().Unix(), 10))
log.Contents.Add(entityLastObservedTimeFieldName, strconv.FormatInt(time.Now().Unix(), 10))
Expand All @@ -389,8 +392,8 @@ func (m *metaCollector) generateEntityClusterLink(entityEvent models.PipelineEve
log.Contents.Add(entityLinkSrcEntityIDFieldName, content.Get(entityIDFieldName))

log.Contents.Add(entityLinkDestDomainFieldName, m.serviceK8sMeta.Domain)
log.Contents.Add(entityLinkDestEntityTypeFieldName, "ack.cluster")
log.Contents.Add(entityLinkDestEntityIDFieldName, m.serviceK8sMeta.clusterID)
log.Contents.Add(entityLinkDestEntityTypeFieldName, m.genEntityTypeKey("cluster"))
log.Contents.Add(entityLinkDestEntityIDFieldName, m.genKey("", "", ""))

log.Contents.Add(entityLinkRelationTypeFieldName, "runs")
log.Contents.Add(entityMethodFieldName, content.Get(entityMethodFieldName))
Expand All @@ -399,15 +402,19 @@ func (m *metaCollector) generateEntityClusterLink(entityEvent models.PipelineEve
log.Contents.Add(entityLastObservedTimeFieldName, content.Get(entityLastObservedTimeFieldName))
log.Contents.Add(entityKeepAliveSecondsFieldName, m.serviceK8sMeta.Interval*2)
log.Contents.Add(entityCategoryFieldName, defaultEntityLinkCategory)
log.Contents.Add(entityClusterIDFieldName, m.serviceK8sMeta.clusterID)
log.Timestamp = uint64(time.Now().Unix())
return log
}

func (m *metaCollector) genEntityTypeKey(kind string) string {
prefix := ""
if p, ok := DomainEntityTypePrefix[m.serviceK8sMeta.Domain]; ok {
prefix = p
var prefix string
switch {
case kind == "":
prefix = "k8s."
case kind == "cluster" && m.serviceK8sMeta.Domain == acsDomain:
prefix = m.serviceK8sMeta.Domain + ".ack."
default:
prefix = m.serviceK8sMeta.Domain + ".k8s."
}
return fmt.Sprintf("%s%s", prefix, strings.ToLower(kind))
}
Expand Down
8 changes: 4 additions & 4 deletions plugins/input/kubernetesmetav2/meta_collector_const.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const (
entityLinkRelationTypeFieldName = "__relation_type__"
)

var DomainEntityTypePrefix = map[string]string{
"acs": "acs.ack.cluster.",
"infra": "infra.k8s.cluster.",
}
const (
acsDomain = "acs"
infraDomain = "infra"
)
Loading
Loading