From 58beebec9646723ce2615fda11493591d1128ecc Mon Sep 17 00:00:00 2001 From: melin Date: Sun, 7 Apr 2024 15:07:54 +0800 Subject: [PATCH] [flink] Support custom initial flink catalog & env in ActionBase (#3152) --- .../org/apache/paimon/flink/action/ActionBase.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java index 3a9e39b4eda1..dd32c52c6ca0 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java @@ -55,8 +55,8 @@ public ActionBase(String warehouse, Map catalogConfig) { catalogOptions = Options.fromMap(catalogConfig); catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse); - catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions); - flinkCatalog = FlinkCatalogFactory.createCatalog(catalogName, catalog, catalogOptions); + catalog = initPaimonCatalog(); + flinkCatalog = initFlinkCatalog(); // use the default env if user doesn't pass one initFlinkEnv(StreamExecutionEnvironment.getExecutionEnvironment()); @@ -67,7 +67,15 @@ public ActionBase withStreamExecutionEnvironment(StreamExecutionEnvironment env) return this; } - private void initFlinkEnv(StreamExecutionEnvironment env) { + protected Catalog initPaimonCatalog() { + return FlinkCatalogFactory.createPaimonCatalog(catalogOptions); + } + + protected FlinkCatalog initFlinkCatalog() { + return FlinkCatalogFactory.createCatalog(catalogName, catalog, catalogOptions); + } + + protected void initFlinkEnv(StreamExecutionEnvironment env) { this.env = env; // we enable object reuse, we copy the un-reusable object ourselves. this.env.getConfig().enableObjectReuse();