# Apache Iceberg source connector
Source connector for Apache Iceberg. It supports both batch and stream modes (see the job-mode sketch after the key features list below).
## Key features

- batch
- stream
- exactly-once
- column projection
- parallelism
- support user-defined split
- data format
  - parquet
  - orc
  - avro
- iceberg catalog
  - hadoop (2.7.1, 2.7.5, 3.1.3)
  - hive (2.3.9, 3.1.2)
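Batch versus stream execution is selected at the job level rather than by a connector option. A minimal sketch, assuming the standard SeaTunnel config layout where `job.mode` in the `env` block picks the execution mode:

```hocon
env {
  # "BATCH" for a bounded scan of the table,
  # "STREAMING" for continuous incremental reads
  job.mode = "STREAMING"
}

source {
  Iceberg {
    # connector options as described below
  }
}
```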
## Options

| name                     | type    | required | default value        |
|--------------------------|---------|----------|----------------------|
| catalog_name             | string  | yes      | -                    |
| catalog_type             | string  | yes      | -                    |
| uri                      | string  | no       | -                    |
| warehouse                | string  | yes      | -                    |
| namespace                | string  | yes      | -                    |
| table                    | string  | yes      | -                    |
| schema                   | config  | no       | -                    |
| case_sensitive           | boolean | no       | false                |
| start_snapshot_timestamp | long    | no       | -                    |
| start_snapshot_id        | long    | no       | -                    |
| end_snapshot_id          | long    | no       | -                    |
| use_snapshot_id          | long    | no       | -                    |
| use_snapshot_timestamp   | long    | no       | -                    |
| stream_scan_strategy     | enum    | no       | FROM_LATEST_SNAPSHOT |
| common-options           |         | no       | -                    |
### catalog_name [string]

User-specified catalog name.

### catalog_type [string]

The type of the Iceberg catalog. The optional values are:

- hive: the Hive metastore catalog.
- hadoop: the Hadoop catalog.
### uri [string]

The Hive metastore's thrift URI.
### warehouse [string]

The location to store metadata files and data files.
### namespace [string]

The Iceberg database name in the backend catalog.
### table [string]

The Iceberg table name in the backend catalog.
### case_sensitive [boolean]

If data columns were selected via `schema` [config], this controls whether the match against the schema is done case-sensitively.
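A minimal, hypothetical sketch combining this option with a projected schema (described next); only the relevant options are shown:

```hocon
source {
  Iceberg {
    # ... catalog and table options omitted for brevity ...

    # match projected field names against the table schema case-sensitively
    case_sensitive = true
    schema {
      fields {
        f1 = "bigint"
      }
    }
  }
}
```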
### schema [config]

Use projection to select data columns and column order, e.g.:

```hocon
schema {
  fields {
    f2 = "boolean"
    f1 = "bigint"
    f3 = "int"
    f4 = "bigint"
  }
}
```
### start_snapshot_id [long]

Instructs this scan to look for changes starting from a particular snapshot (exclusive).

### start_snapshot_timestamp [long]

Instructs this scan to look for changes starting from the most recent snapshot for the table as of the given timestamp, in milliseconds since the Unix epoch.

### end_snapshot_id [long]

Instructs this scan to look for changes up to a particular snapshot (inclusive).

### use_snapshot_id [long]

Instructs this scan to use the given snapshot ID.

### use_snapshot_timestamp [long]

Instructs this scan to use the most recent snapshot as of the given timestamp, in milliseconds since the Unix epoch.
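A minimal sketch (with hypothetical snapshot IDs) of how these options combine for an incremental range read, or for pinning the scan to a single snapshot:

```hocon
source {
  Iceberg {
    # ... catalog and table options as in the examples below ...

    # incremental read: changes after snapshot 123 (exclusive)
    # up to snapshot 987 (inclusive)
    start_snapshot_id = 123
    end_snapshot_id = 987

    # alternatively, pin the whole scan to one snapshot:
    # use_snapshot_id = 987
  }
}
```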
### stream_scan_strategy [enum]

The starting strategy for stream mode execution. Defaults to FROM_LATEST_SNAPSHOT if no value is specified.
The optional values are:

- TABLE_SCAN_THEN_INCREMENTAL: do a regular table scan, then switch to incremental mode.
- FROM_LATEST_SNAPSHOT: start incremental mode from the latest snapshot, inclusive.
- FROM_EARLIEST_SNAPSHOT: start incremental mode from the earliest snapshot, inclusive.
- FROM_SNAPSHOT_ID: start incremental mode from the snapshot with a specific id, inclusive (see the sketch after this list).
- FROM_SNAPSHOT_TIMESTAMP: start incremental mode from the snapshot with a specific timestamp, inclusive.
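A minimal sketch of the FROM_SNAPSHOT_ID strategy, assuming (as the option names suggest) it is paired with `start_snapshot_id` to name the starting snapshot; the snapshot ID here is hypothetical:

```hocon
source {
  Iceberg {
    # ... catalog and table options as in the examples below ...

    # start streaming from a specific snapshot instead of the latest one
    stream_scan_strategy = "FROM_SNAPSHOT_ID"
    start_snapshot_id = 123
  }
}
```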
### common-options

Source plugin common parameters; please refer to Source Common Options for details.
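A minimal sketch, assuming the usual SeaTunnel source common options such as `result_table_name` and `parallelism` apply to this connector as well:

```hocon
source {
  Iceberg {
    # ... connector options as described above ...

    # common options: register the output under a name and set read parallelism
    result_table_name = "iceberg_rows"   # hypothetical name
    parallelism = 2
  }
}
```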
## Example

### simple
```hocon
source {
  Iceberg {
    catalog_name = "seatunnel"
    catalog_type = "hadoop"
    warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
  }
}
```
Or
```hocon
source {
  Iceberg {
    catalog_name = "seatunnel"
    catalog_type = "hive"
    uri = "thrift://localhost:9083"
    warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
  }
}
```
### column projection
```hocon
source {
  Iceberg {
    catalog_name = "seatunnel"
    catalog_type = "hadoop"
    warehouse = "hdfs://your_cluster/tmp/seatunnel/iceberg/"
    namespace = "your_iceberg_database"
    table = "your_iceberg_table"
    schema {
      fields {
        f2 = "boolean"
        f1 = "bigint"
        f3 = "int"
        f4 = "bigint"
      }
    }
  }
}
```
:::tip
In order to be compatible with different versions of Hadoop and Hive, hive-exec and flink-shaded-hadoop-2 are declared with `provided` scope in the project pom file. If you use the Flink engine, you may first need to add the following JAR packages to the <FLINK_HOME>/lib directory. If you use the Spark engine and it is already integrated with Hadoop, you do not need to add them.
:::
- flink-shaded-hadoop-x-xxx.jar
- hive-exec-xxx.jar
- libfb303-xxx.jar
Some versions of the hive-exec package do not include libfb303-xxx.jar, in which case you also need to import that JAR manually.
## Changelog

- Add Iceberg Source Connector