Skip to content

Commit

Permalink
[flink] Support custom initial flink catalog & env in ActionBase (#3152)
Browse files Browse the repository at this point in the history
  • Loading branch information
melin authored Apr 7, 2024
1 parent ca88ed9 commit 58beebe
Showing 1 changed file with 11 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ public ActionBase(String warehouse, Map<String, String> catalogConfig) {
catalogOptions = Options.fromMap(catalogConfig);
catalogOptions.set(CatalogOptions.WAREHOUSE, warehouse);

catalog = FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
flinkCatalog = FlinkCatalogFactory.createCatalog(catalogName, catalog, catalogOptions);
catalog = initPaimonCatalog();
flinkCatalog = initFlinkCatalog();

// use the default env if user doesn't pass one
initFlinkEnv(StreamExecutionEnvironment.getExecutionEnvironment());
Expand All @@ -67,7 +67,15 @@ public ActionBase withStreamExecutionEnvironment(StreamExecutionEnvironment env)
return this;
}

private void initFlinkEnv(StreamExecutionEnvironment env) {
protected Catalog initPaimonCatalog() {
return FlinkCatalogFactory.createPaimonCatalog(catalogOptions);
}

protected FlinkCatalog initFlinkCatalog() {
return FlinkCatalogFactory.createCatalog(catalogName, catalog, catalogOptions);
}

protected void initFlinkEnv(StreamExecutionEnvironment env) {
this.env = env;
// we enable object reuse, we copy the un-reusable object ourselves.
this.env.getConfig().enableObjectReuse();
Expand Down

0 comments on commit 58beebe

Please sign in to comment.