Commit

Support jsonline protocol (#1265)
* Support jsonline protocol

* support jsonline protocol
* add UTs
* support field rename and protocol rename
* reuse bytebuf
* use jsoniter instead of encoding/json

* add docs
lujiajing1126 committed Dec 22, 2023
1 parent 8a19fb1 commit 3e0a541
Showing 5 changed files with 357 additions and 21 deletions.
67 changes: 47 additions & 20 deletions docs/cn/data-pipeline/flusher/flusher-http.md
@@ -10,27 +10,27 @@

## Configuration parameters

| Parameter | Type | Required | Description |
|------------------------------|--------------------| -------- |-------------|
| Type | String | | Plugin type. Always `flusher_http`. |
| RemoteURL | String | | URL to send data to, for example `http://localhost:8086/write`. |
| Parameter | Type | Required | Description |
|------------------------------|--------------------| -------- |-------------|
| Type | String | | Plugin type. Always `flusher_http`. |
| RemoteURL | String | | URL to send data to, for example `http://localhost:8086/write`. |
| Headers | Map<String,String> | | HTTP request headers attached when sending, such as Authorization or Content-Type. Dynamic variables are supported, e.g. `{"x-db":"%{tag.db}"}`<p>In v2, dynamic variables can be read from the Group's Metadata or Group.Tags, e.g. `{"x-db":"%{metadata.db}"}` or `{"x-db":"%{tag.db}"}`</p> |
| Query | Map<String,String> | | Query parameters appended to the URL when sending. Dynamic variables are supported, e.g. `{"db":"%{tag.db}"}`<p>In v2, dynamic variables can be read from the Group's Metadata or Group.Tags, e.g. `{"db":"%{metadata.db}"}` or `{"db":"%{tag.db}"}`</p> |
| Timeout | String | | Request timeout. Default: `60s`. |
| Retry.Enable | Boolean | | Whether to retry on failure. Default: `true`. |
| Retry.MaxRetryTimes | Int | | Maximum number of retries. Default: `3`. |
| Retry.InitialDelay | String | | Delay before the first retry. Default: `1s`. The delay doubles with each subsequent retry. |
| Retry.MaxDelay | String | | Maximum delay between retries. Default: `30s`. |
| Convert | Struct | | ilogtail data conversion protocol configuration. |
| Convert.Protocol | String | | ilogtail data conversion protocol. Options: `custom_single`, `influxdb`. Default: `custom_single`<p>Additional option in v2: `raw`</p> |
| Convert.Encoding | String | | ilogtail flusher data conversion encoding. Options: `json`, `custom`. Default: `json`. |
| Convert.Separator | String | | Separator used to join multiple Events in a PipelineGroupEvents during conversion, e.g. `\n`. If unset, Events are not joined and each Event is sent as a separate request. Default: empty.<p>Currently only takes effect with `Convert.Protocol: raw`.</p> |
| Convert.IgnoreUnExpectedData | Boolean | | Behavior when unexpected data is encountered during conversion: `true` skips it, `false` reports an error. Default: `true`. |
| Convert.TagFieldsRename | Map<String,String> | | Renames JSON fields in the log's tags. |
| Convert.ProtocolFieldsRename | Map<String,String> | | Renames ilogtail log protocol fields. Fields that can currently be renamed: `contents`, `tags`, `time`. |
| Concurrency | Int | | Number of concurrent requests sent to the URL. Default: `1`. |
| QueueCapacity | Int | No | Capacity of the internal channel buffer. Default: 1024. |
| AsyncIntercept | Boolean | No | Whether to filter data asynchronously. Disabled by default. |
| Query | Map<String,String> | | Query parameters appended to the URL when sending. Dynamic variables are supported, e.g. `{"db":"%{tag.db}"}`<p>In v2, dynamic variables can be read from the Group's Metadata or Group.Tags, e.g. `{"db":"%{metadata.db}"}` or `{"db":"%{tag.db}"}`</p> |
| Timeout | String | | Request timeout. Default: `60s`. |
| Retry.Enable | Boolean | | Whether to retry on failure. Default: `true`. |
| Retry.MaxRetryTimes | Int | | Maximum number of retries. Default: `3`. |
| Retry.InitialDelay | String | | Delay before the first retry. Default: `1s`. The delay doubles with each subsequent retry. |
| Retry.MaxDelay | String | | Maximum delay between retries. Default: `30s`. |
| Convert | Struct | | ilogtail data conversion protocol configuration. |
| Convert.Protocol | String | | ilogtail data conversion protocol. Options: `custom_single`, `influxdb`, `jsonline`. Default: `custom_single`<p>Additional option in v2: `raw`</p> |
| Convert.Encoding | String | | ilogtail flusher data conversion encoding. Options: `json`, `custom`. Default: `json`. |
| Convert.Separator | String | | Separator used to join multiple Events in a PipelineGroupEvents during conversion, e.g. `\n`. If unset, Events are not joined and each Event is sent as a separate request. Default: empty.<p>Currently only takes effect with `Convert.Protocol: raw`.</p> |
| Convert.IgnoreUnExpectedData | Boolean | | Behavior when unexpected data is encountered during conversion: `true` skips it, `false` reports an error. Default: `true`. |
| Convert.TagFieldsRename | Map<String,String> | | Renames JSON fields in the log's tags. |
| Convert.ProtocolFieldsRename | Map<String,String> | | Renames ilogtail log protocol fields. Fields that can currently be renamed: `contents`, `tags`, `time`. |
| Concurrency | Int | | Number of concurrent requests sent to the URL. Default: `1`. |
| QueueCapacity | Int | No | Capacity of the internal channel buffer. Default: 1024. |
| AsyncIntercept | Boolean | No | Whether to filter data asynchronously. Disabled by default. |

## Examples

@@ -52,3 +52,30 @@ flushers:
Protocol: custom_single
Encoding: json
```
Collect Docker logs and send the results to `http://localhost:9428/insert/jsonline` using the `jsonline` protocol.

```yaml
enable: true
inputs:
  - Type: service_docker_stdout
    Stderr: true
    Stdout: true
processors:
  - Type: processor_json
    SourceKey: content
    KeepSource: true
    ExpandDepth: 1
    ExpandConnector: ""
    KeepSourceIfParseError: true
flushers:
  - Type: flusher_http
    RemoteURL: http://localhost:9428/insert/jsonline
    QueueCapacity: 64
    Convert:
      Protocol: jsonline
      Encoding: json
```

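Note that because the `jsonline` protocol flattens the log's content and tags, only the `json` encoding is supported for submission.
Because `jsonline` submits logs in batches by default, it is advisable to lower `QueueCapacity` to avoid excessive memory usage or OOM when log volume is high.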
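
To make the flattening concrete, here is a minimal sketch of what one flattened JSON object per line could look like. It uses only the Go standard library; `flattenLog` and `toJSONLines` are hypothetical helpers, and the key naming and collision handling are assumptions rather than the converter's actual behavior.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

// flattenLog merges a log's contents and tags into one flat map.
// Hypothetical helper: the real converter's key names and collision
// handling may differ.
func flattenLog(contents, tags map[string]string, ts int64) map[string]interface{} {
	flat := make(map[string]interface{}, len(contents)+len(tags)+1)
	for k, v := range tags {
		flat[k] = v
	}
	for k, v := range contents {
		flat[k] = v // in this sketch, contents win on key collisions
	}
	flat["time"] = ts
	return flat
}

// toJSONLines encodes each flattened log as one JSON object and joins
// the objects with "\n", so the whole batch can be sent in one request.
func toJSONLines(logs []map[string]interface{}) (string, error) {
	lines := make([]string, 0, len(logs))
	for _, l := range logs {
		b, err := json.Marshal(l)
		if err != nil {
			return "", err
		}
		lines = append(lines, string(b))
	}
	return strings.Join(lines, "\n"), nil
}

func main() {
	logs := []map[string]interface{}{
		flattenLog(map[string]string{"msg": "started"}, map[string]string{"host": "a"}, 1),
		flattenLog(map[string]string{"msg": "stopped"}, map[string]string{"host": "a"}, 2),
	}
	out, err := toJSONLines(logs)
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

Because a whole batch ends up in a single request body, the size of each request grows with the number of logs queued, which is why a smaller `QueueCapacity` is suggested.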
6 changes: 6 additions & 0 deletions pkg/protocol/converter/converter.go
@@ -29,6 +29,7 @@ const (
	ProtocolCustomSingleFlatten = "custom_single_flatten"
	ProtocolOtlpV1              = "otlp_v1"
	ProtocolInfluxdb            = "influxdb"
	ProtocolJsonline            = "jsonline"
	ProtocolRaw                 = "raw"
)

@@ -110,6 +111,9 @@ var supportedEncodingMap = map[string]map[string]bool{
	ProtocolInfluxdb: {
		EncodingCustom: true,
	},
	ProtocolJsonline: {
		EncodingJSON: true,
	},
	ProtocolRaw: {
		EncodingCustom: true,
	},
@@ -181,6 +185,8 @@ func (c *Converter) ToByteStreamWithSelectedFields(logGroup *protocol.LogGroup,
		return c.ConvertToSingleProtocolStreamFlatten(logGroup, targetFields)
	case ProtocolInfluxdb:
		return c.ConvertToInfluxdbProtocolStream(logGroup, targetFields)
	case ProtocolJsonline:
		return c.ConvertToJsonlineProtocolStreamFlatten(logGroup)
	default:
		return nil, nil, fmt.Errorf("unsupported protocol: %s", c.Protocol)
	}
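
The hunks above register `jsonline` in the protocol constants, in the protocol-to-encoding map, and in the dispatch switch. How the map is consulted is not part of this diff, so the following is only a sketch of a nested-map lookup under that assumption; `checkEncoding` and the lowercase identifiers are hypothetical.

```go
package main

import "fmt"

// Local stand-ins for the constants shown in the diff.
const (
	protocolInfluxdb = "influxdb"
	protocolJsonline = "jsonline"
	protocolRaw      = "raw"

	encodingJSON   = "json"
	encodingCustom = "custom"
)

// supportedEncodings mirrors the shape of supportedEncodingMap:
// protocol name -> set of encodings accepted for that protocol.
var supportedEncodings = map[string]map[string]bool{
	protocolInfluxdb: {encodingCustom: true},
	protocolJsonline: {encodingJSON: true},
	protocolRaw:      {encodingCustom: true},
}

// checkEncoding is a hypothetical validator: it rejects unknown protocols
// and encodings that are not registered for the given protocol.
func checkEncoding(protocol, encoding string) error {
	encodings, ok := supportedEncodings[protocol]
	if !ok {
		return fmt.Errorf("unsupported protocol: %s", protocol)
	}
	if !encodings[encoding] {
		return fmt.Errorf("unsupported encoding %q for protocol %q", encoding, protocol)
	}
	return nil
}

func main() {
	fmt.Println(checkEncoding(protocolJsonline, encodingJSON))   // accepted
	fmt.Println(checkEncoding(protocolJsonline, encodingCustom)) // rejected
}
```

With a lookup like this, pairing `jsonline` with any encoding other than `json` fails at configuration time rather than during conversion.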
57 changes: 57 additions & 0 deletions pkg/protocol/converter/jsonline.go
@@ -0,0 +1,57 @@
package protocol

import (
	"bytes"
	"fmt"

	jsoniter "github.com/json-iterator/go"

	"github.com/alibaba/ilogtail/pkg/protocol"
)

var (
	// sep separates consecutive JSON documents in the jsonline stream.
	sep = []byte("\n")
)

// ConvertToJsonlineProtocolStreamFlatten flattens every log in logGroup and
// joins the JSON-encoded results with newlines into a single byte stream.
func (c *Converter) ConvertToJsonlineProtocolStreamFlatten(logGroup *protocol.LogGroup) ([]byte, []map[string]string, error) {
	convertedLogs, _, err := c.ConvertToSingleProtocolLogsFlatten(logGroup, nil)
	if err != nil {
		return nil, nil, err
	}
	joinedStream := *GetPooledByteBuf()
	for i, log := range convertedLogs {
		switch c.Encoding {
		case EncodingJSON:
			// Encode into a separate variable so that joinedStream still
			// references the pooled buffer if marshalling fails.
			encoded, err := marshalWithoutHTMLEscapedWithoutAlloc(log, bytes.NewBuffer(joinedStream))
			if err != nil {
				// release byte buffer
				PutPooledByteBuf(&joinedStream)
				return nil, nil, fmt.Errorf("unable to marshal log: %v: %w", log, err)
			}
			// trim the newline written by the encoder, then append a \n between logs
			joinedStream = trimRightByte(encoded, sep[0])
			if i < len(convertedLogs)-1 {
				joinedStream = append(joinedStream, sep[0])
			}
		default:
			// release byte buffer
			PutPooledByteBuf(&joinedStream)
			return nil, nil, fmt.Errorf("unsupported encoding format: %s", c.Encoding)
		}
	}
	return joinedStream, nil, nil
}

// marshalWithoutHTMLEscapedWithoutAlloc appends the JSON encoding of data to bf
// and returns the buffer's full contents. ConfigFastest does not escape HTML.
func marshalWithoutHTMLEscapedWithoutAlloc(data interface{}, bf *bytes.Buffer) ([]byte, error) {
	enc := jsoniter.ConfigFastest.NewEncoder(bf)
	if err := enc.Encode(data); err != nil {
		return nil, err
	}
	return bf.Bytes(), nil
}

// trimRightByte removes every trailing occurrence of c from s.
func trimRightByte(s []byte, c byte) []byte {
	for len(s) > 0 && s[len(s)-1] == c {
		s = s[:len(s)-1]
	}
	return s
}
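
`GetPooledByteBuf` and `PutPooledByteBuf` are not shown in this diff. As a rough illustration of the "reuse bytebuf" idea from the commit message, the sketch below pools `[]byte` buffers with `sync.Pool` and encodes into them without HTML escaping. It substitutes the standard library's `encoding/json` for jsoniter, and `bufPool` and `encodeLines` are hypothetical names.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"sync"
)

// bufPool is a hypothetical stand-in for the converter's pooled byte buffers.
var bufPool = sync.Pool{
	New: func() interface{} {
		b := make([]byte, 0, 1024)
		return &b
	},
}

// encodeLines writes one JSON object per record into a pooled buffer and
// returns the result without the trailing newline added by the encoder.
func encodeLines(records []map[string]string) (string, error) {
	bp := bufPool.Get().(*[]byte)
	buf := bytes.NewBuffer((*bp)[:0])
	defer func() {
		// Hand the (possibly grown) backing array back to the pool, emptied.
		*bp = buf.Bytes()[:0]
		bufPool.Put(bp)
	}()

	enc := json.NewEncoder(buf)
	enc.SetEscapeHTML(false) // keep <, > and & as-is
	for _, r := range records {
		if err := enc.Encode(r); err != nil { // Encode appends "\n"
			return "", err
		}
	}
	// Converting to string copies the bytes before the buffer is reused.
	return string(bytes.TrimRight(buf.Bytes(), "\n")), nil
}

func main() {
	out, err := encodeLines([]map[string]string{{"msg": "<a&b>"}, {"msg": "ok"}})
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

Unlike the commit's function, this sketch copies the result out before returning the buffer to the pool, which trades one allocation for not having to track buffer ownership in the caller.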