Skip to content

Commit

Permalink
[flink] Support options filter in FlinkGenericCatalogFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangyuf committed Aug 3, 2024
1 parent d94137c commit 055b796
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.FactoryUtil;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

Expand Down Expand Up @@ -56,14 +59,34 @@ public Set<org.apache.flink.configuration.ConfigOption<?>> optionalOptions() {
@Override
public FlinkGenericCatalog createCatalog(Context context) {
CatalogFactory hiveFactory = createHiveCatalogFactory(context.getClassLoader());
Catalog catalog = hiveFactory.createCatalog(context);
Context flinkContext = createFlinkContext(context, hiveFactory);
Catalog flinkCatalog = hiveFactory.createCatalog(flinkContext);
return createCatalog(
context.getClassLoader(), context.getOptions(), context.getName(), catalog);
context.getClassLoader(), context.getOptions(), context.getName(), flinkCatalog);
}

@VisibleForTesting
public Context createFlinkContext(Context context, CatalogFactory catalogFactory) {
Set<ConfigOption<?>> catalogOptions = new HashSet<>(catalogFactory.requiredOptions());
catalogOptions.addAll(catalogFactory.optionalOptions());
Map<String, String> contextOptions = context.getOptions();
Map<String, String> flinkCatalogOptions = new HashMap<>();
catalogOptions.forEach(
option -> {
if (contextOptions.containsKey(option.key())) {
flinkCatalogOptions.put(option.key(), contextOptions.get(option.key()));
}
});
return new FactoryUtil.DefaultCatalogContext(
context.getName(),
flinkCatalogOptions,
context.getConfiguration(),
context.getClassLoader());
}

@VisibleForTesting
public static FlinkGenericCatalog createCatalog(
ClassLoader cl, Map<String, String> optionMap, String name, Catalog flinkCatalog) {
ClassLoader cl, Map<String, String> optionMap, String name, Catalog catalog) {
Options options = Options.fromMap(optionMap);
options.set(CatalogOptions.METASTORE, "hive");
FlinkCatalog paimon =
Expand All @@ -75,7 +98,7 @@ public static FlinkGenericCatalog createCatalog(
cl,
options);

return new FlinkGenericCatalog(paimon, flinkCatalog);
return new FlinkGenericCatalog(paimon, catalog);
}

private static CatalogFactory createHiveCatalogFactory(ClassLoader cl) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink;

import org.apache.flink.table.factories.CatalogFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.TestCatalogFactory;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.File;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;

/** Test for {@link FlinkGenericCatalogFactory}. */
public class FlinkGenericCatalogFactoryTest {

private final FlinkGenericCatalogFactory genericCatalogFactory =
new FlinkGenericCatalogFactory();

@TempDir public static java.nio.file.Path temporaryFolder;

@Test
public void testGenericCatalogOptionsFilter() {
String path1 = new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();
String path2 = new File(temporaryFolder.toFile(), UUID.randomUUID().toString()).toString();

TestCatalogFactory testCatalogFactory = new TestCatalogFactory();
String catalogName = "test-catalog";
Map<String, String> options = new HashMap<>();
options.put("warehouse", path1);
options.put(TestCatalogFactory.DEFAULT_DATABASE.key(), path2);
CatalogFactory.Context context =
new FactoryUtil.DefaultCatalogContext(
catalogName,
options,
null,
FlinkGenericCatalogFactoryTest.class.getClassLoader());

CatalogFactory.Context flinkContext =
genericCatalogFactory.createFlinkContext(context, testCatalogFactory);

Map<String, String> flinkOptions = flinkContext.getOptions();
assertThat(flinkOptions.get(TestCatalogFactory.DEFAULT_DATABASE.key()))
.isEqualTo(options.get(TestCatalogFactory.DEFAULT_DATABASE.key()));
assertThat(flinkOptions.get("warehouse")).isNull();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public class HiveCatalog extends AbstractCatalog {
private static final String SERDE_CLASS_NAME = "org.apache.paimon.hive.PaimonSerDe";
private static final String STORAGE_HANDLER_CLASS_NAME =
"org.apache.paimon.hive.PaimonStorageHandler";
private static final String HIVE_PREFIX = "hive.";
public static final String HIVE_PREFIX = "hive.";
public static final String HIVE_SITE_FILE = "hive-site.xml";

private final HiveConf hiveConf;
Expand Down

0 comments on commit 055b796

Please sign in to comment.