Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support jsonline protocol #1265

Merged
merged 2 commits into from
Dec 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 47 additions & 20 deletions docs/cn/data-pipeline/flusher/flusher-http.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,27 @@

## 配置参数

| 参数 | 类型 | 是否必选 | 说明 |
|------------------------------|--------------------| -------- |--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Type | String || 插件类型,固定为`flusher_http` |
| RemoteURL | String || 要发送到的URL地址,示例:`http://localhost:8086/write` |
| 参数 | 类型 | 是否必选 | 说明 |
|------------------------------|--------------------| -------- |------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| Type | String || 插件类型,固定为`flusher_http` |
| RemoteURL | String || 要发送到的URL地址,示例:`http://localhost:8086/write` |
| Headers | Map<String,String> || 发送时附加的http请求header,如可添加 Authorization、Content-Type等信息,支持动态变量写法,如`{"x-db":"%{tag.db}"}`<p>v2版本支持从Group的Metadata或者Group.Tags中获取动态变量,如`{"x-db":"%{metadata.db}"}`或者`{"x-db":"%{tag.db}"}`</p> |
| Query | Map<String,String> || 发送时附加到url上的query参数,支持动态变量写法,如`{"db":"%{tag.db}"}`<p>v2版本支持从Group的Metadata或者Group.Tags中获取动态变量,如`{"db":"%{metadata.db}"}`或者`{"db":"%{tag.db}"}`</p> |
| Timeout | String || 请求的超时时间,默认 `60s` |
| Retry.Enable | Boolean || 是否开启失败重试,默认为 `true` |
| Retry.MaxRetryTimes | Int || 最大重试次数,默认为 `3` |
| Retry.InitialDelay | String || 首次重试时间间隔,默认为 `1s`,重试间隔以会2的倍数递增 |
| Retry.MaxDelay | String || 最大重试时间间隔,默认为 `30s` |
| Convert | Struct || ilogtail数据转换协议配置 |
| Convert.Protocol | String || ilogtail数据转换协议,可选值:`custom_single`,`influxdb`。默认值:`custom_single`<p>v2版本可选值:`raw`</p> |
| Convert.Encoding | String || ilogtail flusher数据转换编码,可选值:`json`, `custom`,默认值:`json` |
| Convert.Separator | String || ilogtail数据转换时,PipelineGroupEvents中多个Events之间拼接使用的分隔符。如`\n`。若不设置,则默认不拼接Events,即每个Event作为独立请求向后发送。 默认值为空。<p>当前仅在`Convert.Protocol: raw`有效。</p> |
| Convert.IgnoreUnExpectedData | Boolean || ilogtail数据转换时,遇到非预期的数据的行为,true 跳过,false 报错。默认值 true |
| Convert.TagFieldsRename | Map<String,String> || 对日志中tags中的json字段重命名 |
| Convert.ProtocolFieldsRename | Map<String,String> || ilogtail日志协议字段重命名,可当前可重命名的字段:`contents`,`tags``time` |
| Concurrency | Int || 向url发起请求的并发数,默认为`1` |
| QueueCapacity | Int | 否 | 内部channel的缓存大小,默认为1024
| AsyncIntercept | Boolean | 否 | 异步过滤数据,默认为否
| Query | Map<String,String> || 发送时附加到url上的query参数,支持动态变量写法,如`{"db":"%{tag.db}"}`<p>v2版本支持从Group的Metadata或者Group.Tags中获取动态变量,如`{"db":"%{metadata.db}"}`或者`{"db":"%{tag.db}"}`</p> |
| Timeout | String || 请求的超时时间,默认 `60s` |
| Retry.Enable | Boolean || 是否开启失败重试,默认为 `true` |
| Retry.MaxRetryTimes | Int || 最大重试次数,默认为 `3` |
| Retry.InitialDelay | String || 首次重试时间间隔,默认为 `1s`,重试间隔以会2的倍数递增 |
| Retry.MaxDelay | String || 最大重试时间间隔,默认为 `30s` |
| Convert | Struct || ilogtail数据转换协议配置 |
| Convert.Protocol | String || ilogtail数据转换协议,可选值:`custom_single`,`influxdb`, `jsonline`。默认值:`custom_single`<p>v2版本可选值:`raw`</p> |
| Convert.Encoding | String || ilogtail flusher数据转换编码,可选值:`json`, `custom`,默认值:`json` |
| Convert.Separator | String || ilogtail数据转换时,PipelineGroupEvents中多个Events之间拼接使用的分隔符。如`\n`。若不设置,则默认不拼接Events,即每个Event作为独立请求向后发送。 默认值为空。<p>当前仅在`Convert.Protocol: raw`有效。</p> |
| Convert.IgnoreUnExpectedData | Boolean || ilogtail数据转换时,遇到非预期的数据的行为,true 跳过,false 报错。默认值 true |
| Convert.TagFieldsRename | Map<String,String> || 对日志中tags中的json字段重命名 |
| Convert.ProtocolFieldsRename | Map<String,String> || ilogtail日志协议字段重命名,可当前可重命名的字段:`contents`,`tags``time` |
| Concurrency | Int || 向url发起请求的并发数,默认为`1` |
| QueueCapacity | Int | 否 | 内部channel的缓存大小,默认为1024
| AsyncIntercept | Boolean | 否 | 异步过滤数据,默认为否

## 样例

Expand All @@ -52,3 +52,30 @@ flushers:
Protocol: custom_single
Encoding: json
```
采集Docker日志,并将采集结果以`jsonline`协议发送到`http://localhost:9428/insert/jsonline`。

```yaml
enable: true
inputs:
- Type: service_docker_stdout
Stderr: true
Stdout: true
processors:
- Type: processor_json
SourceKey: content
KeepSource: true
ExpandDepth: 1
ExpandConnector: ""
KeepSourceIfParseError: true
flushers:
- Type: flusher_http
RemoteURL: http://localhsot:9428/insert/jsonline
QueueCapacity: 64
Convert:
Protocol: jsonline
Encoding: json
```

需要注意的是,由于使用`jsonline`协议(会将日志的content和tag打平),所以仅支持使用`json`格式进行提交。
由于`jsonline`默认会批量提交日志,所以建议调低`QueueCapacity`,避免在日志量较大的情况下,发生内存占用过多或OOM的问题。
6 changes: 6 additions & 0 deletions pkg/protocol/converter/converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
ProtocolCustomSingleFlatten = "custom_single_flatten"
ProtocolOtlpV1 = "otlp_v1"
ProtocolInfluxdb = "influxdb"
ProtocolJsonline = "jsonline"
ProtocolRaw = "raw"
)

Expand Down Expand Up @@ -110,6 +111,9 @@ var supportedEncodingMap = map[string]map[string]bool{
ProtocolInfluxdb: {
EncodingCustom: true,
},
ProtocolJsonline: {
EncodingJSON: true,
},
ProtocolRaw: {
EncodingCustom: true,
},
Expand Down Expand Up @@ -181,6 +185,8 @@ func (c *Converter) ToByteStreamWithSelectedFields(logGroup *protocol.LogGroup,
return c.ConvertToSingleProtocolStreamFlatten(logGroup, targetFields)
case ProtocolInfluxdb:
return c.ConvertToInfluxdbProtocolStream(logGroup, targetFields)
case ProtocolJsonline:
return c.ConvertToJsonlineProtocolStreamFlatten(logGroup)
default:
return nil, nil, fmt.Errorf("unsupported protocol: %s", c.Protocol)
}
Expand Down
57 changes: 57 additions & 0 deletions pkg/protocol/converter/jsonline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package protocol

import (
"bytes"
"fmt"

jsoniter "github.com/json-iterator/go"

"github.com/alibaba/ilogtail/pkg/protocol"
)

var (
sep = []byte("\n")
)

func (c *Converter) ConvertToJsonlineProtocolStreamFlatten(logGroup *protocol.LogGroup) ([]byte, []map[string]string, error) {
convertedLogs, _, err := c.ConvertToSingleProtocolLogsFlatten(logGroup, nil)
if err != nil {
return nil, nil, err
}
joinedStream := *GetPooledByteBuf()
for i, log := range convertedLogs {
switch c.Encoding {
case EncodingJSON:
var err error
joinedStream, err = marshalWithoutHTMLEscapedWithoutAlloc(log, bytes.NewBuffer(joinedStream))
if err != nil {
// release byte buffer
PutPooledByteBuf(&joinedStream)
return nil, nil, fmt.Errorf("unable to marshal log: %v", log)
}
// trim and append a \n
joinedStream = trimRightByte(joinedStream, sep[0])
if i < len(convertedLogs)-1 {
joinedStream = append(joinedStream, sep[0])
}
default:
return nil, nil, fmt.Errorf("unsupported encoding format: %s", c.Encoding)
}
}
return joinedStream, nil, nil
}

func marshalWithoutHTMLEscapedWithoutAlloc(data interface{}, bf *bytes.Buffer) ([]byte, error) {
enc := jsoniter.ConfigFastest.NewEncoder(bf)
if err := enc.Encode(data); err != nil {
return nil, err
}
return bf.Bytes(), nil
}

func trimRightByte(s []byte, c byte) []byte {
for len(s) > 0 && s[len(s)-1] == c {
s = s[:len(s)-1]
}
return s
}
Loading
Loading