[doc](insert) Add group commit docs
mymeiyi committed Nov 2, 2023
1 parent 6838322 commit 05eb1d1
Showing 3 changed files with 378 additions and 1 deletion.
188 changes: 188 additions & 0 deletions docs/en/docs/data-operate/import/import-way/group-commit-manual.md
@@ -0,0 +1,188 @@
---
{
"title": "Group Commit",
"language": "zh-CN"
}
---

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Group Commit

Group commit load does not introduce a new import method; rather, it is an extension of `INSERT INTO tbl VALUES (...)`, `Stream Load`, and `Http Stream`.

In Doris, every data load is an independent job that initiates a new transaction and generates a new data version. Under high-frequency writes, this puts great pressure on both transactions and compactions. Group commit load reduces the number of transactions and compactions by merging multiple small load tasks into one load job, and thus improves write performance.

It should be noted that a group commit load returns once the data is written to the WAL; at that point, the data is not yet visible to users. By default, it becomes visible after a time interval of 10 seconds.

## Basic operations

If the table schema is:
```sql
CREATE TABLE `dt` (
`id` int(11) NOT NULL,
`name` varchar(50) NULL,
`score` int(11) NULL
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);
```

### INSERT INTO VALUES

```sql
# Set the session variable to enable group commit; the default value is false
mysql> set enable_insert_group_commit = true;

# The returned label starts with 'group_commit'; it is the label of the internal load job that actually carries the data
mysql> insert into dt values(1, 'Bob', 90), (2, 'Alice', 99);
Query OK, 2 rows affected (0.05 sec)
{'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'}

# The returned label and txn_id are the same as above, which means the rows were handled in one load job
mysql> insert into dt(id, name) values(3, 'John');
Query OK, 1 row affected (0.01 sec)
{'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'}

# The data is not visible
mysql> select * from dt;
Empty set (0.01 sec)

# After about 10 seconds, the data is visible
mysql> select * from dt;
+------+-------+-------+
| id | name | score |
+------+-------+-------+
| 1 | Bob | 90 |
| 2 | Alice | 99 |
| 3 | John | NULL |
+------+-------+-------+
3 rows in set (0.02 sec)
```

### Stream Load

If the content of `data.csv` is:
```
4,Amy,60
5,Ross,98
```

```shell
# Add the 'group_commit:true' configuration in the http header

curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:true" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load
{
"TxnId": 7009,
"Label": "group_commit_c84d2099208436ab_96e33fda01eddba8",
"Comment": "",
"GroupCommit": true,
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 2,
"NumberLoadedRows": 2,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 19,
"LoadTimeMs": 35,
"StreamLoadPutTimeMs": 5,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 26
}

# The returned 'GroupCommit' is 'true', which means this is a group commit load
# The returned label starts with 'group_commit'; it is the label of the internal load job that actually carries the data
```
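
The same request can be issued programmatically. Below is a minimal Java sketch (a sketch under stated assumptions, not an official client) using the JDK's built-in `java.net.http.HttpClient`. The host, port, and credentials are placeholders for your deployment, and it targets a BE http port directly so the client does not have to replay the FE's 307 redirect with authentication:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Path;
import java.util.Base64;

public class GroupCommitStreamLoad {
    public static void main(String[] args) throws Exception {
        // Placeholder endpoint and credentials; adjust to your deployment.
        // Sending to a BE http port directly avoids the FE's 307 redirect.
        String url = "http://127.0.0.1:8040/api/db/dt/_stream_load";
        String auth = Base64.getEncoder().encodeToString("root:".getBytes());

        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header("Authorization", "Basic " + auth)
                .header("group_commit", "true")   // opt in to group commit
                .header("column_separator", ",")
                .PUT(HttpRequest.BodyPublishers.ofFile(Path.of("data.csv")))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // On success, the JSON body contains "GroupCommit": true and a
        // label starting with "group_commit"
        System.out.println(response.body());
    }
}
```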

### Http Stream

```shell
# Add the 'group_commit:true' configuration in the http header

curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:true" -H "sql:insert into db.dt select * from http_stream('column_separator'=',', 'format' = 'CSV')" http://{fe_host}:{http_port}/api/_http_stream
{
"TxnId": 7011,
"Label": "group_commit_3b45c5750d5f15e5_703428e462e1ebb0",
"Comment": "",
"GroupCommit": true,
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 2,
"NumberLoadedRows": 2,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 19,
"LoadTimeMs": 65,
"StreamLoadPutTimeMs": 41,
"ReadDataTimeMs": 47,
"WriteDataTimeMs": 23
}

# The returned 'GroupCommit' is 'true', which means this is a group commit load
# The returned label starts with 'group_commit'; it is the label of the internal load job that actually carries the data
```

### Use `PreparedStatement`

To reduce the CPU cost of SQL parsing and query planning, the FE supports the MySQL protocol's server-side `PreparedStatement` feature. When `PreparedStatement` is used, the SQL statement and its plan are cached in a session-level memory cache and reused by later loads, which reduces the CPU cost on the FE. The following is an example of using PreparedStatement in JDBC:

1. Set up the JDBC url and enable server-side prepared statements

```
url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true
```

2. Using `PreparedStatement`

```java
// Assumes an open Connection `conn` created with the url above
PreparedStatement statement = conn.prepareStatement("INSERT INTO dt VALUES (?, ?, ?)");
statement.setInt(1, 5);
statement.setString(2, "Tiger");
statement.setInt(3, 100);
int rows = statement.executeUpdate();
```
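
Putting the two steps together, the following self-contained sketch enables group commit for the session and reuses one server-side prepared statement across many small inserts. The connection URL, user, and row values are illustrative, and it assumes the MySQL JDBC driver (`mysql-connector-j`) is on the classpath:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.Statement;

public class GroupCommitInsert {
    public static void main(String[] args) throws Exception {
        // useServerPrepStmts=true enables MySQL server-side prepared statements
        String url = "jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true";
        try (Connection conn = DriverManager.getConnection(url, "root", "")) {
            try (Statement s = conn.createStatement()) {
                s.execute("SET enable_insert_group_commit = true"); // per-session switch
            }
            // The statement and its plan are parsed once and cached on the FE;
            // each executeUpdate() below reuses them.
            try (PreparedStatement ps =
                         conn.prepareStatement("INSERT INTO dt VALUES (?, ?, ?)")) {
                for (int i = 10; i < 20; i++) {
                    ps.setInt(1, i);
                    ps.setString(2, "user_" + i);
                    ps.setInt(3, 60 + i);
                    ps.executeUpdate(); // small writes batched into one group commit job
                }
            }
        }
    }
}
```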

## Relevant system configuration

### Session variable

+ enable_insert_group_commit

If this configuration is true, the FE judges whether an `INSERT INTO VALUES` statement can use group commit. The conditions are as follows:
+ It is not a transactional insert, i.e. not the `Begin`; `INSERT INTO VALUES`; `COMMIT` pattern
+ It does not specify a partition, as in `INSERT INTO dt PARTITION() VALUES ...`
+ It does not specify a label, as in `INSERT INTO dt WITH LABEL {label} VALUES ...`
+ The VALUES clause does not contain any expression, as in `INSERT INTO dt VALUES (1 + 100)`

The default value is false; use the `SET enable_insert_group_commit = true;` command to enable it.

### BE configuration

+ group_commit_interval_ms

The interval after which the current internal group commit load job stops and a new internal job starts. The default value is 10000 ms (10 seconds).

+ group_commit_replay_wal_dir
+ group_commit_sync_wal_batch
+ group_commit_replay_wal_retry_num
+ group_commit_replay_wal_retry_interval_seconds
3 changes: 2 additions & 1 deletion docs/sidebars.json
@@ -91,7 +91,8 @@
"data-operate/import/import-way/mysql-load-manual",
"data-operate/import/import-way/s3-load-manual",
"data-operate/import/import-way/insert-into-manual",
"data-operate/import/import-way/load-json-format"
"data-operate/import/import-way/load-json-format",
"data-operate/import/import-way/group-commit-manual"
]
},
{
188 changes: 188 additions & 0 deletions docs/zh-CN/docs/data-operate/import/import-way/group-commit-manual.md
@@ -0,0 +1,188 @@
---
{
"title": "Group Commit",
"language": "zh-CN"
}
---

<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Group Commit

Group commit load does not introduce a new import method; rather, it is an extension of `INSERT INTO tbl VALUES (...)`, `Stream Load`, and `Http Stream`.

In Doris, every data load is an independent job that initiates a new transaction and generates a new data version. Under high-frequency writes, this puts great pressure on both transactions and compactions. Group commit load merges multiple small writes into one load job, reducing the number of transactions and compactions, relieving internal pressure on the system and improving write performance.

Note that a group commit load returns once the data is written to the WAL; the data cannot be read immediately. By default, it becomes readable after 10 seconds.

## Basic operations

If the table schema is:
```sql
CREATE TABLE `dt` (
`id` int(11) NOT NULL,
`name` varchar(50) NULL,
`score` int(11) NULL
) ENGINE=OLAP
DUPLICATE KEY(`id`)
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_num" = "1"
);
```

### INSERT INTO VALUES

```sql
# Set the session variable to enable group commit; the default value is false
mysql> set enable_insert_group_commit = true;

# The returned label starts with 'group_commit'; it is the label of the internal load job that actually consumes the data, and shows that the insert was group committed
mysql> insert into dt values(1, 'Bob', 90), (2, 'Alice', 99);
Query OK, 2 rows affected (0.05 sec)
{'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'}

# The label and txn_id are the same as above, which means the rows were batched into the same load job
mysql> insert into dt(id, name) values(3, 'John');
Query OK, 1 row affected (0.01 sec)
{'label':'group_commit_a145ce07f1c972fc-bd2c54597052a9ad', 'status':'PREPARE', 'txnId':'181508'}

# The data is not visible yet
mysql> select * from dt;
Empty set (0.01 sec)

# After about 10 seconds, the data is visible
mysql> select * from dt;
+------+-------+-------+
| id | name | score |
+------+-------+-------+
| 1 | Bob | 90 |
| 2 | Alice | 99 |
| 3 | John | NULL |
+------+-------+-------+
3 rows in set (0.02 sec)
```

### Stream Load

If the content of `data.csv` is:
```
4,Amy,60
5,Ross,98
```

```shell
# Add the 'group_commit:true' configuration in the http header

curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:true" -H "column_separator:," http://{fe_host}:{http_port}/api/db/dt/_stream_load
{
"TxnId": 7009,
"Label": "group_commit_c84d2099208436ab_96e33fda01eddba8",
"Comment": "",
"GroupCommit": true,
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 2,
"NumberLoadedRows": 2,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 19,
"LoadTimeMs": 35,
"StreamLoadPutTimeMs": 5,
"ReadDataTimeMs": 0,
"WriteDataTimeMs": 26
}

# The returned 'GroupCommit' is 'true', which means this is a group commit load
# The returned label starts with 'group_commit'; it is the label of the internal load job that actually consumes the data
```

### Http Stream

```shell
# Add the 'group_commit:true' configuration in the http header

curl --location-trusted -u {user}:{passwd} -T data.csv -H "group_commit:true" -H "sql:insert into db.dt select * from http_stream('column_separator'=',', 'format' = 'CSV')" http://{fe_host}:{http_port}/api/_http_stream
{
"TxnId": 7011,
"Label": "group_commit_3b45c5750d5f15e5_703428e462e1ebb0",
"Comment": "",
"GroupCommit": true,
"Status": "Success",
"Message": "OK",
"NumberTotalRows": 2,
"NumberLoadedRows": 2,
"NumberFilteredRows": 0,
"NumberUnselectedRows": 0,
"LoadBytes": 19,
"LoadTimeMs": 65,
"StreamLoadPutTimeMs": 41,
"ReadDataTimeMs": 47,
"WriteDataTimeMs": 23
}

# The returned 'GroupCommit' is 'true', which means this is a group commit load
# The returned label starts with 'group_commit'; it is the label of the internal load job that actually consumes the data
```

### Use `PreparedStatement`

To reduce the cost of SQL parsing and plan generation, the FE supports the MySQL protocol's `PreparedStatement` feature. When `PreparedStatement` is used, the SQL statement and its load plan are cached in a session-level memory cache, and subsequent loads reuse the cached objects directly, which lowers CPU pressure on the FE. The following is an example of using PreparedStatement in JDBC:

1. Set up the JDBC url and enable server-side prepared statements

```
url = jdbc:mysql://127.0.0.1:9030/db?useServerPrepStmts=true
```

2. Use `PreparedStatement`

```java
// Assumes an open Connection `conn` created with the url above
PreparedStatement statement = conn.prepareStatement("INSERT INTO dt VALUES (?, ?, ?)");
statement.setInt(1, 5);
statement.setString(2, "Tiger");
statement.setInt(3, 100);
int rows = statement.executeUpdate();
```

## Relevant system configuration

### Session variable

+ enable_insert_group_commit

When this parameter is set to true, the FE judges whether a user-issued `INSERT INTO VALUES` statement meets the group commit conditions; if it does, the statement is executed as a group commit load. The main checks are:
+ It is not a transactional insert, i.e. not the `Begin`; `INSERT INTO VALUES`; `COMMIT` pattern
+ It does not specify a partition, as in `INSERT INTO dt PARTITION() VALUES ...`
+ It does not specify a label, as in `INSERT INTO dt WITH LABEL {label} VALUES ...`
+ The VALUES clause does not contain any expression, as in `INSERT INTO dt VALUES (1 + 100)`

The default value is false; use the `SET enable_insert_group_commit = true;` command to enable it.

### BE configuration

+ group_commit_interval_ms

The interval after which the current group commit load job stops and a new one starts. The default value is 10000 ms, i.e. 10 seconds.

+ group_commit_replay_wal_dir
+ group_commit_sync_wal_batch
+ group_commit_replay_wal_retry_num
+ group_commit_replay_wal_retry_interval_seconds
