Skip to content

Commit

Permalink
flink: support additional fields for table creation
Browse files Browse the repository at this point in the history
  • Loading branch information
HelenMel committed Nov 19, 2021
1 parent 1f4f21b commit 23a90d6
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 16 deletions.
66 changes: 53 additions & 13 deletions aiven/client/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -3120,6 +3120,7 @@ def service__flink__table__list(self):
"integration_id",
"table_id",
"table_name",
"schema_sql",
]]
self.print_response(
self.client.list_flink_tables(project_name, self.args.service_name), json=self.args.json, table_layout=layout
Expand All @@ -3130,8 +3131,32 @@ def service__flink__table__list(self):
@arg("integration_id", help="Service integration ID")
@arg("-n", "--table-name", required=True, help="Table name")
@arg("--kafka-topic", required=False, help="Topic name, used as a source/sink. (Kafka integration only)")
@arg(
"--kafka-connector-type",
required=False,
help="Kafka connector type (Kafka integration only)",
choices=["upsert-kafka", "kafka"]
)
@arg(
"--kafka-key-format",
required=False,
help="Key format. (Kafka integration only)",
choices=["avro", "avro-confluent", "debezium-avro-confluent", "debezium-json", "json"]
)
@arg("--kafka-key-fields", nargs="*", default=[], required=False, help="Key fields names used in table schema")
@arg(
"--kafka-value-format",
required=False,
help="Value format. (Kafka integration only)",
choices=["avro", "avro-confluent", "debezium-avro-confluent", "debezium-json", "json"]
)
@arg(
"--kafka-startup-mode",
required=False,
help="Kafka startup mode (Kafka integration only)",
choices=["earliest-offset", "latest-offset"]
)
@arg("--jdbc-table", required=False, help="Table name in Database, used as a source/sink. (PG integration only)")
@arg("-p", "--partitioned-by", required=False, help="A column from a schema, table will be partitioned by")
@arg(
"-l",
"--like-options",
Expand All @@ -3147,19 +3172,32 @@ def service__flink__table__create(self):
"integration_id",
"table_id",
"table_name",
"schema_sql",
"columns",
]]
new_table = self.client.create_flink_table(
project_name,
self.args.service_name,
self.args.integration_id,
self.args.table_name,
self.args.schema_sql,
self.args.kafka_topic,
self.args.jdbc_table,
self.args.partitioned_by,
self.args.like_options,
)
self.print_response([new_table], json=self.args.json, table_layout=layout)
try:
new_table = self.client.create_flink_table(
project_name,
self.args.service_name,
self.args.integration_id,
self.args.table_name,
self.args.schema_sql,
self.args.kafka_topic,
self.args.kafka_connector_type,
self.args.kafka_key_format,
self.args.kafka_key_fields,
self.args.kafka_value_format,
self.args.kafka_startup_mode,
self.args.jdbc_table,
self.args.like_options,
)
self.print_response([new_table], json=self.args.json, table_layout=layout)
except client.Error as ex:
if "errors" in ex.response.json():
error_reason = ex.response.json()["errors"][0]["message"]
raise argx.UserError("Table creation failed. Reason: {}".format(error_reason))
else:
raise argx.UserError("Table creation failed. Reason: {}".format(ex.response.text))

@arg.project
@arg.service_name
Expand All @@ -3172,6 +3210,8 @@ def service__flink__table__get(self):
"integration_id",
"table_id",
"table_name",
"schema_sql",
"columns",
]]
table = self.client.get_flink_table(
project_name,
Expand Down
18 changes: 15 additions & 3 deletions aiven/client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1031,8 +1031,12 @@ def create_flink_table(
table_name,
schema_sql,
kafka_topic=None,
kafka_connector_type=None,
kafka_key_format=None,
kafka_key_fields=None,
kafka_value_format=None,
kafka_startup_mode=None,
jdbc_table=None,
partitioned_by=None,
like_options=None
):
path = self.build_path(
Expand All @@ -1050,10 +1054,18 @@ def create_flink_table(
}
if kafka_topic:
body["kafka_topic"] = kafka_topic
if kafka_connector_type:
body["kafka_connector_type"] = kafka_connector_type
if kafka_key_format:
body["kafka_key_format"] = kafka_key_format
if kafka_key_fields:
body["kafka_key_fields"] = kafka_key_fields
if kafka_value_format:
body["kafka_value_format"] = kafka_value_format
if kafka_startup_mode:
body["kafka_startup_mode"] = kafka_startup_mode
if jdbc_table:
body["jdbc_table"] = jdbc_table
if partitioned_by:
body["partitioned_by"] = partitioned_by
if like_options:
body["like_options"] = like_options
return self.verify(self.post, path, body=body)
Expand Down

0 comments on commit 23a90d6

Please sign in to comment.