diff --git a/aiven/client/cli.py b/aiven/client/cli.py
index 5f2c0627..28b25b46 100644
--- a/aiven/client/cli.py
+++ b/aiven/client/cli.py
@@ -3120,6 +3120,7 @@ def service__flink__table__list(self):
             "integration_id",
             "table_id",
             "table_name",
+            "schema_sql",
         ]]
         self.print_response(
             self.client.list_flink_tables(project_name, self.args.service_name), json=self.args.json, table_layout=layout
@@ -3130,8 +3131,32 @@ def service__flink__table__list(self):
     @arg("integration_id", help="Service integration ID")
     @arg("-n", "--table-name", required=True, help="Table name")
     @arg("--kafka-topic", required=False, help="Topic name, used as a source/sink. (Kafka integration only)")
+    @arg(
+        "--kafka-connector-type",
+        required=False,
+        help="Kafka connector type. (Kafka integration only)",
+        choices=["upsert-kafka", "kafka"]
+    )
+    @arg(
+        "--kafka-key-format",
+        required=False,
+        help="Key format. (Kafka integration only)",
+        choices=["avro", "avro-confluent", "debezium-avro-confluent", "debezium-json", "json"]
+    )
+    @arg("--kafka-key-fields", nargs="*", default=[], required=False, help="Key field names used in the table schema")
+    @arg(
+        "--kafka-value-format",
+        required=False,
+        help="Value format. (Kafka integration only)",
+        choices=["avro", "avro-confluent", "debezium-avro-confluent", "debezium-json", "json"]
+    )
+    @arg(
+        "--kafka-startup-mode",
+        required=False,
+        help="Kafka startup mode. (Kafka integration only)",
+        choices=["earliest-offset", "latest-offset"]
+    )
     @arg("--jdbc-table", required=False, help="Table name in Database, used as a source/sink. (PG integration only)")
-    @arg("-p", "--partitioned-by", required=False, help="A column from a schema, table will be partitioned by")
     @arg(
         "-l",
         "--like-options",
@@ -3147,19 +3172,32 @@ def service__flink__table__create(self):
             "integration_id",
             "table_id",
             "table_name",
+            "schema_sql",
+            "columns",
         ]]
-        new_table = self.client.create_flink_table(
-            project_name,
-            self.args.service_name,
-            self.args.integration_id,
-            self.args.table_name,
-            self.args.schema_sql,
-            self.args.kafka_topic,
-            self.args.jdbc_table,
-            self.args.partitioned_by,
-            self.args.like_options,
-        )
-        self.print_response([new_table], json=self.args.json, table_layout=layout)
+        try:
+            new_table = self.client.create_flink_table(
+                project_name,
+                self.args.service_name,
+                self.args.integration_id,
+                self.args.table_name,
+                self.args.schema_sql,
+                self.args.kafka_topic,
+                self.args.kafka_connector_type,
+                self.args.kafka_key_format,
+                self.args.kafka_key_fields,
+                self.args.kafka_value_format,
+                self.args.kafka_startup_mode,
+                self.args.jdbc_table,
+                self.args.like_options,
+            )
+            self.print_response([new_table], json=self.args.json, table_layout=layout)
+        except client.Error as ex:
+            if "errors" in ex.response.json():
+                error_reason = ex.response.json()["errors"][0]["message"]
+                raise argx.UserError("Table creation failed. Reason: {}".format(error_reason))
+            else:
+                raise argx.UserError("Table creation failed. Reason: {}".format(ex.response.text))
 
     @arg.project
     @arg.service_name
@@ -3172,6 +3210,8 @@ def service__flink__table__get(self):
             "integration_id",
             "table_id",
             "table_name",
+            "schema_sql",
+            "columns",
         ]]
         table = self.client.get_flink_table(
             project_name,
diff --git a/aiven/client/client.py b/aiven/client/client.py
index fa72ec13..c2e30915 100644
--- a/aiven/client/client.py
+++ b/aiven/client/client.py
@@ -1031,8 +1031,12 @@ def create_flink_table(
         table_name,
         schema_sql,
         kafka_topic=None,
+        kafka_connector_type=None,
+        kafka_key_format=None,
+        kafka_key_fields=None,
+        kafka_value_format=None,
+        kafka_startup_mode=None,
         jdbc_table=None,
-        partitioned_by=None,
         like_options=None
     ):
         path = self.build_path(
@@ -1050,10 +1054,18 @@ def create_flink_table(
         }
         if kafka_topic:
             body["kafka_topic"] = kafka_topic
+        if kafka_connector_type:
+            body["kafka_connector_type"] = kafka_connector_type
+        if kafka_key_format:
+            body["kafka_key_format"] = kafka_key_format
+        if kafka_key_fields:
+            body["kafka_key_fields"] = kafka_key_fields
+        if kafka_value_format:
+            body["kafka_value_format"] = kafka_value_format
+        if kafka_startup_mode:
+            body["kafka_startup_mode"] = kafka_startup_mode
         if jdbc_table:
             body["jdbc_table"] = jdbc_table
-        if partitioned_by:
-            body["partitioned_by"] = partitioned_by
         if like_options:
             body["like_options"] = like_options
         return self.verify(self.post, path, body=body)