Skip to content

Commit

Permalink
Fix dropping JVM metrics when collecting multiple skywalking instance…
Browse files Browse the repository at this point in the history
…s's data (#1163)

* fix missing skywalking metrics issue

* add changelog

* update changelog

---------

Co-authored-by: Tom Yu <[email protected]>
  • Loading branch information
qiansheng91 and yyuuttaaoo committed Nov 8, 2023
1 parent cdd2a07 commit f110c69
Show file tree
Hide file tree
Showing 4 changed files with 14 additions and 11 deletions.
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,13 @@ your changes, such as:
- [public] [both] [updated] add a new feature

## [Unreleased]
- [public] [both] [fixed] fix dropping jvm metrics when collecting multiple skywalking instances's data
- [public] [both] [fixed] fix elasticsearch flusher authentication tls config and http config
- [public] [both] [fixed] fix profiling wrong type when the different profiling type having same stack.
- [public] [both] [added] add UsingOldContentTag. When UsingOldContentTag is set to false, the Tag is now placed in the Meta instead of Logs during cgo.
- [public] [both] [fixed] fix send local buffer failed when upgrade iLogtail from version earlier than 1.3.
- [public] [both] [updated] Updated strptime_ns to parse %c format from "%x %X" to "%a %b %d %H:%M:%S %Y" for consistent behavior with striptime.
- [public] [both] [fixed] fix topic key does not support underscore.
- [public] [both] [fixed] fix jmxfetch status error when exist multi jmxfetch config in the same machine.
- [public] [both] [fixed] fix user container wss increase problem.
- [public] [both] [fixed] fix increasing WSS memory issue in collected containers.
- [public] [both] [fixed] fix cannot log blacklist config error
2 changes: 2 additions & 0 deletions plugins/input/skywalkingv2/jvm_metric_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"

"github.com/alibaba/ilogtail/pkg/helper"
"github.com/alibaba/ilogtail/pkg/logger"
"github.com/alibaba/ilogtail/pkg/pipeline"
"github.com/alibaba/ilogtail/pkg/protocol"
"github.com/alibaba/ilogtail/plugins/input/skywalkingv2/skywalking/apm/network/language/agent"
Expand All @@ -35,6 +36,7 @@ func (j *JVMMetricServiceHandle) Collect(ctx context.Context, metrics *agent.JVM
applicationInstance, ok := j.RegistryInformationCache.findApplicationInstanceRegistryInfo(metrics.ApplicationInstanceId)

if !ok {
logger.Warning(j.context.GetRuntimeContext(), "Failed to find application instance registry info", "applicationInstanceId", metrics.ApplicationInstanceId)
return &agent.Downstream{}, nil
}

Expand Down
16 changes: 9 additions & 7 deletions plugins/input/skywalkingv3/jvm_metric_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,15 @@ type JVMMetricHandler struct {
func (h *JVMMetricHandler) Collect(ctx context.Context, req *skywalking.JVMMetricCollection) (*v3.Commands, error) {
defer panicRecover()
for _, metric := range req.GetMetrics() {
fixTime := metric.Time - metric.Time%1000
// 原始数据上传过于频繁, 不需要
if fixTime-h.lastTime >= h.interval {
h.lastTime = fixTime
} else {
return &v3.Commands{}, nil
}
// 在充当Collector角色(一对多个实例)时,将会出现问题,因为每个实例的时间不一致,所以这里不做时间过滤
// fixTime := metric.Time - metric.Time%1000
// // 原始数据上传过于频繁, 不需要
// if fixTime-h.lastTime >= h.interval {
// h.lastTime = fixTime
// } else {
// logger.Warning(h.context.GetRuntimeContext(), "", "lastTime", h.lastTime, "fixTime", fixTime, "interval", h.interval)
// return &v3.Commands{}, nil
// }
logs := h.toMetricStoreFormat(metric, req.Service, req.ServiceInstance)
for _, log := range logs {
h.collector.AddRawLog(log)
Expand Down
3 changes: 0 additions & 3 deletions plugins/input/skywalkingv3/management_handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,6 @@ func (r *ResourcePropertiesCache) save(ctx pipeline.Context) {
err := ctx.SaveCheckPoint(r.cacheKey, jsonBytes)
if err != nil {
logger.Error(ctx.GetRuntimeContext(), "SKYWALKING_SAVE_CHECKPOINT_FAIL", "err", err.Error())
} else {
logger.Info(ctx.GetRuntimeContext(), "msg", "skywalking save checkpoint done")
}
}

Expand All @@ -92,7 +90,6 @@ func (r *ResourcePropertiesCache) load(ctx pipeline.Context) bool {
logger.Error(ctx.GetRuntimeContext(), "SKYWALKING_LOAD_CHECKPOINT_FAIL", "err", err.Error())
return false
}
logger.Info(ctx.GetRuntimeContext(), "msg", "skywalking load checkpoint done", "count", len(r.cache))
}
return true
}
Expand Down

0 comments on commit f110c69

Please sign in to comment.