Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow to discard pipeline events when flusher_http's queue is full #1220

Closed
wants to merge 16 commits into from
Closed
4 changes: 2 additions & 2 deletions docs/cn/data-pipeline/flusher/flusher-http.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
| Convert.TagFieldsRename | Map<String,String> | 否 | 对日志中tags中的json字段重命名 |
| Convert.ProtocolFieldsRename | Map<String,String> | 否 | ilogtail日志协议字段重命名,当前可重命名的字段:`contents`,`tags`和`time` |
| Concurrency | Int | 否 | 向url发起请求的并发数,默认为`1` |
| QueueCapacity | Int | 否 | 内部channel的缓存大小,默认为1024
| AsyncIntercept | Boolean | 否 | 异步过滤数据,默认为否
| QueueCapacity | Int | 否 | 内部channel的缓存大小,默认为1024 |
| AsyncIntercept | Boolean | 否 | 异步过滤数据,默认为否 |

## 样例

Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion pkg/helper/dumper.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ import (
"sync"
"time"

"github.com/alibaba/ilogtail/pkg/helper/async"
"github.com/alibaba/ilogtail/pkg/logger"
"github.com/alibaba/ilogtail/pkg/util"
"github.com/alibaba/ilogtail/plugins/test/async"
)

type DumpDataReq struct {
Expand Down
105 changes: 90 additions & 15 deletions plugins/flusher/http/flusher_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"math/big"
"net/http"
"net/url"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -50,6 +51,10 @@ var contentTypeMaps = map[string]string{
converter.EncodingCustom: defaultContentType,
}

var (
sensitiveLabels = []string{"u", "user", "username", "p", "password", "passwd", "pwd"}
)

type retryConfig struct {
Enable bool // If enable retry, default is true
MaxRetryTimes int // Max retry times, default is 3
Expand All @@ -58,18 +63,19 @@ type retryConfig struct {
}

type FlusherHTTP struct {
RemoteURL string // RemoteURL to request
Headers map[string]string // Headers to append to the http request
Query map[string]string // Query parameters to append to the http request
Timeout time.Duration // Request timeout, default is 60s
Retry retryConfig // Retry strategy, default is retry 3 times with delay time begin from 1second, max to 30 seconds
Convert helper.ConvertConfig // Convert defines which protocol and format to convert to
Concurrency int // How many requests can be performed in concurrent
Authenticator *extensions.ExtensionConfig // name and options of the extensions.ClientAuthenticator extension to use
FlushInterceptor *extensions.ExtensionConfig // name and options of the extensions.FlushInterceptor extension to use
AsyncIntercept bool // intercept the event asynchronously
RequestInterceptors []extensions.ExtensionConfig // custom request interceptor settings
QueueCapacity int // capacity of channel
RemoteURL string // RemoteURL to request
Headers map[string]string // Headers to append to the http request
Query map[string]string // Query parameters to append to the http request
Timeout time.Duration // Request timeout, default is 60s
Retry retryConfig // Retry strategy, default is retry 3 times with delay time begin from 1second, max to 30 seconds
Convert helper.ConvertConfig // Convert defines which protocol and format to convert to
Concurrency int // How many requests can be performed in concurrent
Authenticator *extensions.ExtensionConfig // name and options of the extensions.ClientAuthenticator extension to use
FlushInterceptor *extensions.ExtensionConfig // name and options of the extensions.FlushInterceptor extension to use
AsyncIntercept bool // intercept the event asynchronously
RequestInterceptors []extensions.ExtensionConfig // custom request interceptor settings
QueueCapacity int // capacity of channel
DropEventWhenQueueFull bool // If true, pipeline events will be dropped when the queue is full

varKeys []string

Expand All @@ -78,8 +84,12 @@ type FlusherHTTP struct {
client *http.Client
interceptor extensions.FlushInterceptor

queue chan interface{}
counter sync.WaitGroup
queue chan interface{}
counter sync.WaitGroup
droppedGroups pipeline.CounterMetric
droppedEvents pipeline.CounterMetric
retryCounts pipeline.CounterMetric
flushLatency pipeline.CounterMetric
}

func (f *FlusherHTTP) Description() string {
Expand Down Expand Up @@ -140,6 +150,17 @@ func (f *FlusherHTTP) Init(context pipeline.Context) error {
f.buildVarKeys()
f.fillRequestContentType()

metricLabels := f.buildLabels()
f.droppedGroups = helper.NewCounterMetric("http_flusher_dropped_groups", metricLabels...)
f.droppedEvents = helper.NewCounterMetric("http_flusher_dropped_events", metricLabels...)
f.retryCounts = helper.NewCounterMetric("http_flusher_retry_counts", metricLabels...)
f.flushLatency = helper.NewAverageMetric("http_flusher_flush_latency_ns", metricLabels...) // cannot use latency metric

context.RegisterCounterMetric(f.droppedGroups)
context.RegisterCounterMetric(f.droppedEvents)
context.RegisterCounterMetric(f.retryCounts)
context.RegisterCounterMetric(f.flushLatency)

logger.Info(f.context.GetRuntimeContext(), "http flusher init", "initialized")
return nil
}
Expand Down Expand Up @@ -251,7 +272,34 @@ func (f *FlusherHTTP) getConverter() (*converter.Converter, error) {

// addTask enqueues one piece of pipeline data for asynchronous flushing.
// The in-flight counter is bumped before the enqueue attempt; when the queue
// is full and DropEventWhenQueueFull is set, the data is discarded through
// handleDroppedEvent (which releases the counter again). Otherwise the send
// blocks until a worker frees up queue capacity.
func (f *FlusherHTTP) addTask(log interface{}) {
	f.counter.Add(1)

	if !f.DropEventWhenQueueFull {
		// Blocking mode: back-pressure the caller until there is room.
		f.queue <- log
		return
	}

	// Non-blocking mode: prefer dropping data over stalling the pipeline.
	select {
	case f.queue <- log:
	default:
		f.handleDroppedEvent(log)
	}
}

// handleDroppedEvent records a piece of pipeline data that was discarded
// because the queue was full: it releases the in-flight counter, bumps the
// dropped-group counter, adds the contained event count to the dropped-event
// counter, and emits a warning alarm.
func (f *FlusherHTTP) handleDroppedEvent(log interface{}) {
	f.counter.Done()
	f.droppedGroups.Add(1)

	// Count the individual events inside the dropped payload, depending on
	// which of the two supported payload types it is.
	if group, ok := log.(*protocol.LogGroup); ok && group != nil {
		f.droppedEvents.Add(int64(len(group.Logs)))
	} else if events, ok := log.(*models.PipelineGroupEvents); ok && events != nil {
		f.droppedEvents.Add(int64(len(events.Events)))
	}
	logger.Warningf(f.context.GetRuntimeContext(), "FLUSHER_FLUSH_ALARM", "http flusher dropped a group event since the queue is full")
}

func (f *FlusherHTTP) countDownTask() {
Expand Down Expand Up @@ -316,6 +364,11 @@ func (f *FlusherHTTP) convertAndFlush(data interface{}) error {

func (f *FlusherHTTP) flushWithRetry(data []byte, varValues map[string]string) error {
var err error
start := time.Now()
defer func() {
f.flushLatency.Add(time.Since(start).Nanoseconds())
}()

for i := 0; i <= f.Retry.MaxRetryTimes; i++ {
ok, retryable, e := f.flush(data, varValues)
if ok || !retryable || !f.Retry.Enable {
Expand All @@ -324,6 +377,7 @@ func (f *FlusherHTTP) flushWithRetry(data []byte, varValues map[string]string) e
}
err = e
<-time.After(f.getNextRetryDelay(i))
f.retryCounts.Add(1)
}
converter.PutPooledByteBuf(&data)
return err
Expand Down Expand Up @@ -422,6 +476,26 @@ func (f *FlusherHTTP) flush(data []byte, varValues map[string]string) (ok, retry
}
}

// buildLabels assembles the static metric labels attached to this flusher's
// self-monitor counters: the remote URL plus every non-sensitive query
// parameter. Credential-like query keys (user/password variants) are
// excluded via isSensitiveKey so secrets never leak into metrics.
func (f *FlusherHTTP) buildLabels() []*protocol.Log_Content {
	// Pre-size for the RemoteURL label plus one label per query parameter.
	// (The original sized by len(f.Headers), but the loop iterates f.Query.)
	labels := make([]*protocol.Log_Content, 0, len(f.Query)+1)
	labels = append(labels, &protocol.Log_Content{Key: "RemoteURL", Value: f.RemoteURL})
	for k, v := range f.Query {
		if !isSensitiveKey(k) {
			labels = append(labels, &protocol.Log_Content{Key: k, Value: v})
		}
	}
	return labels
}

// isSensitiveKey reports whether the given query-parameter name looks like a
// credential (user/password variants, see sensitiveLabels) and therefore must
// not be exposed as a metric label. Matching is case-insensitive.
func isSensitiveKey(label string) bool {
	for _, sensitiveKey := range sensitiveLabels {
		// strings.EqualFold compares case-insensitively without allocating a
		// lowered copy of label on every iteration (the original called
		// strings.ToLower inside the loop).
		if strings.EqualFold(label, sensitiveKey) {
			return true
		}
	}
	return false
}

func (f *FlusherHTTP) buildVarKeys() {
cache := map[string]struct{}{}
defines := []map[string]string{f.Query, f.Headers}
Expand Down Expand Up @@ -479,6 +553,7 @@ func init() {
InitialDelay: time.Second,
MaxDelay: 30 * time.Second,
},
DropEventWhenQueueFull: true,
}
}
}
78 changes: 78 additions & 0 deletions plugins/flusher/http/flusher_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,6 +734,78 @@ func TestHttpFlusherFlushWithInterceptor(t *testing.T) {
})
}

// TestHttpFlusherDropEvents verifies DropEventWhenQueueFull behavior: with a
// queue of capacity 1, the second exported group cannot be enqueued and must
// be dropped (and reported via the self-monitor metrics) instead of blocking
// the caller.
func TestHttpFlusherDropEvents(t *testing.T) {
	Convey("Given a http flusher that drops events when queue is full", t, func() {
		mockIntercepter := &mockInterceptor{}
		flusher := &FlusherHTTP{
			RemoteURL: "http://test.com/write",
			Convert: helper.ConvertConfig{
				Protocol: converter.ProtocolInfluxdb,
				Encoding: converter.EncodingCustom,
			},
			interceptor:    mockIntercepter,
			context:        mock.NewEmptyContext("p", "l", "c"),
			AsyncIntercept: true,
			Timeout:        defaultTimeout,
			Concurrency:    1,
			// Capacity 1 so the second Export overflows the queue.
			queue:                  make(chan interface{}, 1),
			DropEventWhenQueueFull: true,
		}

		// Init is not called in this test, so the self-monitor metrics that
		// handleDroppedEvent touches must be wired up manually.
		metricLabels := flusher.buildLabels()
		flusher.droppedGroups = helper.NewCounterMetric("http_flusher_dropped_groups", metricLabels...)
		flusher.droppedEvents = helper.NewCounterMetric("http_flusher_dropped_events", metricLabels...)
		flusher.retryCounts = helper.NewCounterMetric("http_flusher_retry_counts", metricLabels...)
		flusher.flushLatency = helper.NewAverageMetric("http_flusher_flush_latency_ns", metricLabels...)

		Convey("should discard events when queue is full", func() {
			groupEvents := models.PipelineGroupEvents{
				Events: []models.PipelineEvent{&models.Metric{
					Name:      "cpu.load.short",
					Timestamp: 1672321328000000000,
					Tags:      models.NewTagsWithKeyValues("host", "server01", "region", "cn"),
					Value:     &models.MetricSingleValue{Value: 0.64},
				}},
			}
			// First export fills the queue; second one overflows and is dropped.
			err := flusher.Export([]*models.PipelineGroupEvents{&groupEvents}, nil)
			So(err, ShouldBeNil)
			err = flusher.Export([]*models.PipelineGroupEvents{&groupEvents}, nil)
			So(err, ShouldBeNil)
			// Only the first group remains queued.
			So(len(flusher.queue), ShouldEqual, 1)
			err = flusher.convertAndFlush(<-flusher.queue)
			So(err, ShouldBeNil)
			// The drop should have been recorded in the context's metrics.
			log := &protocol.Log{}
			flusher.context.MetricSerializeToPB(log)
			So(len(log.Contents), ShouldBeGreaterThanOrEqualTo, 1)
		})
	})
}

// TestBuildLabels checks that buildLabels emits the RemoteURL label first and
// filters credential-like query parameters ("u", "password") out of the
// metric labels, keeping only non-sensitive ones ("status").
func TestBuildLabels(t *testing.T) {
	f := &FlusherHTTP{
		RemoteURL: "http://example.com",
		Query: map[string]string{
			"u":        "user",
			"password": "secret",
			"status":   "active",
		},
	}

	got := f.buildLabels()

	want := []*protocol.Log_Content{
		{Key: "RemoteURL", Value: "http://example.com"},
		{Key: "status", Value: "active"},
	}
	assert.Equal(t, len(want), len(got))
	for i := range want {
		assert.Equal(t, want[i].Key, got[i].Key)
		assert.Equal(t, want[i].Value, got[i].Value)
	}
}

type mockContext struct {
pipeline.Context
basicAuth *basicAuth
Expand All @@ -751,6 +823,12 @@ func (c mockContext) GetExtension(name string, cfg any) (pipeline.Extension, err
return nil, fmt.Errorf("basicAuth not set")
}

// RegisterCounterMetric is a no-op stub: the mock context discards registered
// counter metrics.
func (c mockContext) RegisterCounterMetric(metric pipeline.CounterMetric) {
}

// RegisterLatencyMetric is a no-op stub: the mock context discards registered
// latency metrics.
func (c mockContext) RegisterLatencyMetric(metric pipeline.LatencyMetric) {
}

func (c mockContext) GetConfigName() string {
return "ctx"
}
Expand Down
Loading