IoTDB sink connector
Used to write data to IoTDB.
:::tip
There is a Thrift version conflict between IoTDB and Spark. To resolve it, run `rm -f $SPARK_HOME/jars/libthrift*` and then `cp $IOTDB_HOME/lib/libthrift* $SPARK_HOME/jars/`.
:::
IoTDB supports the exactly-once feature through idempotent writing. If two pieces of data have the same `key` and `timestamp`, the new data overwrites the old one.
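
The overwrite behaviour can be pictured with a minimal Python sketch. It is not IoTDB code: the `store` dictionary and the `write` helper are hypothetical stand-ins for a store keyed by `(key, timestamp)`, used only to show why replaying the same record leaves the stored state unchanged.

```python
# Illustrative model only: a dict keyed by (key, timestamp) stands in for IoTDB.
store = {}


def write(key, timestamp, value):
    """Insert a value, overwriting any existing value for the same (key, timestamp)."""
    store[(key, timestamp)] = value


# The first write and a replayed duplicate leave exactly one entry.
write("root.test_group.device_a.temperature", 1664035200001, 36.1)
write("root.test_group.device_a.temperature", 1664035200001, 36.1)

# A later write with the same key and timestamp replaces the old value.
write("root.test_group.device_a.temperature", 1664035200001, 36.5)
```

Because a retried batch rewrites the same `(key, timestamp)` pairs, a replay after a failure does not create duplicates in this model.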
name | type | required | default value |
---|---|---|---|
node_urls | list | yes | - |
username | string | yes | - |
password | string | yes | - |
key_device | string | yes | - |
key_timestamp | string | no | processing time |
key_measurement_fields | array | no | exclude device & timestamp |
storage_group | string | no | - |
batch_size | int | no | 1024 |
batch_interval_ms | int | no | - |
max_retries | int | no | - |
retry_backoff_multiplier_ms | int | no | - |
max_retry_backoff_ms | int | no | - |
default_thrift_buffer_size | int | no | - |
max_thrift_frame_size | int | no | - |
zone_id | string | no | - |
enable_rpc_compression | boolean | no | - |
connection_timeout_in_ms | int | no | - |
common-options | | no | - |
`node_urls` [list]: IoTDB cluster address, in the format `["host:port", ...]`.

`username` [string]: IoTDB username.

`password` [string]: IoTDB user password.

`key_device` [string]: Name of the field in SeaTunnelRow that holds the IoTDB deviceId.

`key_timestamp` [string]: Name of the field in SeaTunnelRow that holds the IoTDB timestamp. If not specified, the processing time is used as the timestamp.

`key_measurement_fields` [array]: Names of the fields in SeaTunnelRow that form the IoTDB measurement list. If not specified, all fields are included except the device and timestamp fields.
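
The way these three options select parts of a row can be sketched in Python. This is a simplified model under stated assumptions, not the connector's implementation: the row is represented as a plain `dict`, and `to_record` is a hypothetical helper name.

```python
import time


def to_record(row, key_device, key_timestamp=None, key_measurement_fields=None):
    """Split a row (dict) into (device_id, timestamp_ms, measurements)."""
    device_id = row[key_device]

    # Fall back to the current processing time when no timestamp field is set.
    if key_timestamp is None:
        timestamp_ms = int(time.time() * 1000)
    else:
        timestamp_ms = row[key_timestamp]

    # Default: every field except the device and timestamp fields.
    if key_measurement_fields is None:
        excluded = {key_device, key_timestamp}
        key_measurement_fields = [name for name in row if name not in excluded]

    measurements = {name: row[name] for name in key_measurement_fields}
    return device_id, timestamp_ms, measurements


row = {
    "ts": 1664035200001,
    "device_name": "root.test_group.device_a",
    "field_1": 1001,
    "temperature": 36.1,
    "moisture": 100,
}
record = to_record(row, "device_name", "ts", ["temperature", "moisture"])
```

With all three options set, only `temperature` and `moisture` end up in the measurements. With only `key_device` set, the sketch keeps every remaining field (including `ts`, which is then an ordinary field) and stamps the record with the current time.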
`storage_group` [string]: Storage group (path prefix) of the device. For example: deviceId = ${storage_group} + "." + ${key_device}
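
The concatenation rule above can be written as a small Python helper. `build_device_id` is a hypothetical name used for illustration, and leaving the device value untouched when no storage group is configured is an assumption of this sketch.

```python
def build_device_id(device, storage_group=None):
    """Prefix the device value with the storage group when one is configured."""
    if storage_group:
        return storage_group + "." + device
    # Assumption: without a storage group the device value is used as-is.
    return device


device_id = build_device_id("device_a", storage_group="root.test_group")
```

Here `device_id` is `root.test_group.device_a`, the same full path that the upstream rows in the examples on this page carry directly in `device_name`.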
`batch_size` [int]: For batch writing, the record-count threshold. Buffered data is flushed to IoTDB when the number of buffered records reaches `batch_size` or the elapsed time reaches `batch_interval_ms`.

`batch_interval_ms` [int]: For batch writing, the time threshold in milliseconds. Buffered data is flushed to IoTDB when the number of buffered records reaches `batch_size` or the elapsed time reaches `batch_interval_ms`.
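
The "size or time, whichever comes first" rule can be sketched as follows. This is an illustrative Python model, not the connector's code: `BatchBuffer`, its `flush` callback, and the injectable `clock` are hypothetical names. The sketch only checks the interval when a record is added; a real sink would more likely also flush from a timer, which is omitted here.

```python
import time


class BatchBuffer:
    """Buffers records and flushes on a size or time threshold."""

    def __init__(self, flush, batch_size=1024, batch_interval_ms=None,
                 clock=time.monotonic):
        self.flush_fn = flush            # callback that receives a list of records
        self.batch_size = batch_size
        self.batch_interval_ms = batch_interval_ms
        self.clock = clock               # returns seconds; injectable for testing
        self.records = []
        self.last_flush = clock()

    def add(self, record):
        self.records.append(record)
        if self._should_flush():
            self.flush()

    def _should_flush(self):
        # Size threshold.
        if len(self.records) >= self.batch_size:
            return True
        # Time threshold, only when an interval is configured.
        if self.batch_interval_ms is None:
            return False
        elapsed_ms = (self.clock() - self.last_flush) * 1000
        return elapsed_ms >= self.batch_interval_ms

    def flush(self):
        if self.records:
            self.flush_fn(self.records)
            self.records = []
        self.last_flush = self.clock()


flushed = []
buffer = BatchBuffer(flushed.append, batch_size=3)
for value in range(7):
    buffer.add(value)
```

After the loop, two full batches of three records have been flushed and one record is still waiting in the buffer.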
`max_retries` [int]: Number of times to retry a failed flush.

`retry_backoff_multiplier_ms` [int]: Multiplier used to generate the next backoff delay.

`max_retry_backoff_ms` [int]: Maximum amount of time to wait before attempting to retry a request to IoTDB.
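
One plausible reading of how these three options interact is sketched below in Python. The formula is an assumption made for illustration (the delay grows linearly with the attempt number and is capped by `max_retry_backoff_ms`); the connector may compute its delays differently, and `backoff_delays` is a hypothetical helper.

```python
def backoff_delays(max_retries, retry_backoff_multiplier_ms, max_retry_backoff_ms):
    """Return the wait time in ms before each retry attempt (1-based)."""
    return [
        # Assumed formula: multiplier * attempt number, capped at the maximum.
        min(retry_backoff_multiplier_ms * attempt, max_retry_backoff_ms)
        for attempt in range(1, max_retries + 1)
    ]


# Illustrative values only; the options have no documented defaults.
delays = backoff_delays(max_retries=5,
                        retry_backoff_multiplier_ms=1000,
                        max_retry_backoff_ms=3000)
```

Under this assumption, five retries with a 1000 ms multiplier and a 3000 ms cap wait 1000, 2000, 3000, 3000, and 3000 ms.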
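`default_thrift_buffer_size` [int]: Thrift initial buffer size in the IoTDB client.

`max_thrift_frame_size` [int]: Thrift maximum frame size in the IoTDB client.

`zone_id` [string]: The `java.time.ZoneId` used by the IoTDB client.

`enable_rpc_compression` [boolean]: Whether to enable RPC compression in the IoTDB client.

`connection_timeout_in_ms` [int]: Maximum time (in ms) to wait when connecting to IoTDB.

`common-options`: Sink plugin common parameters. Refer to Sink Common Options for details.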
Common options:
sink {
  IoTDB {
    node_urls = ["localhost:6667"]
    username = "root"
    password = "root"
    batch_size = 1024
    batch_interval_ms = 1000
  }
}
When `key_device` is set to `device_name`, for example:
sink {
  IoTDB {
    ...
    key_device = "device_name"
  }
}
The upstream SeaTunnelRow data has the following format:
device_name | field_1 | field_2 |
---|---|---|
root.test_group.device_a | 1001 | 1002 |
root.test_group.device_b | 2001 | 2002 |
root.test_group.device_c | 3001 | 3002 |
The data written to IoTDB has the following format:
IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+----------+-----------+
|                    Time|                  Device|   field_1|    field_2|
+------------------------+------------------------+----------+-----------+
|2022-09-26T17:50:01.201Z|root.test_group.device_a|      1001|       1002|
|2022-09-26T17:50:01.202Z|root.test_group.device_b|      2001|       2002|
|2022-09-26T17:50:01.203Z|root.test_group.device_c|      3001|       3002|
+------------------------+------------------------+----------+-----------+
When `key_device`, `key_timestamp`, and `key_measurement_fields` are all set, for example:
sink {
  IoTDB {
    ...
    key_device = "device_name"
    key_timestamp = "ts"
    key_measurement_fields = ["temperature", "moisture"]
  }
}
The upstream SeaTunnelRow data has the following format:
ts | device_name | field_1 | field_2 | temperature | moisture |
---|---|---|---|---|---|
1664035200001 | root.test_group.device_a | 1001 | 1002 | 36.1 | 100 |
1664035200001 | root.test_group.device_b | 2001 | 2002 | 36.2 | 101 |
1664035200001 | root.test_group.device_c | 3001 | 3002 | 36.3 | 102 |
The data written to IoTDB has the following format:
IoTDB> SELECT * FROM root.test_group.* align by device;
+------------------------+------------------------+--------------+-----------+
|                    Time|                  Device|   temperature|   moisture|
+------------------------+------------------------+--------------+-----------+
|2022-09-25T00:00:00.001Z|root.test_group.device_a|          36.1|        100|
|2022-09-25T00:00:00.001Z|root.test_group.device_b|          36.2|        101|
|2022-09-25T00:00:00.001Z|root.test_group.device_c|          36.3|        102|
+------------------------+------------------------+--------------+-----------+
- Add IoTDB Sink Connector