diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java index 99d933d89c1b..c0156e841cfa 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/FlinkCdcMultiTableSink.java @@ -32,7 +32,6 @@ import org.apache.paimon.flink.sink.StoreSinkWrite; import org.apache.paimon.flink.sink.StoreSinkWriteImpl; import org.apache.paimon.flink.sink.WrappedManifestCommittableSerializer; -import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; import org.apache.paimon.manifest.WrappedManifestCommittable; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; @@ -133,10 +132,7 @@ public DataStreamSink sinkFrom( createCommittableStateManager())) .setParallelism(input.getParallelism()); configureGlobalCommitter( - committed, - commitCpuCores, - commitHeapMemory, - StreamExecutionEnvironmentUtils.getConfiguration(env)); + committed, commitCpuCores, commitHeapMemory, env.getConfiguration()); return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableConfigUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableConfigUtils.java deleted file mode 100644 index ffeb9b92ea43..000000000000 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/TableConfigUtils.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink; - -import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.table.api.TableConfig; - -/** Utils for {@link TableConfig}. */ -public class TableConfigUtils { - - public static Configuration extractConfiguration(ReadableConfig readableConfig) { - Configuration to = new Configuration(); - copyConfiguration(readableConfig, to); - return to; - } - - private static void copyConfiguration(ReadableConfig from, Configuration to) { - if (from instanceof Configuration) { - to.addAll((Configuration) from); - return; - } - - if (!(from instanceof TableConfig)) { - throw new RuntimeException("Unknown readableConfig type: " + from.getClass()); - } - - TableConfig tableConfig = (TableConfig) from; - - // copy root configuration first - copyConfiguration(tableConfig.getRootConfiguration(), to); - - // copy table configuration - to.addAll(tableConfig.getConfiguration()); - } -} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java index 7f4c03a38f29..3a9e39b4eda1 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/ActionBase.java @@ -23,7 +23,6 @@ import org.apache.paimon.flink.FlinkCatalog; import org.apache.paimon.flink.FlinkCatalogFactory; import org.apache.paimon.flink.LogicalTypeConversion; -import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; import org.apache.paimon.options.CatalogOptions; import org.apache.paimon.options.Options; import org.apache.paimon.types.DataType; @@ -80,7 +79,7 @@ private void initFlinkEnv(StreamExecutionEnvironment env) { } protected void execute(String defaultName) throws Exception { - ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env); + ReadableConfig conf = env.getConfiguration(); String name = conf.getOptional(PipelineOptions.NAME).orElse(defaultName); env.execute(name); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java index 313a7a5d44d9..e24f8d9a9aab 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java @@ -22,7 +22,6 @@ import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder; import org.apache.paimon.flink.sink.CompactorSinkBuilder; import org.apache.paimon.flink.source.CompactorSourceBuilder; -import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; import org.apache.paimon.table.FileStoreTable; import org.apache.flink.api.common.RuntimeExecutionMode; @@ -75,7 +74,7 @@ public CompactAction withPartitions(List> partitions) { @Override public void build() { - ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env); + ReadableConfig conf = env.getConfiguration(); boolean isStreaming = conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING; FileStoreTable fileStoreTable = (FileStoreTable) table; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java index a97b3048e822..408d5768cf4c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java @@ -28,7 +28,6 @@ import org.apache.paimon.flink.sink.MultiTablesCompactorSink; import org.apache.paimon.flink.source.CompactorSourceBuilder; import org.apache.paimon.flink.source.MultiTablesCompactorSourceBuilder; -import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; @@ -161,7 +160,7 @@ private void buildForDividedMode() { !tableMap.isEmpty(), "no tables to be compacted. possible cause is that there are no tables detected after pattern matching"); - ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env); + ReadableConfig conf = env.getConfiguration(); boolean isStreaming = conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING; for (Map.Entry entry : tableMap.entrySet()) { @@ -186,7 +185,7 @@ private void buildForDividedMode() { private void buildForCombinedMode() { - ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env); + ReadableConfig conf = env.getConfiguration(); boolean isStreaming = conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING; // TODO: Currently, multi-tables compaction don't support tables which bucketmode is UNWARE. diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java index 06d234469ea2..a97335cd2763 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/TableActionBase.java @@ -22,7 +22,6 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.sink.FlinkSinkBuilder; -import org.apache.paimon.flink.utils.TableEnvironmentUtils; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; import org.apache.paimon.utils.Preconditions; @@ -30,8 +29,11 @@ import org.apache.flink.api.dag.Transformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.table.api.TableResult; +import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.data.RowData; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -68,8 +70,34 @@ public TableResult batchSink(DataStream dataStream) { List sinkIdentifierNames = Collections.singletonList(identifier.getFullName()); - return TableEnvironmentUtils.executeInternal( - batchTEnv, transformations, sinkIdentifierNames); + return executeInternal(transformations, sinkIdentifierNames); + } + + /** + * Invoke {@code TableEnvironmentImpl#executeInternal(List>, List)} + * from a {@link StreamTableEnvironment} instance through reflecting. + */ + private TableResult executeInternal( + List> transformations, List sinkIdentifierNames) { + Class clazz = batchTEnv.getClass().getSuperclass().getSuperclass(); + try { + Method executeInternal = + clazz.getDeclaredMethod("executeInternal", List.class, List.class); + executeInternal.setAccessible(true); + + return (TableResult) + executeInternal.invoke(batchTEnv, transformations, sinkIdentifierNames); + } catch (NoSuchMethodException e) { + throw new RuntimeException( + "Failed to get 'TableEnvironmentImpl#executeInternal(List, List)' method " + + "from given StreamTableEnvironment instance by Java reflection. This is unexpected.", + e); + } catch (IllegalAccessException | InvocationTargetException e) { + throw new RuntimeException( + "Failed to invoke 'TableEnvironmentImpl#executeInternal(List, List)' method " + + "from given StreamTableEnvironment instance by Java reflection. This is unexpected.", + e); + } } /** diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java index fd7f74148889..7d5542109d28 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java @@ -22,7 +22,6 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.factories.Factory; import org.apache.paimon.flink.action.ActionBase; -import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; import org.apache.paimon.table.Table; import org.apache.paimon.utils.StringUtils; @@ -69,13 +68,13 @@ protected String[] execute( protected String[] execute(ProcedureContext procedureContext, JobClient jobClient) { StreamExecutionEnvironment env = procedureContext.getExecutionEnvironment(); - ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env); + ReadableConfig conf = env.getConfiguration(); return execute(jobClient, conf.get(TABLE_DML_SYNC)); } protected String[] execute(StreamExecutionEnvironment env, String defaultJobName) throws Exception { - ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env); + ReadableConfig conf = env.getConfiguration(); String name = conf.getOptional(PipelineOptions.NAME).orElse(defaultJobName); return execute(env.executeAsync(name), conf.get(TABLE_DML_SYNC)); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java index 5b2c13c8488d..8a4814d0ae4c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java @@ -20,7 +20,6 @@ import org.apache.paimon.data.InternalRow; import org.apache.paimon.flink.utils.InternalTypeInfo; -import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; @@ -38,7 +37,7 @@ public class QueryService { public static void build(StreamExecutionEnvironment env, Table table, int parallelism) { - ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env); + ReadableConfig conf = env.getConfiguration(); Preconditions.checkArgument( conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING, "Query Service only supports streaming mode."); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java index 545bd7f07072..582fcfc35af2 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkSink.java @@ -20,7 +20,6 @@ import org.apache.paimon.CoreOptions.ChangelogProducer; import org.apache.paimon.CoreOptions.TagCreationMode; -import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; import org.apache.paimon.manifest.ManifestCommittable; import org.apache.paimon.options.MemorySize; import org.apache.paimon.options.Options; @@ -190,8 +189,7 @@ public DataStream doWrite( DataStream input, String commitUser, @Nullable Integer parallelism) { StreamExecutionEnvironment env = input.getExecutionEnvironment(); boolean isStreaming = - StreamExecutionEnvironmentUtils.getConfiguration(env) - .get(ExecutionOptions.RUNTIME_MODE) + env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING; boolean writeOnly = table.coreOptions().writeOnly(); @@ -222,7 +220,7 @@ public DataStream doWrite( protected DataStreamSink doCommit(DataStream written, String commitUser) { StreamExecutionEnvironment env = written.getExecutionEnvironment(); - ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env); + ReadableConfig conf = env.getConfiguration(); CheckpointConfig checkpointConfig = env.getCheckpointConfig(); boolean isStreaming = conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java index d9ded153d82a..659296633155 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java @@ -20,7 +20,6 @@ import org.apache.paimon.catalog.Catalog; import org.apache.paimon.flink.VersionedSerializerWrapper; -import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; import org.apache.paimon.manifest.WrappedManifestCommittable; import org.apache.paimon.options.Options; @@ -87,8 +86,7 @@ public SingleOutputStreamOperator doWrite( DataStream input, String commitUser, Integer parallelism) { StreamExecutionEnvironment env = input.getExecutionEnvironment(); boolean isStreaming = - StreamExecutionEnvironmentUtils.getConfiguration(env) - .get(ExecutionOptions.RUNTIME_MODE) + env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING; SingleOutputStreamOperator written = @@ -112,7 +110,7 @@ public SingleOutputStreamOperator doWrite( protected DataStreamSink doCommit( DataStream written, String commitUser) { StreamExecutionEnvironment env = written.getExecutionEnvironment(); - ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env); + ReadableConfig conf = env.getConfiguration(); CheckpointConfig checkpointConfig = env.getCheckpointConfig(); boolean isStreaming = conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING; diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java deleted file mode 100644 index 9f3b28bbe764..000000000000 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/StreamExecutionEnvironmentUtils.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.utils; - -import org.apache.flink.configuration.ReadableConfig; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; - -/** Utility methods for {@link StreamExecutionEnvironment}. */ -public class StreamExecutionEnvironmentUtils { - - public static ReadableConfig getConfiguration(StreamExecutionEnvironment env) { - return env.getConfiguration(); - } -} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableEnvironmentUtils.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableEnvironmentUtils.java deleted file mode 100644 index 6a5820ce84e7..000000000000 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/utils/TableEnvironmentUtils.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.paimon.flink.utils; - -import org.apache.flink.api.dag.Transformation; -import org.apache.flink.table.api.TableEnvironment; -import org.apache.flink.table.api.TableResult; -import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; - -import java.lang.reflect.InvocationTargetException; -import java.lang.reflect.Method; -import java.util.List; - -/** Utility methods for {@link TableEnvironment} and its subclasses. */ -public class TableEnvironmentUtils { - - /** - * Invoke {@code TableEnvironmentImpl#executeInternal(List>, List)} - * from a {@link StreamTableEnvironment} instance through reflecting. - */ - public static TableResult executeInternal( - StreamTableEnvironment tEnv, - List> transformations, - List sinkIdentifierNames) { - Class clazz = tEnv.getClass().getSuperclass().getSuperclass(); - try { - Method executeInternal = - clazz.getDeclaredMethod("executeInternal", List.class, List.class); - executeInternal.setAccessible(true); - - return (TableResult) executeInternal.invoke(tEnv, transformations, sinkIdentifierNames); - } catch (NoSuchMethodException e) { - throw new RuntimeException( - "Failed to get 'TableEnvironmentImpl#executeInternal(List, List)' method " - + "from given StreamTableEnvironment instance by Java reflection. This is unexpected.", - e); - } catch (IllegalAccessException | InvocationTargetException e) { - throw new RuntimeException( - "Failed to invoke 'TableEnvironmentImpl#executeInternal(List, List)' method " - + "from given StreamTableEnvironment instance by Java reflection. This is unexpected.", - e); - } - } -}