Skip to content

Commit

Permalink
Revert "[flink] Remove compatibility utils that are used for flink-1.…
Browse files Browse the repository at this point in the history
…14 (apache#3054)"

This reverts commit e589a42
  • Loading branch information
chenxi0599 committed May 7, 2024
1 parent 0c3773d commit e8628c8
Show file tree
Hide file tree
Showing 13 changed files with 174 additions and 44 deletions.
2 changes: 1 addition & 1 deletion docs/themes/book
Submodule book updated 108 files
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,11 @@ public DataStreamSink<?> sinkFrom(
createCommitterFactory(),
createCommittableStateManager()))
.setParallelism(input.getParallelism());
configureGlobalCommitter(committed, commitCpuCores, commitHeapMemory);
configureGlobalCommitter(
committed,
commitCpuCores,
commitHeapMemory,
StreamExecutionEnvironmentUtils.getConfiguration(env));
return committed.addSink(new DiscardingSink<>()).name("end").setParallelism(1);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.api.TableConfig;

/** Utils for {@link TableConfig}. */
public class TableConfigUtils {

public static Configuration extractConfiguration(ReadableConfig readableConfig) {
Configuration to = new Configuration();
copyConfiguration(readableConfig, to);
return to;
}

private static void copyConfiguration(ReadableConfig from, Configuration to) {
if (from instanceof Configuration) {
to.addAll((Configuration) from);
return;
}

if (!(from instanceof TableConfig)) {
throw new RuntimeException("Unknown readableConfig type: " + from.getClass());
}

TableConfig tableConfig = (TableConfig) from;

// copy root configuration first
copyConfiguration(tableConfig.getRootConfiguration(), to);

// copy table configuration
to.addAll(tableConfig.getConfiguration());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.paimon.flink.FlinkCatalog;
import org.apache.paimon.flink.FlinkCatalogFactory;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataType;
Expand Down Expand Up @@ -87,7 +88,7 @@ protected void initFlinkEnv(StreamExecutionEnvironment env) {
}

protected void execute(String defaultName) throws Exception {
ReadableConfig conf = env.getConfiguration();
ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
String name = conf.getOptional(PipelineOptions.NAME).orElse(defaultName);
env.execute(name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder;
import org.apache.paimon.flink.sink.CompactorSinkBuilder;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.table.FileStoreTable;

import org.apache.flink.api.common.RuntimeExecutionMode;
Expand Down Expand Up @@ -74,7 +75,7 @@ public CompactAction withPartitions(List<Map<String, String>> partitions) {

@Override
public void build() {
ReadableConfig conf = env.getConfiguration();
ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
boolean isStreaming =
conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
FileStoreTable fileStoreTable = (FileStoreTable) table;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.paimon.flink.sink.MultiTablesCompactorSink;
import org.apache.paimon.flink.source.CompactorSourceBuilder;
import org.apache.paimon.flink.source.MultiTablesCompactorSourceBuilder;
import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
Expand Down Expand Up @@ -160,7 +161,7 @@ private void buildForDividedMode() {
!tableMap.isEmpty(),
"no tables to be compacted. possible cause is that there are no tables detected after pattern matching");

ReadableConfig conf = env.getConfiguration();
ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
boolean isStreaming =
conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
for (Map.Entry<String, FileStoreTable> entry : tableMap.entrySet()) {
Expand All @@ -185,7 +186,7 @@ private void buildForDividedMode() {

private void buildForCombinedMode() {

ReadableConfig conf = env.getConfiguration();
ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
boolean isStreaming =
conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
// TODO: Currently, multi-tables compaction don't support tables which bucketmode is UNWARE.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,15 @@
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.flink.utils.TableEnvironmentUtils;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.data.RowData;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -67,33 +65,7 @@ public TableResult batchSink(DataStream<RowData> dataStream) {

List<String> sinkIdentifierNames = Collections.singletonList(identifier.getFullName());

return executeInternal(transformations, sinkIdentifierNames);
}

/**
* Invoke {@code TableEnvironmentImpl#executeInternal(List<Transformation<?>>, List<String>)}
* from a {@link StreamTableEnvironment} instance through reflecting.
*/
private TableResult executeInternal(
List<Transformation<?>> transformations, List<String> sinkIdentifierNames) {
Class<?> clazz = batchTEnv.getClass().getSuperclass().getSuperclass();
try {
Method executeInternal =
clazz.getDeclaredMethod("executeInternal", List.class, List.class);
executeInternal.setAccessible(true);

return (TableResult)
executeInternal.invoke(batchTEnv, transformations, sinkIdentifierNames);
} catch (NoSuchMethodException e) {
throw new RuntimeException(
"Failed to get 'TableEnvironmentImpl#executeInternal(List, List)' method "
+ "from given StreamTableEnvironment instance by Java reflection. This is unexpected.",
e);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(
"Failed to invoke 'TableEnvironmentImpl#executeInternal(List, List)' method "
+ "from given StreamTableEnvironment instance by Java reflection. This is unexpected.",
e);
}
return TableEnvironmentUtils.executeInternal(
batchTEnv, transformations, sinkIdentifierNames);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.factories.Factory;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.StringUtils;

Expand Down Expand Up @@ -68,13 +69,13 @@ protected String[] execute(

protected String[] execute(ProcedureContext procedureContext, JobClient jobClient) {
StreamExecutionEnvironment env = procedureContext.getExecutionEnvironment();
ReadableConfig conf = env.getConfiguration();
ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
return execute(jobClient, conf.get(TABLE_DML_SYNC));
}

protected String[] execute(StreamExecutionEnvironment env, String defaultJobName)
throws Exception {
ReadableConfig conf = env.getConfiguration();
ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
String name = conf.getOptional(PipelineOptions.NAME).orElse(defaultJobName);
return execute(env.executeAsync(name), conf.get(TABLE_DML_SYNC));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.utils.InternalTypeInfo;
import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
Expand All @@ -37,7 +38,7 @@
public class QueryService {

public static void build(StreamExecutionEnvironment env, Table table, int parallelism) {
ReadableConfig conf = env.getConfiguration();
ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
Preconditions.checkArgument(
conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING,
"Query Service only supports streaming mode.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import java.util.Set;
import java.util.UUID;

import static org.apache.flink.configuration.ClusterOptions.ENABLE_FINE_GRAINED_RESOURCE_MANAGEMENT;
import static org.apache.paimon.CoreOptions.FULL_COMPACTION_DELTA_COMMITS;
import static org.apache.paimon.flink.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
import static org.apache.paimon.flink.FlinkConnectorOptions.SINK_AUTO_TAG_FOR_SAVEPOINT;
Expand Down Expand Up @@ -187,7 +188,9 @@ private boolean hasSinkMaterializer(DataStream<T> input) {
public DataStream<Committable> doWrite(
DataStream<T> input, String commitUser, @Nullable Integer parallelism) {
StreamExecutionEnvironment env = input.getExecutionEnvironment();
boolean isStreaming = isStreaming(input);
boolean isStreaming =
env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;

boolean writeOnly = table.coreOptions().writeOnly();
SingleOutputStreamOperator<Committable> written =
Expand Down Expand Up @@ -217,8 +220,10 @@ public DataStream<Committable> doWrite(

protected DataStreamSink<?> doCommit(DataStream<Committable> written, String commitUser) {
StreamExecutionEnvironment env = written.getExecutionEnvironment();
ReadableConfig conf = env.getConfiguration();
ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
boolean isStreaming =
conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
boolean streamingCheckpointEnabled =
isStreaming(written) && checkpointConfig.isCheckpointingEnabled();
if (streamingCheckpointEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.VersionedSerializerWrapper;
import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
import org.apache.paimon.manifest.WrappedManifestCommittable;
import org.apache.paimon.options.Options;

Expand Down Expand Up @@ -88,7 +89,8 @@ public SingleOutputStreamOperator<MultiTableCommittable> doWrite(
DataStream<RowData> input, String commitUser, Integer parallelism) {
StreamExecutionEnvironment env = input.getExecutionEnvironment();
boolean isStreaming =
env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE)
StreamExecutionEnvironmentUtils.getConfiguration(env)
.get(ExecutionOptions.RUNTIME_MODE)
== RuntimeExecutionMode.STREAMING;

SingleOutputStreamOperator<MultiTableCommittable> written =
Expand All @@ -112,7 +114,7 @@ public SingleOutputStreamOperator<MultiTableCommittable> doWrite(
protected DataStreamSink<?> doCommit(
DataStream<MultiTableCommittable> written, String commitUser) {
StreamExecutionEnvironment env = written.getExecutionEnvironment();
ReadableConfig conf = env.getConfiguration();
ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
boolean isStreaming =
conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.utils;

import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/** Utility methods for {@link StreamExecutionEnvironment}. */
public class StreamExecutionEnvironmentUtils {

public static ReadableConfig getConfiguration(StreamExecutionEnvironment env) {
return env.getConfiguration();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.utils;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;

/** Utility methods for {@link TableEnvironment} and its subclasses. */
public class TableEnvironmentUtils {

/**
* Invoke {@code TableEnvironmentImpl#executeInternal(List<Transformation<?>>, List<String>)}
* from a {@link StreamTableEnvironment} instance through reflecting.
*/
public static TableResult executeInternal(
StreamTableEnvironment tEnv,
List<Transformation<?>> transformations,
List<String> sinkIdentifierNames) {
Class<?> clazz = tEnv.getClass().getSuperclass().getSuperclass();
try {
Method executeInternal =
clazz.getDeclaredMethod("executeInternal", List.class, List.class);
executeInternal.setAccessible(true);

return (TableResult) executeInternal.invoke(tEnv, transformations, sinkIdentifierNames);
} catch (NoSuchMethodException e) {
throw new RuntimeException(
"Failed to get 'TableEnvironmentImpl#executeInternal(List, List)' method "
+ "from given StreamTableEnvironment instance by Java reflection. This is unexpected.",
e);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(
"Failed to invoke 'TableEnvironmentImpl#executeInternal(List, List)' method "
+ "from given StreamTableEnvironment instance by Java reflection. This is unexpected.",
e);
}
}
}

0 comments on commit e8628c8

Please sign in to comment.