[fix](stream load)add test case and doc for arrow type of stream load (#28098)

add test case and doc for arrow type of stream load
wuwenchi authored Dec 22, 2023
1 parent f38e11e commit 7710c85
Showing 6 changed files with 196 additions and 6 deletions.
@@ -166,9 +166,11 @@ Stream load uses HTTP protocol, so all parameters related to import tasks are se

 + format

-Specify the import data format, support csv, json, the default is csv
+Specify the import data format, support csv, json, arrow, the default is csv

-<version since="1.2">supports `csv_with_names` (csv file line header filter), `csv_with_names_and_types` (csv file first two lines filter), parquet, orc</version>
+<version since="1.2">supports `csv_with_names` (csv file line header filter), `csv_with_names_and_types` (csv file first two lines filter), parquet, orc.</version>
+
+<version since="2.1.0">supports `arrow` format.</version>

 + exec\_mem\_limit
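For context, loading an Arrow file is the same HTTP call as any other Stream Load, with the `format` header switched to `arrow`. A minimal sketch under assumptions: the host, credentials, database, table, and file name below are placeholders, not taken from this commit.

```shell
# Hypothetical Stream Load of an Arrow IPC file (Doris 2.1.0+).
# fe_host, example_db, example_tbl, user:passwd, and data.arrow are placeholders.
curl --location-trusted -u user:passwd \
    -H "Expect: 100-continue" \
    -H "format: arrow" \
    -T data.arrow \
    http://fe_host:8030/api/example_db/example_tbl/_stream_load
```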
3 changes: 2 additions & 1 deletion docs/en/docs/ecosystem/spark-doris-connector.md
@@ -268,7 +268,8 @@ kafkaSource.selectExpr("CAST(key AS STRING)", "CAST(value as STRING)")
 | doris.write.fields | -- | Specifies the fields (or the order of the fields) to write to the Doris table, fields separated by commas.<br/>By default, all fields are written in the order of the Doris table fields. |
 | doris.sink.batch.size | 100000 | Maximum number of rows in a single write to BE |
 | doris.sink.max-retries | 0 | Number of retries after a failed write to BE |
-| sink.properties.* | -- | Import parameters for Stream Load. <br/>For example:<br/>Specify column separator: `'doris.sink.properties.column_separator' = ','`, specify write data format: `'doris.sink.properties.format' = 'json'` [More parameter details](https://doris.apache.org/zh-CN/docs/dev/data-operate/import/import-way/stream-load-manual) |
+| doris.sink.properties.format | -- | Data format of the Stream Load.<br/>Supported formats: csv, json, arrow (since version 1.4.0)<br/> [More parameter details](https://doris.apache.org/zh-CN/docs/dev/data-operate/import/import-way/stream-load-manual) |
+| doris.sink.properties.* | -- | Import parameters for Stream Load. <br/>For example:<br/>Specify column separator: `'doris.sink.properties.column_separator' = ','`.<br/>[More parameter details](https://doris.apache.org/zh-CN/docs/dev/data-operate/import/import-way/stream-load-manual) |
 | doris.sink.task.partition.size | -- | The number of partitions for the write task. After filtering and other operations, the number of partitions in the Spark RDD may be large, but the number of records per partition relatively small, which increases write frequency and wastes computing resources. The smaller this value, the lower the Doris write frequency and the less Doris merge pressure. Generally used together with doris.sink.task.use.repartition. |
 | doris.sink.task.use.repartition | false | Whether to use repartition mode to control the number of partitions written to Doris. The default is false, which uses coalesce (note: if there is no Spark action before the write, the parallelism of the whole computation may be reduced). If set to true, repartition is used (note: the final number of partitions can be set, at the extra cost of a shuffle). |
 | doris.sink.batch.interval.ms | 50 | The interval between batch sinks, in ms. |
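As a usage sketch, the new option is passed like any other data-source option, for example through a Spark SQL session. The fenodes address, credentials, and table names below are placeholders (connector 1.4.0+ assumed for arrow):

```shell
# Hypothetical spark-sql session writing to Doris with the arrow stream-load format.
# fe_host:8030, example_db.example_tbl, and the credentials are placeholders.
spark-sql -e "
CREATE TEMPORARY VIEW doris_sink
USING doris
OPTIONS(
  'table.identifier' = 'example_db.example_tbl',
  'fenodes' = 'fe_host:8030',
  'user' = 'root',
  'password' = '',
  'doris.sink.properties.format' = 'arrow'
);
INSERT INTO doris_sink VALUES (1, 'abc');
"
```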
@@ -154,8 +154,11 @@ Since Stream Load uses the HTTP protocol, all parameters related to import tasks are

 - format

-Specify the import data format; `csv` and `json` are supported, the default is `csv`
-<version since="1.2"> supports `csv_with_names` (filters the csv header row), `csv_with_names_and_types` (filters the first two csv rows), `parquet`, `orc`</version>
+Specify the import data format; `csv`, `json`, and `arrow` are supported, the default is `csv`
+
+<version since="1.2"> supports `csv_with_names` (filters the csv header row), `csv_with_names_and_types` (filters the first two csv rows), `parquet`, `orc`.</version>
+
+<version since="2.1.0"> supports the `arrow` format.</version>

 ```text
 Column reordering example: the source data has three columns (src_c1, src_c2, src_c3), and the Doris table also has three columns (dst_c1, dst_c2, dst_c3)
3 changes: 2 additions & 1 deletion docs/zh-CN/docs/ecosystem/spark-doris-connector.md
@@ -272,7 +272,8 @@ kafkaSource.selectExpr("CAST(value as STRING)")
 | doris.write.fields | -- | Specifies the fields (or the order of the fields) to write to the Doris table, separated by commas.<br />By default, all fields are written in the order of the Doris table fields. |
 | doris.sink.batch.size | 100000 | Maximum number of rows in a single write to BE |
 | doris.sink.max-retries | 0 | Number of retries after a failed write to BE |
-| doris.sink.properties.* | -- | Import parameters for Stream Load.<br/>For example:<br/>Specify column separator: `'doris.sink.properties.column_separator' = ','`, specify write data format: `'doris.sink.properties.format' = 'json'` [More parameter details](https://doris.apache.org/zh-CN/docs/dev/data-operate/import/import-way/stream-load-manual) |
+| doris.sink.properties.format | csv | Data format of the Stream Load.<br/>Three formats are supported: csv, json, arrow (since version 1.4.0)<br/> [More parameter details](https://doris.apache.org/zh-CN/docs/dev/data-operate/import/import-way/stream-load-manual) |
+| doris.sink.properties.* | -- | Import parameters for Stream Load.<br/>For example:<br/>Specify column separator: `'doris.sink.properties.column_separator' = ','`, etc.<br/> [More parameter details](https://doris.apache.org/zh-CN/docs/dev/data-operate/import/import-way/stream-load-manual) |
 | doris.sink.task.partition.size | -- | The number of partitions for the Doris write task. After filtering and other operations in the Spark RDD, the final number of partitions to write may be large, but the number of records per partition relatively small, which increases write frequency and wastes computing resources.<br/>The smaller this value, the lower the Doris write frequency and the less Doris merge pressure. Used together with doris.sink.task.use.repartition. |
 | doris.sink.task.use.repartition | false | Whether to use repartition mode to control the number of partitions written to Doris. The default is false, which uses coalesce (note: if there is no Spark action before the write, the parallelism of the whole computation may be reduced).<br/>If set to true, repartition is used (note: the final number of partitions can be set, at the extra cost of a shuffle). |
 | doris.sink.batch.interval.ms | 50 | The interval between batch sinks, in ms. |
@@ -0,0 +1,37 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !q01 --
1 true 1 2 3 4 123456789 6.6 7.7 3.12000 2023-09-08 2023-09-08T17:12:34.123456 char varchar string
1 true 1 2 3 4 123456789 6.6 7.7 3.12000 2023-09-08 2023-09-08T17:12:34.123456 char varchar string
1 true 1 2 3 4 123456789 6.6 7.7 3.12000 2023-09-08 2023-09-08T17:12:34.123456 char varchar string
1 true 1 2 3 4 123456789 6.6 7.7 3.12000 2023-09-08 2023-09-08T17:12:34.123456 char varchar string
1 true 1 2 3 4 123456789 6.6 7.7 3.12000 2023-09-08 2023-09-08T17:12:34.123456 char varchar string
1 true 1 2 3 4 123456789 6.6 7.7 3.12000 2023-09-08 2023-09-08T17:12:34.123456 char varchar string
1 true 1 2 3 4 123456789 6.6 7.7 3.12000 2023-09-08 2023-09-08T17:12:34.123456 char varchar string

-- !q02 --
1 [1, 0, 0, 1, 1] [1, 2, 3] [2, 12, 32] [3, 4, 5, 6] [4, 5, 6] [123456789, 987654321, 123789456] [6.6, 6.7, 7.8] [7.7, 8.8, 8.8999996185302734] [3.12000, 1.12345] ["2023-09-08", "2027-10-28"] ["2023-09-08 17:12:34.123456", "2024-09-08 18:12:34.123456"] ["char", "char2"] ["varchar", "varchar2"] ["string", "string2"]
1 [1, 0, 0, 1, 1] [1, 2, 3] [2, 12, 32] [3, 4, 5, 6] [4, 5, 6] [123456789, 987654321, 123789456] [6.6, 6.7, 7.8] [7.7, 8.8, 8.8999996185302734] [3.12000, 1.12345] ["2023-09-08", "2027-10-28"] ["2023-09-08 17:12:34.123456", "2024-09-08 18:12:34.123456"] ["char", "char2"] ["varchar", "varchar2"] ["string", "string2"]
1 [1, 0, 0, 1, 1] [1, 2, 3] [2, 12, 32] [3, 4, 5, 6] [4, 5, 6] [123456789, 987654321, 123789456] [6.6, 6.7, 7.8] [7.7, 8.8, 8.8999996185302734] [3.12000, 1.12345] ["2023-09-08", "2027-10-28"] ["2023-09-08 17:12:34.123456", "2024-09-08 18:12:34.123456"] ["char", "char2"] ["varchar", "varchar2"] ["string", "string2"]
1 [1, 0, 0, 1, 1] [1, 2, 3] [2, 12, 32] [3, 4, 5, 6] [4, 5, 6] [123456789, 987654321, 123789456] [6.6, 6.7, 7.8] [7.7, 8.8, 8.8999996185302734] [3.12000, 1.12345] ["2023-09-08", "2027-10-28"] ["2023-09-08 17:12:34.123456", "2024-09-08 18:12:34.123456"] ["char", "char2"] ["varchar", "varchar2"] ["string", "string2"]
1 [1, 0, 0, 1, 1] [1, 2, 3] [2, 12, 32] [3, 4, 5, 6] [4, 5, 6] [123456789, 987654321, 123789456] [6.6, 6.7, 7.8] [7.7, 8.8, 8.8999996185302734] [3.12000, 1.12345] ["2023-09-08", "2027-10-28"] ["2023-09-08 17:12:34.123456", "2024-09-08 18:12:34.123456"] ["char", "char2"] ["varchar", "varchar2"] ["string", "string2"]
1 [1, 0, 0, 1, 1] [1, 2, 3] [2, 12, 32] [3, 4, 5, 6] [4, 5, 6] [123456789, 987654321, 123789456] [6.6, 6.7, 7.8] [7.7, 8.8, 8.8999996185302734] [3.12000, 1.12345] ["2023-09-08", "2027-10-28"] ["2023-09-08 17:12:34.123456", "2024-09-08 18:12:34.123456"] ["char", "char2"] ["varchar", "varchar2"] ["string", "string2"]
1 [1, 0, 0, 1, 1] [1, 2, 3] [2, 12, 32] [3, 4, 5, 6] [4, 5, 6] [123456789, 987654321, 123789456] [6.6, 6.7, 7.8] [7.7, 8.8, 8.8999996185302734] [3.12000, 1.12345] ["2023-09-08", "2027-10-28"] ["2023-09-08 17:12:34.123456", "2024-09-08 18:12:34.123456"] ["char", "char2"] ["varchar", "varchar2"] ["string", "string2"]

-- !q03 --
1 {1:1, 0:1} {1:2, 3:4} {2:4, 5:6} {3:4, 7:8} {4:5, 1:2} {123456789:987654321, 789456123:456789123} {6.6:8.8, 9.9:10.1} {7.7:1.1, 2.2:3.3} {3.12000:1.23000, 2.34000:5.67000} {"2023-09-08":"2024-09-08", "1023-09-08":"2023-09-08"} {"":"2023-09-08 17:12:34.123456", "3023-09-08 17:12:34.123456":"4023-09-08 17:12:34.123456"} {"char":"char2", "char2":"char3"} {"varchar":"varchar2", "varchar3":"varchar4"} {"string":"string2", "string3":"string4"}
1 {1:1, 0:1} {1:2, 3:4} {2:4, 5:6} {3:4, 7:8} {4:5, 1:2} {123456789:987654321, 789456123:456789123} {6.6:8.8, 9.9:10.1} {7.7:1.1, 2.2:3.3} {3.12000:1.23000, 2.34000:5.67000} {"2023-09-08":"2024-09-08", "1023-09-08":"2023-09-08"} {"":"2023-09-08 17:12:34.123456", "3023-09-08 17:12:34.123456":"4023-09-08 17:12:34.123456"} {"char":"char2", "char2":"char3"} {"varchar":"varchar2", "varchar3":"varchar4"} {"string":"string2", "string3":"string4"}
1 {1:1, 0:1} {1:2, 3:4} {2:4, 5:6} {3:4, 7:8} {4:5, 1:2} {123456789:987654321, 789456123:456789123} {6.6:8.8, 9.9:10.1} {7.7:1.1, 2.2:3.3} {3.12000:1.23000, 2.34000:5.67000} {"2023-09-08":"2024-09-08", "1023-09-08":"2023-09-08"} {"":"2023-09-08 17:12:34.123456", "3023-09-08 17:12:34.123456":"4023-09-08 17:12:34.123456"} {"char":"char2", "char2":"char3"} {"varchar":"varchar2", "varchar3":"varchar4"} {"string":"string2", "string3":"string4"}
1 {1:1, 0:1} {1:2, 3:4} {2:4, 5:6} {3:4, 7:8} {4:5, 1:2} {123456789:987654321, 789456123:456789123} {6.6:8.8, 9.9:10.1} {7.7:1.1, 2.2:3.3} {3.12000:1.23000, 2.34000:5.67000} {"2023-09-08":"2024-09-08", "1023-09-08":"2023-09-08"} {"":"2023-09-08 17:12:34.123456", "3023-09-08 17:12:34.123456":"4023-09-08 17:12:34.123456"} {"char":"char2", "char2":"char3"} {"varchar":"varchar2", "varchar3":"varchar4"} {"string":"string2", "string3":"string4"}
1 {1:1, 0:1} {1:2, 3:4} {2:4, 5:6} {3:4, 7:8} {4:5, 1:2} {123456789:987654321, 789456123:456789123} {6.6:8.8, 9.9:10.1} {7.7:1.1, 2.2:3.3} {3.12000:1.23000, 2.34000:5.67000} {"2023-09-08":"2024-09-08", "1023-09-08":"2023-09-08"} {"":"2023-09-08 17:12:34.123456", "3023-09-08 17:12:34.123456":"4023-09-08 17:12:34.123456"} {"char":"char2", "char2":"char3"} {"varchar":"varchar2", "varchar3":"varchar4"} {"string":"string2", "string3":"string4"}
1 {1:1, 0:1} {1:2, 3:4} {2:4, 5:6} {3:4, 7:8} {4:5, 1:2} {123456789:987654321, 789456123:456789123} {6.6:8.8, 9.9:10.1} {7.7:1.1, 2.2:3.3} {3.12000:1.23000, 2.34000:5.67000} {"2023-09-08":"2024-09-08", "1023-09-08":"2023-09-08"} {"":"2023-09-08 17:12:34.123456", "3023-09-08 17:12:34.123456":"4023-09-08 17:12:34.123456"} {"char":"char2", "char2":"char3"} {"varchar":"varchar2", "varchar3":"varchar4"} {"string":"string2", "string3":"string4"}
1 {1:1, 0:1} {1:2, 3:4} {2:4, 5:6} {3:4, 7:8} {4:5, 1:2} {123456789:987654321, 789456123:456789123} {6.6:8.8, 9.9:10.1} {7.7:1.1, 2.2:3.3} {3.12000:1.23000, 2.34000:5.67000} {"2023-09-08":"2024-09-08", "1023-09-08":"2023-09-08"} {"":"2023-09-08 17:12:34.123456", "3023-09-08 17:12:34.123456":"4023-09-08 17:12:34.123456"} {"char":"char2", "char2":"char3"} {"varchar":"varchar2", "varchar3":"varchar4"} {"string":"string2", "string3":"string4"}

-- !q04 --
1 {"c_bool": 1, "c_tinyint": 1, "c_smallint": 2, "c_int": 3, "c_bigint": 4, "c_largeint": 123456789, "c_float": 6.6, "c_double": 7.7, "c_decimal": 3.12000, "c_date": "2023-09-08", "c_datetime": "2023-09-08 17:12:34.123456", "c_char": "char", "c_varchar": "varchar", "c_string": "string"}
1 {"c_bool": 1, "c_tinyint": 1, "c_smallint": 2, "c_int": 3, "c_bigint": 4, "c_largeint": 123456789, "c_float": 6.6, "c_double": 7.7, "c_decimal": 3.12000, "c_date": "2023-09-08", "c_datetime": "2023-09-08 17:12:34.123456", "c_char": "char", "c_varchar": "varchar", "c_string": "string"}
1 {"c_bool": 1, "c_tinyint": 1, "c_smallint": 2, "c_int": 3, "c_bigint": 4, "c_largeint": 123456789, "c_float": 6.6, "c_double": 7.7, "c_decimal": 3.12000, "c_date": "2023-09-08", "c_datetime": "2023-09-08 17:12:34.123456", "c_char": "char", "c_varchar": "varchar", "c_string": "string"}
1 {"c_bool": 1, "c_tinyint": 1, "c_smallint": 2, "c_int": 3, "c_bigint": 4, "c_largeint": 123456789, "c_float": 6.6, "c_double": 7.7, "c_decimal": 3.12000, "c_date": "2023-09-08", "c_datetime": "2023-09-08 17:12:34.123456", "c_char": "char", "c_varchar": "varchar", "c_string": "string"}
1 {"c_bool": 1, "c_tinyint": 1, "c_smallint": 2, "c_int": 3, "c_bigint": 4, "c_largeint": 123456789, "c_float": 6.6, "c_double": 7.7, "c_decimal": 3.12000, "c_date": "2023-09-08", "c_datetime": "2023-09-08 17:12:34.123456", "c_char": "char", "c_varchar": "varchar", "c_string": "string"}
1 {"c_bool": 1, "c_tinyint": 1, "c_smallint": 2, "c_int": 3, "c_bigint": 4, "c_largeint": 123456789, "c_float": 6.6, "c_double": 7.7, "c_decimal": 3.12000, "c_date": "2023-09-08", "c_datetime": "2023-09-08 17:12:34.123456", "c_char": "char", "c_varchar": "varchar", "c_string": "string"}
1 {"c_bool": 1, "c_tinyint": 1, "c_smallint": 2, "c_int": 3, "c_bigint": 4, "c_largeint": 123456789, "c_float": 6.6, "c_double": 7.7, "c_decimal": 3.12000, "c_date": "2023-09-08", "c_datetime": "2023-09-08 17:12:34.123456", "c_char": "char", "c_varchar": "varchar", "c_string": "string"}
