IoTDB source connector
Read external data source data through IoTDB.
supports query SQL and can achieve projection effect.
name | type | required | default value |
---|---|---|---|
host | string | no | - |
port | int | no | - |
node_urls | string | no | - |
username | string | yes | - |
password | string | yes | - |
sql | string | yes | - |
fields | config | yes | - |
fetch_size | int | no | - |
lower_bound | long | no | - |
upper_bound | long | no | - |
num_partitions | int | no | - |
thrift_default_buffer_size | int | no | - |
enable_cache_leader | boolean | no | - |
version | string | no | - |
common-options | no | - |
host [string] the host of the IoTDB when you select host of the IoTDB
port [int] the port of the IoTDB when you select
node_urls [string] the node_urls of the IoTDB when you select
e.g.
127.0.0.1:8080,127.0.0.2:8080
sql [string] execute sql statement e.g.
select name,age from test
the fields of the IoTDB when you select
the field type is SeaTunnel field type org.apache.seatunnel.api.table.type.SqlType
e.g.
fields{
name=STRING
age=INT
}
the fetch_size of the IoTDB when you select
the username of the IoTDB when you select
the password of the IoTDB when you select
the lower_bound of the IoTDB when you select
the upper_bound of the IoTDB when you select
the num_partitions of the IoTDB when you select
the thrift_default_buffer_size of the IoTDB when you select
enable_cache_leader of the IoTDB when you select
Version represents the SQL semantic version used by the client, which is used to be compatible with the SQL semantics of 0.12 when upgrading 0.13. The possible values are: V_0_12, V_0_13.
we can split the partitions of the IoTDB and we used time column split
split num
upper bound of the time column
lower bound of the time column
split the time range into numPartitions parts
if numPartitions is 1, use the whole time range
if numPartitions < (upper_bound - lower_bound), use (upper_bound - lower_bound) partitions
eg: lower_bound = 1, upper_bound = 10, numPartitions = 2
sql = "select * from test where age > 0 and age < 10"
split result
split 1: select * from test where (time >= 1 and time < 6) and ( age > 0 and age < 10 )
split 2: select * from test where (time >= 6 and time < 11) and ( age > 0 and age < 10 )
Source plugin common parameters, please refer to Source Common Options for details
Common options:
source {
IoTDB {
node_urls = "localhost:6667"
username = "root"
password = "root"
}
}
When you assign sql
、fields
、partition
, for example:
sink {
IoTDB {
...
sql = "SELECT temperature, moisture FROM root.test_group.* WHERE time < 4102329600000 align by device"
lower_bound = 1
upper_bound = 4102329600000
num_partitions = 10
fields {
ts = bigint
device_name = string
temperature = float
moisture = bigint
}
}
}
Upstream IoTDB
data format is the following:
IoTDB> SELECT temperature, moisture FROM root.test_group.* WHERE time < 4102329600000 align by device;
+------------------------+------------------------+--------------+-----------+
| Time| Device| temperature| moisture|
+------------------------+------------------------+--------------+-----------+
|2022-09-25T00:00:00.001Z|root.test_group.device_a| 36.1| 100|
|2022-09-25T00:00:00.001Z|root.test_group.device_b| 36.2| 101|
|2022-09-25T00:00:00.001Z|root.test_group.device_c| 36.3| 102|
+------------------------+------------------------+--------------+-----------+
Loaded to SeaTunnelRow data format is the following:
ts | device_name | temperature | moisture |
---|---|---|---|
1664035200001 | root.test_group.device_a | 36.1 | 100 |
1664035200001 | root.test_group.device_b | 36.2 | 101 |
1664035200001 | root.test_group.device_c | 36.3 | 102 |
- Add IoTDB Source Connector
- [Improve] Improve IoTDB Source Connector (2917)
- Support extract timestamp、device、measurement from SeaTunnelRow
- Support TINYINT、SMALLINT
- Support flush cache to database before prepareCommit