Skip to content

Commit

Permalink
feat: add kafka net config MaxOpenRequests (#1224)
Browse files Browse the repository at this point in the history
* feat: add kafka net config MaxOpenRequests

* feat: add docs and format

* feat: add docs
  • Loading branch information
7y-9 committed Nov 14, 2023
1 parent b2f091c commit 4e9bd0e
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 32 deletions.
72 changes: 42 additions & 30 deletions docs/cn/data-pipeline/flusher/flusher-kafka_v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
| Compression | String || 压缩算法,可选值:`none`, `snappy``lz4``gzip`,默认值`none` |
| CompressionLevel | Int || 压缩级别,可选值:`1~9`,默认值:`4`,设置为`0`则禁用`Compression` |
| MaxMessageBytes | Int || 一个批次提交的大小限制,配置和`message.max.bytes`对应,默认值:`1000000` |
| MaxOpenRequests | Int || 一个连接允许的最大打开的请求数,默认值:`5` |
| MaxRetries | Int || 提交失败重试次数,最大`3`次,默认值:`3` |
| BulkMaxSize | Int || 单次请求提交事件数,默认`2048` |
| BulkFlushFrequency | Int || 发送批量 Kafka 请求之前等待的时间,0标识没有时延,默认值:`0` |
Expand All @@ -61,8 +62,9 @@
| ClientID | String || 写入Kafka的Client ID,默认取值:`LogtailPlugin`|

- `Version`需要填写的是`kafka protocol version`版本号,`flusher_kafka_v2`当前支持的`kafka`版本范围:`0.8.2.x~3.3.1`
请根据自己的`kafka`版本号参照下面的`kafka protocol version`规则进行配置。**建议根据自己的`kafka`版本指定对应`protocol version`**,
`kafka protocol version`支持版本号如下:
请根据自己的`kafka`版本号参照下面的`kafka protocol version`规则进行配置。**建议根据自己的`kafka`
版本指定对应`protocol version`**,
`kafka protocol version`支持版本号如下:

```plain
0.8.2.0,0.8.2.1,0.8.2.2
Expand Down Expand Up @@ -103,7 +105,7 @@ inputs:
FilePattern: "*.log"
flushers:
- Type: flusher_kafka_v2
Brokers:
Brokers:
- 192.XX.XX.1:9092
- 192.XX.XX.2:9092
- 192.XX.XX.3:9092
Expand Down Expand Up @@ -140,7 +142,7 @@ flushers:
"time": "2022-07-20 16:55:05.415"
},
"tags": {
"k8s.namespace.name":"java_app",
"k8s.namespace.name": "java_app",
"host.ip": "192.168.6.128",
"host.name": "master",
"log.file.path": "/data/test.log"
Expand All @@ -160,6 +162,7 @@ Topic: test_%{content.application}

最后`ilogtail`就自动将日志推送到`test_springboot-docker`这个`topic`中。
`topic`动态表达式规则:

- `%{content.fieldname}`。`content`代表从`contents`中取指定字段值
- `%{tag.fieldname}`,`tag`表示从`tags`中取指定字段值,例如:`%{tag.k8s.namespace.name}`
- `${env_name}`, 读取系统变量绑定到动态`topic`上,`ilogtail 1.5.0`开始支持。
Expand All @@ -168,12 +171,13 @@ Topic: test_%{content.application}
#### 动态topic中使用系统变量

动态`topic`绑定系统变量的两种场景:

- 将系统变量采集添加到日志的`tag`中,然后使用`%{tag.fieldname}`规则完成绑定。
- 对系统变量无采集存储需求,只是想根据设定的系统变量将日志推送到指定的`topic`中,直接采用`${env_name}`规则完成绑定,此方式需要`1.5.0`才支持。
- 对系统变量无采集存储需求,只是想根据设定的系统变量将日志推送到指定的`topic`中,直接采用`${env_name}`
规则完成绑定,此方式需要`1.5.0`才支持。

由于上面提到的两种系统变量的采集绑定都需要做一些特殊配置,因此下面将分别介绍下相关的配置操作。


**(1)将系统变量采集到日志中完成动态`topic`绑定**

将系统变量采集添加到日志中有两种方式,一种是在`ilogtail`容器`env`添加,另一种是通过`processor_add_fields` 插件添加,
Expand All @@ -184,35 +188,38 @@ Topic: test_%{content.application}
```yaml
env:
- name: ALIYUN_LOG_ENV_TAGS # add log tags from env
value: _node_name_|_node_ip_|_app_name_
value: _node_name_|_node_ip_|_app_name_
- name: _app_name_ # 添加自定义_app_name_变量,
value: kafka
```

自定义的变量`_app_name_`被添加到`ALIYUN_LOG_ENV_TAGS`中,日志的`tags`中会看到自定义的变量, 此时动态 `topic`采用`%{tag.fieldname}`规则配置即可。
自定义的变量`_app_name_`被添加到`ALIYUN_LOG_ENV_TAGS`中,日志的`tags`中会看到自定义的变量, 此时动态 `topic`
采用`%{tag.fieldname}`规则配置即可。

- 使用`processor_add_fields` 插件系统变量添加到日志中,配置参考如下:

```yaml
processors:
- Type: processor_add_fields
Fields:
service: ${env_name}
IgnoreIfExist: false
- Type: processor_add_fields
Fields:
service: ${env_name}
IgnoreIfExist: false
```

这里`${env_name}`生效依赖于`ilogtail`的`enable_env_ref_in_config`配置,从`ilogtail 1.5.0`开始支持。

**(2)直接采用`$`符将系统变量绑定动态`topic`中**

在`daemonset`或者`sidecar`方式部署的`ilogtail`容器`env`配置部分添加自定义的系统变量,配置参考案例如下:

```yaml
env:
- name: ALIYUN_LOG_ENV_TAGS # add log tags from env
value: _node_name_|_node_ip_
- name: app_name # 添加自定义app_name变量,
value: kafka
```

`app_name`添加到系统变量中后,直接采用动态topic的:`${env_name}`规则即可绑定。

```yaml
Expand All @@ -223,14 +230,14 @@ inputs:
FilePattern: "*.log"
flushers:
- Type: flusher_kafka_v2
Brokers:
Brokers:
- 192.XX.XX.1:9092
- 192.XX.XX.2:9092
- 192.XX.XX.3:9092
Topic: ilogtail_${app_name}
```
- `${app_name}`就是我们上面添加的系统变量。

- `${app_name}`就是我们上面添加的系统变量。

### TagFieldsRename

Expand Down Expand Up @@ -304,7 +311,7 @@ flushers:
- Type: flusher_kafka_v2
PartitionerType: hash
HashKeys:
- content.application
- content.application
Brokers:
- 192.XX.XX.1:9092
- 192.XX.XX.2:9092
Expand All @@ -313,11 +320,12 @@ flushers:
```

- `content.application`中表示从`contents`中取数据`application`字段数据,如果对`contents`协议字段做了重命名,
例如重名为`messege`,则应该配置为`messege.application`
例如重名为`messege`,则应该配置为`messege.application`

### 配置Headers

`iLogtail`中`Kafka`的消息头是以键值对数组的形式配置的。`header`中`value`仅支持字符串类型。

```yaml
enable: true
inputs:
Expand All @@ -337,10 +345,12 @@ flushers:
- key: "key2"
value: "value2"
```

### 数据平铺

`ilogtail 1.8.0`新增数据平铺协议`custom_single_flatten`,`contents`、`tags`和`time`三个`convert`层的协议字段中数据做一级打平。
当前`convert`协议在单条数据处理仅支持`json`编码,因此`custom_single_flatten`需要配合`json`编码一起使用。

```yaml
enable: true
inputs:
Expand Down Expand Up @@ -372,28 +382,30 @@ flushers:
"@time": "2022-07-20 16:55:05.415"
},
"tags": {
"k8s.namespace.name":"java_app",
"k8s.namespace.name": "java_app",
"host.ip": "192.168.6.128",
"host.name": "master",
"log.file.path": "/data/test.log"
},
"time": 1664435098
}
```

使用平铺协议后`custom_single_flatten`,`json`全部被一级平铺。

```json
{
"class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547",
"application": "springboot-docker",
"level": "ERROR",
"message": "Completed initialization in 9 ms",
"thread": "http-nio-8080-exec-10",
"@time": "2022-07-20 16:55:05.415",
"k8s.namespace.name":"java_app",
"host.ip": "192.168.6.128",
"host.name": "master",
"log.file.path": "/data/test.log",
"time": 1664435098
"class": "org.springframework.web.servlet.DispatcherServlet@initServletBean:547",
"application": "springboot-docker",
"level": "ERROR",
"message": "Completed initialization in 9 ms",
"thread": "http-nio-8080-exec-10",
"@time": "2022-07-20 16:55:05.415",
"k8s.namespace.name": "java_app",
"host.ip": "192.168.6.128",
"host.name": "master",
"log.file.path": "/data/test.log",
"time": 1664435098
}
```

Expand All @@ -420,7 +432,7 @@ flushers:
- Type: flusher_kafka_v2
PartitionerType: hash
HashKeys:
- content.application
- content.application
Brokers:
- 192.XX.XX.1:9092
- 192.XX.XX.2:9092
Expand Down Expand Up @@ -450,7 +462,7 @@ flushers:
- Type: flusher_kafka_v2
PartitionerType: hash
HashKeys:
- content.application
- content.application
Brokers:
- 192.XX.XX.1:9092
- 192.XX.XX.2:9092
Expand Down
8 changes: 6 additions & 2 deletions plugins/flusher/kafkav2/flusher_kafka_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ type FlusherKafka struct {
// The compression level must be in the range of 1 (best speed) to 9 (best compression)
// The default value is 4.
CompressionLevel int
// How many outstanding requests a connection is allowed to have before
// sending on it blocks (default 5).
MaxOpenRequests int

// The maximum number of events to bulk in a single Kafka request. The default is 2048.
BulkMaxSize int
Expand Down Expand Up @@ -179,6 +182,7 @@ func NewFlusherKafka() *FlusherKafka {
KeepAlive: 0,
MaxMessageBytes: nil, // use library default
RequiredACKs: nil, // use library default
MaxOpenRequests: 5,
BrokerTimeout: 10 * time.Second,
Compression: "none",
CompressionLevel: 4,
Expand Down Expand Up @@ -387,6 +391,7 @@ func newSaramaConfig(config *FlusherKafka) (*sarama.Config, error) {
k := sarama.NewConfig()

// configure network level properties
k.Net.MaxOpenRequests = config.MaxOpenRequests
timeout := config.Timeout
k.Net.DialTimeout = timeout
k.Net.ReadTimeout = timeout
Expand Down Expand Up @@ -559,8 +564,7 @@ func (k *FlusherKafka) makeHeaders() []sarama.RecordHeader {
}

func (k *FlusherKafka) getConverter() (*converter.Converter, error) {
logger.Debug(k.context.GetRuntimeContext(), "[ilogtail data convert config] Protocol", k.Convert.Protocol,
"Encoding", k.Convert.Encoding, "TagFieldsRename", k.Convert.TagFieldsRename, "ProtocolFieldsRename", k.Convert.ProtocolFieldsRename)
logger.Debug(k.context.GetRuntimeContext(), "[ilogtail data convert config] Protocol", k.Convert.Protocol, "Encoding", k.Convert.Encoding, "TagFieldsRename", k.Convert.TagFieldsRename, "ProtocolFieldsRename", k.Convert.ProtocolFieldsRename)
return converter.NewConverter(k.Convert.Protocol, k.Convert.Encoding, k.Convert.TagFieldsRename, k.Convert.ProtocolFieldsRename)
}

Expand Down

0 comments on commit 4e9bd0e

Please sign in to comment.