diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java index 3ef3da40e6c34..660ea9b679775 100644 --- a/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java +++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendOnlyCompactionTask.java @@ -18,6 +18,7 @@ package org.apache.paimon.append; +import org.apache.paimon.catalog.Identifier; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.io.CompactIncrement; import org.apache.paimon.io.DataFileMeta; @@ -39,13 +40,22 @@ public class AppendOnlyCompactionTask { private final List compactBefore; private final List compactAfter; + private final Identifier tableIdentifier; + public AppendOnlyCompactionTask(BinaryRow partition, List files) { + this(partition, files, null); + } + + public AppendOnlyCompactionTask( + BinaryRow partition, List files, Identifier identifier) { + Preconditions.checkArgument( files != null && files.size() > 1, "AppendOnlyCompactionTask need more than one file input."); this.partition = partition; compactBefore = new ArrayList<>(files); compactAfter = new ArrayList<>(); + this.tableIdentifier = identifier; } public BinaryRow partition() { @@ -72,8 +82,12 @@ public CommitMessage doCompact(AppendOnlyFileStoreWrite write) throws Exception compactIncrement); } + public Identifier tableIdentifier() { + return tableIdentifier; + } + public int hashCode() { - return Objects.hash(partition, compactBefore, compactAfter); + return Objects.hash(partition, compactBefore, compactAfter, tableIdentifier); } @Override @@ -88,7 +102,8 @@ public boolean equals(Object o) { AppendOnlyCompactionTask that = (AppendOnlyCompactionTask) o; return Objects.equals(partition, that.partition) && Objects.equals(compactBefore, that.compactBefore) - && Objects.equals(compactAfter, that.compactAfter); + && Objects.equals(compactAfter, that.compactAfter) + && Objects.equals(tableIdentifier, that.tableIdentifier); } @Override @@ -97,7 +112,8 @@ public String toString() { "CompactionTask {" + "partition = %s, " + "compactBefore = %s, " - + "compactAfter = %s}", - partition, compactBefore, compactAfter); + + "compactAfter = %s, " + + "tableIdentifier = %s}", + partition, compactBefore, compactAfter, tableIdentifier); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java index a97b3048e822e..e1efbe79d09a6 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java @@ -19,13 +19,14 @@ package org.apache.paimon.flink.action; import org.apache.paimon.CoreOptions; +import org.apache.paimon.append.AppendOnlyCompactionTask; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder; import org.apache.paimon.flink.sink.BucketsRowChannelComputer; import org.apache.paimon.flink.sink.CompactorSinkBuilder; -import org.apache.paimon.flink.sink.MultiTablesCompactorSink; +import org.apache.paimon.flink.sink.CombineModeCompactorSink; import org.apache.paimon.flink.source.CompactorSourceBuilder; import 
org.apache.paimon.flink.source.MultiTablesCompactorSourceBuilder; import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; @@ -54,12 +55,15 @@ import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition; -/** Database compact action for Flink. */ +/** + * Database compact action for Flink. + */ public class CompactDatabaseAction extends ActionBase { private static final Logger LOG = LoggerFactory.getLogger(CompactDatabaseAction.class); private Pattern includingPattern = Pattern.compile(".*"); - @Nullable private Pattern excludingPattern; + @Nullable + private Pattern excludingPattern; private Pattern databasePattern = Pattern.compile(".*"); private MultiTablesSinkMode databaseCompactMode = MultiTablesSinkMode.DIVIDED; @@ -167,19 +171,17 @@ private void buildForDividedMode() { for (Map.Entry entry : tableMap.entrySet()) { FileStoreTable fileStoreTable = entry.getValue(); switch (fileStoreTable.bucketMode()) { - case UNAWARE: - { - buildForUnawareBucketCompaction( - env, entry.getKey(), fileStoreTable, isStreaming); - break; - } + case UNAWARE: { + buildForUnawareBucketCompaction( + env, entry.getKey(), fileStoreTable, isStreaming); + break; + } case FIXED: case DYNAMIC: - default: - { - buildForTraditionalCompaction( - env, entry.getKey(), fileStoreTable, isStreaming); - } + default: { + buildForTraditionalCompaction( + env, entry.getKey(), fileStoreTable, isStreaming); + } } } } @@ -197,15 +199,24 @@ private void buildForCombinedMode() { includingPattern, excludingPattern, tableOptions.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis()); - DataStream source = - sourceBuilder.withEnv(env).withContinuousMode(isStreaming).build(); - - DataStream partitioned = + DataStream multiBucketTableSource = partition( - source, + sourceBuilder + .withEnv(env) + .withContinuousMode(isStreaming) + .buildForMultiBucketTableSource(), new BucketsRowChannelComputer(), tableOptions.get(FlinkConnectorOptions.SINK_PARALLELISM)); - new MultiTablesCompactorSink(catalogLoader(), tableOptions).sinkFrom(partitioned); + + + // unaware table + DataStream unawareBucketTableSource = + sourceBuilder + .withEnv(env) + .withContinuousMode(isStreaming) + .buildForUnawareBucketsTableSource(); + + new CombineModeCompactorSink(catalogLoader(), tableOptions).sinkFrom(multiBucketTableSource, unawareBucketTableSource); } private void buildForTraditionalCompaction( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnwareBucketCompactionHelper.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnwareBucketCompactionHelper.java new file mode 100644 index 0000000000000..bd2178af4e1d7 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnwareBucketCompactionHelper.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.compact; + +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.append.AppendOnlyCompactionTask; +import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.operation.AppendOnlyFileStoreWrite; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.TableCommitImpl; + +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** + * The help class for the Compaction of unware bucket append table to execute {@link + * AppendOnlyCompactionTask}. + */ +public class UnwareBucketCompactionHelper { + private final FileStoreTable table; + private final String commitUser; + + private final transient AppendOnlyFileStoreWrite write; + + protected final transient Queue> result; + + private final transient Supplier compactExecutorsupplier; + + public UnwareBucketCompactionHelper( + FileStoreTable table, + String commitUser, + Supplier lazyCompactExecutor) { + this.table = table; + this.commitUser = commitUser; + this.write = (AppendOnlyFileStoreWrite) table.store().newWrite(commitUser); + this.result = new LinkedList<>(); + this.compactExecutorsupplier = lazyCompactExecutor; + } + + public void processElement(StreamRecord element) throws Exception { + AppendOnlyCompactionTask task = element.getValue(); + result.add(compactExecutorsupplier.get().submit(() -> task.doCompact(write))); + } + + public void close() throws Exception { + shutdown(); + } + + @VisibleForTesting + void shutdown() throws Exception { + + List messages = new ArrayList<>(); + for (Future resultFuture : result) { + if (!resultFuture.isDone()) { + // the later tasks should be stopped running + break; + } + try { + messages.add(resultFuture.get()); + } catch (Exception exception) { + // exception should already be handled + } + } + if (messages.isEmpty()) { + return; + } + + try (TableCommitImpl tableCommit = table.newCommit(commitUser)) { + tableCommit.abort(messages); + } + } + + public List prepareCommit(boolean waitCompaction, long checkpointId) + throws IOException { + List tempList = new ArrayList<>(); + try { + while (!result.isEmpty()) { + Future future = result.peek(); + if (!future.isDone() && !waitCompaction) { + break; + } + result.poll(); + tempList.add(future.get()); + } + return tempList.stream() + .map(s -> new Committable(checkpointId, Committable.Kind.FILE, s)) + .collect(Collectors.toList()); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while waiting tasks done.", e); + } catch (Exception e) { + throw new RuntimeException("Encountered an error while do compaction", e); + } + } + + public Iterable> result() { + return result; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombineModeCompactorSink.java similarity index 74% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java rename to 
paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombineModeCompactorSink.java index d9ded153d82a6..848a62daf6ca5 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombineModeCompactorSink.java @@ -18,6 +18,7 @@ package org.apache.paimon.flink.sink; +import org.apache.paimon.append.AppendOnlyCompactionTask; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.flink.VersionedSerializerWrapper; import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils; @@ -47,7 +48,7 @@ import static org.apache.paimon.utils.Preconditions.checkArgument; /** A sink for processing multi-tables in dedicated compaction job. */ -public class MultiTablesCompactorSink implements Serializable { +public class CombineModeCompactorSink implements Serializable { private static final long serialVersionUID = 1L; private static final String WRITER_NAME = "Writer"; @@ -58,55 +59,79 @@ public class MultiTablesCompactorSink implements Serializable { private final Options options; - public MultiTablesCompactorSink(Catalog.Loader catalogLoader, Options options) { + public CombineModeCompactorSink( + Catalog.Loader catalogLoader, + Options options) { this.catalogLoader = catalogLoader; this.ignorePreviousFiles = false; this.options = options; } - public DataStreamSink sinkFrom(DataStream input) { + public DataStreamSink sinkFrom( + DataStream multiBucketTableSource, DataStream unawareBucketTableSource) { // This commitUser is valid only for new jobs. // After the job starts, this commitUser will be recorded into the states of write and // commit operators. // When the job restarts, commitUser will be recovered from states and this value is // ignored. String initialCommitUser = UUID.randomUUID().toString(); - return sinkFrom(input, initialCommitUser); + return sinkFrom(multiBucketTableSource, unawareBucketTableSource, initialCommitUser); } - public DataStreamSink sinkFrom(DataStream input, String initialCommitUser) { + public DataStreamSink sinkFrom( + DataStream multiBucketTableSource, + DataStream unawareBucketTableSource, + String initialCommitUser) { // do the actually writing action, no snapshot generated in this stage - SingleOutputStreamOperator written = - doWrite(input, initialCommitUser, input.getParallelism()); + DataStream written = + doWrite(multiBucketTableSource, unawareBucketTableSource, initialCommitUser); // commit the committable to generate a new snapshot return doCommit(written, initialCommitUser); } - public SingleOutputStreamOperator doWrite( - DataStream input, String commitUser, Integer parallelism) { - StreamExecutionEnvironment env = input.getExecutionEnvironment(); + public DataStream doWrite( + DataStream multiBucketTableSource, + DataStream unawareBucketTableSource, + String commitUser) { + StreamExecutionEnvironment env = multiBucketTableSource.getExecutionEnvironment(); boolean isStreaming = StreamExecutionEnvironmentUtils.getConfiguration(env) .get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING; - SingleOutputStreamOperator written = - input.transform( - WRITER_NAME, + SingleOutputStreamOperator multiBucketTableRewriter = + multiBucketTableSource + .transform( + String.format( + "%s-%s", + "Multi-Bucket-Table", WRITER_NAME), new MultiTableCommittableTypeInfo(), createWriteOperator( env.getCheckpointConfig(), isStreaming, commitUser)) - .setParallelism(parallelism == null ? 
input.getParallelism() : parallelism); + .setParallelism(multiBucketTableSource.getParallelism()); + + SingleOutputStreamOperator unawareBucketTableRewriter = + unawareBucketTableSource + .transform( + String.format("%s-%s", "Unaware-Bucket-Table", WRITER_NAME), + new MultiTableCommittableTypeInfo(), + new UnawareCombineCompactionWorkerOperator( + catalogLoader, commitUser, options)) + .setParallelism(unawareBucketTableSource.getParallelism()); if (!isStreaming) { - assertBatchConfiguration(env, written.getParallelism()); + assertBatchConfiguration(env, multiBucketTableRewriter.getParallelism()); + assertBatchConfiguration(env, unawareBucketTableRewriter.getParallelism()); } if (options.get(SINK_USE_MANAGED_MEMORY)) { - declareManagedMemory(written, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY)); + declareManagedMemory( + multiBucketTableRewriter, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY)); + declareManagedMemory( + unawareBucketTableRewriter, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY)); } - return written; + return multiBucketTableRewriter.union(unawareBucketTableRewriter); } protected DataStreamSink doCommit( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java index cf825cec7ba41..66cf996de8de7 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java @@ -44,7 +44,7 @@ public static DataStreamSink sink( @Override protected OneInputStreamOperator createWriteOperator( StoreSinkWrite.Provider writeProvider, String commitUser) { - return new AppendOnlyTableCompactionWorkerOperator(table, commitUser); + return new UnawareDividedCompactionWorkerOperator(table, commitUser); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareCombineCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareCombineCompactionWorkerOperator.java new file mode 100644 index 0000000000000..fbbe69e856312 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareCombineCompactionWorkerOperator.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.append.AppendOnlyCompactionTask; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.action.MultiTablesSinkMode; +import org.apache.paimon.flink.compact.UnwareBucketCompactionHelper; +import org.apache.paimon.flink.source.BucketUnawareCompactSource; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.utils.ExecutorThreadFactory; + +import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; + +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.jetbrains.annotations.NotNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Operator to execute {@link AppendOnlyCompactionTask} passed from {@link + * BucketUnawareCompactSource} for support compacting multi unaware bucket tables in combined mode. + */ +public class UnawareCombineCompactionWorkerOperator + extends PrepareCommitOperator { + + private static final Logger LOG = + LoggerFactory.getLogger(UnawareCombineCompactionWorkerOperator.class); + + private final String commitUser; + private final Catalog.Loader catalogLoader; + + // support multi table compaction + private transient Map compactionHelperContainer; + + private transient ExecutorService lazyCompactExecutor; + + private transient Catalog catalog; + + public UnawareCombineCompactionWorkerOperator( + Catalog.Loader catalogLoader, String commitUser, Options options) { + super(options); + this.commitUser = commitUser; + this.catalogLoader = catalogLoader; + } + + @VisibleForTesting + Iterable> result() { + return compactionHelperContainer.values().stream() + .flatMap(helper -> Lists.newArrayList(helper.result()).stream()) + .collect(Collectors.toList()); + } + + @Override + public void open() throws Exception { + LOG.debug("Opened a append-only multi table compaction worker."); + compactionHelperContainer = new HashMap<>(); + catalog = catalogLoader.load(); + } + + @Override + protected List prepareCommit(boolean waitCompaction, long checkpointId) + throws IOException { + List result = new ArrayList<>(); + for (Map.Entry helperEntry : + compactionHelperContainer.entrySet()) { + Identifier tableId = helperEntry.getKey(); + UnwareBucketCompactionHelper helper = helperEntry.getValue(); + + for (Committable committable : helper.prepareCommit(waitCompaction, checkpointId)) { + result.add( + new MultiTableCommittable( + tableId.getDatabaseName(), + tableId.getObjectName(), + committable.checkpointId(), + committable.kind(), + committable.wrappedCommittable())); + } + } + + return result; + } + + @Override + public void processElement(StreamRecord element) throws Exception { + Identifier identifier = element.getValue().tableIdentifier(); + compactionHelperContainer + .computeIfAbsent(identifier, this::unwareBucketCompactionHelper) + .processElement(element); + } + + @NotNull + private UnwareBucketCompactionHelper unwareBucketCompactionHelper(Identifier tableId) { + try { + return new 
UnwareBucketCompactionHelper( + (FileStoreTable) catalog.getTable(tableId), commitUser, this::workerExecutor); + } catch (Catalog.TableNotExistException e) { + throw new RuntimeException(e); + } + } + + private ExecutorService workerExecutor() { + if (lazyCompactExecutor == null) { + lazyCompactExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory( + Thread.currentThread().getName() + + "-append-only-compact-worker")); + } + return lazyCompactExecutor; + } + + @Override + public void close() throws Exception { + if (lazyCompactExecutor != null) { + // ignore runnable tasks in queue + lazyCompactExecutor.shutdownNow(); + if (!lazyCompactExecutor.awaitTermination(120, TimeUnit.SECONDS)) { + LOG.warn( + "Executors shutdown timeout, there may be some files aren't deleted correctly"); + } + + for (UnwareBucketCompactionHelper helperEntry : compactionHelperContainer.values()) { + helperEntry.close(); + } + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareDividedCompactionWorkerOperator.java similarity index 57% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareDividedCompactionWorkerOperator.java index 0860ccb3ac8be..9a1d64af23f34 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareDividedCompactionWorkerOperator.java @@ -20,12 +20,11 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.append.AppendOnlyCompactionTask; +import org.apache.paimon.flink.compact.UnwareBucketCompactionHelper; import org.apache.paimon.flink.source.BucketUnawareCompactSource; -import org.apache.paimon.operation.AppendOnlyFileStoreWrite; import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessage; -import org.apache.paimon.table.sink.TableCommitImpl; import org.apache.paimon.utils.ExecutorThreadFactory; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -33,34 +32,30 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; -import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; /** * Operator to execute {@link AppendOnlyCompactionTask} passed from {@link - * BucketUnawareCompactSource}. + * BucketUnawareCompactSource} for compacting multi unaware bucket tables in divided mode. 
*/ -public class AppendOnlyTableCompactionWorkerOperator +public class UnawareDividedCompactionWorkerOperator extends PrepareCommitOperator { private static final Logger LOG = - LoggerFactory.getLogger(AppendOnlyTableCompactionWorkerOperator.class); + LoggerFactory.getLogger(UnawareDividedCompactionWorkerOperator.class); private final FileStoreTable table; private final String commitUser; - private transient AppendOnlyFileStoreWrite write; + private UnwareBucketCompactionHelper compactionHelper; + private transient ExecutorService lazyCompactExecutor; - private transient Queue> result; - public AppendOnlyTableCompactionWorkerOperator(FileStoreTable table, String commitUser) { + public UnawareDividedCompactionWorkerOperator(FileStoreTable table, String commitUser) { super(Options.fromMap(table.options())); this.table = table; this.commitUser = commitUser; @@ -68,44 +63,25 @@ public AppendOnlyTableCompactionWorkerOperator(FileStoreTable table, String comm @VisibleForTesting Iterable> result() { - return result; + return compactionHelper.result(); } @Override public void open() throws Exception { LOG.debug("Opened a append-only table compaction worker."); - this.write = (AppendOnlyFileStoreWrite) table.store().newWrite(commitUser); - this.result = new LinkedList<>(); + this.compactionHelper = + new UnwareBucketCompactionHelper(table, commitUser, this::workerExecutor); } @Override protected List prepareCommit(boolean waitCompaction, long checkpointId) throws IOException { - List tempList = new ArrayList<>(); - try { - while (!result.isEmpty()) { - Future future = result.peek(); - if (!future.isDone() && !waitCompaction) { - break; - } - - result.poll(); - tempList.add(future.get()); - } - return tempList.stream() - .map(s -> new Committable(checkpointId, Committable.Kind.FILE, s)) - .collect(Collectors.toList()); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while waiting tasks done.", e); - } catch (Exception e) { - throw new RuntimeException("Encountered an error while do compaction", e); - } + return this.compactionHelper.prepareCommit(waitCompaction, checkpointId); } @Override public void processElement(StreamRecord element) throws Exception { - AppendOnlyCompactionTask task = element.getValue(); - result.add(workerExecutor().submit(() -> task.doCompact(write))); + this.compactionHelper.processElement(element); } private ExecutorService workerExecutor() { @@ -121,11 +97,6 @@ private ExecutorService workerExecutor() { @Override public void close() throws Exception { - shutdown(); - } - - @VisibleForTesting - void shutdown() throws Exception { if (lazyCompactExecutor != null) { // ignore runnable tasks in queue lazyCompactExecutor.shutdownNow(); @@ -133,25 +104,7 @@ void shutdown() throws Exception { LOG.warn( "Executors shutdown timeout, there may be some files aren't deleted correctly"); } - List messages = new ArrayList<>(); - for (Future resultFuture : result) { - if (!resultFuture.isDone()) { - // the later tasks should be stopped running - break; - } - try { - messages.add(resultFuture.get()); - } catch (Exception exception) { - // exception should already be handled - } - } - if (messages.isEmpty()) { - return; - } - - try (TableCommitImpl tableCommit = table.newCommit(commitUser)) { - tableCommit.abort(messages); - } + this.compactionHelper.close(); } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilder.java 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilder.java index b631d3b7a037b..24378a3ea8311 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilder.java @@ -18,10 +18,13 @@ package org.apache.paimon.flink.source; +import org.apache.paimon.append.AppendOnlyCompactionTask; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.flink.LogicalTypeConversion; +import org.apache.paimon.flink.source.operator.UnawareTablesBatchCompactorSourceFunction; import org.apache.paimon.flink.source.operator.MultiTablesBatchCompactorSourceFunction; import org.apache.paimon.flink.source.operator.MultiTablesStreamingCompactorSourceFunction; +import org.apache.paimon.flink.source.operator.UnwareTablesStreamingCompactorSourceFunction; import org.apache.paimon.table.system.BucketsTable; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Preconditions; @@ -70,13 +73,13 @@ public MultiTablesCompactorSourceBuilder withEnv(StreamExecutionEnvironment env) return this; } - public DataStream build() { + public DataStream buildForMultiBucketTableSource() { Preconditions.checkArgument(env != null, "StreamExecutionEnvironment should not be null."); RowType produceType = BucketsTable.getRowType(); if (isContinuous) { return MultiTablesStreamingCompactorSourceFunction.buildSource( env, - "MultiTables-StreamingCompactorSource", + "Combine-MultiBucketTables--StreamingCompactorSource", InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(produceType)), catalogLoader, includingPattern, @@ -86,7 +89,7 @@ public DataStream build() { } else { return MultiTablesBatchCompactorSourceFunction.buildSource( env, - "MultiTables-BatchCompactorSource", + "Combine-MultiBucketTables-BatchCompactorSource", InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(produceType)), catalogLoader, includingPattern, @@ -95,4 +98,27 @@ public DataStream build() { monitorInterval); } } + + public DataStream buildForUnawareBucketsTableSource() { + Preconditions.checkArgument(env != null, "StreamExecutionEnvironment should not be null."); + if (isContinuous) { + return UnwareTablesStreamingCompactorSourceFunction.buildSource( + env, + "Combine-UnawareBucketTables-StreamingCompactorSource", + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + monitorInterval); + } else { + return UnawareTablesBatchCompactorSourceFunction.buildSource( + env, + "Combine-UnawareBucketTables-BatchCompactorSource", + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + monitorInterval); + } + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesBatchCompactorSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesBatchCompactorSourceFunction.java index 3da9fb2e6a965..215eb956c0d3f 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesBatchCompactorSourceFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesBatchCompactorSourceFunction.java @@ -19,12 +19,9 @@ package org.apache.paimon.flink.source.operator; import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.Identifier; import 
org.apache.paimon.flink.utils.JavaTypeInfo; import org.apache.paimon.table.source.DataSplit; -import org.apache.paimon.table.source.EndOfScanException; import org.apache.paimon.table.source.Split; -import org.apache.paimon.table.source.StreamTableScan; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -39,11 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; import java.util.regex.Pattern; -import java.util.stream.Collectors; /** It is responsible for monitoring compactor source in batch mode. */ public class MultiTablesBatchCompactorSourceFunction extends MultiTablesCompactorSourceFunction { @@ -68,38 +61,7 @@ public MultiTablesBatchCompactorSourceFunction( @Override public void run(SourceContext> ctx) throws Exception { - this.ctx = ctx; - if (isRunning) { - boolean isEmpty; - synchronized (ctx.getCheckpointLock()) { - if (!isRunning) { - return; - } - try { - // batch mode do not need check for new tables - List> splits = new ArrayList<>(); - for (Map.Entry entry : scansMap.entrySet()) { - Identifier identifier = entry.getKey(); - StreamTableScan scan = entry.getValue(); - splits.addAll( - scan.plan().splits().stream() - .map(split -> new Tuple2<>(split, identifier.getFullName())) - .collect(Collectors.toList())); - } - - isEmpty = splits.isEmpty(); - splits.forEach(ctx::collect); - - } catch (EndOfScanException esf) { - LOG.info("Catching EndOfStreamException, the stream is finished."); - return; - } - } - if (isEmpty) { - throw new Exception( - "No splits were collected. Please ensure there are tables detected after pattern matching"); - } - } + batchMonitor(ctx); } public static DataStream buildSource( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesCompactorSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesCompactorSourceFunction.java index b1f9637c60b07..6e58915f639d5 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesCompactorSourceFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesCompactorSourceFunction.java @@ -22,55 +22,48 @@ import org.apache.paimon.catalog.Identifier; import org.apache.paimon.table.BucketMode; import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.EndOfScanException; import org.apache.paimon.table.source.Split; import org.apache.paimon.table.source.StreamTableScan; import org.apache.paimon.table.system.BucketsTable; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.jetbrains.annotations.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.regex.Pattern; +import java.util.stream.Collectors; import static org.apache.paimon.flink.utils.MultiTablesCompactorUtil.compactOptions; -import static org.apache.paimon.flink.utils.MultiTablesCompactorUtil.shouldCompactTable; /** * This is the single (non-parallel) monitoring task, it is responsible for: * *
<ol>
- *   <li>Monitoring snapshots of the Paimon table.
- *   <li>Creating the Tuple2<{@link Split}, String> splits corresponding to the incremental files
+ *   <li>Monitoring snapshots of the Paimon table and the new Paimon table
+ *   <li>Creating the Tuple2<{@link Split}, String> splits corresponding to the incremental files.
 *   <li>Assigning them to downstream tasks for further processing.
 * </ol>
 *
 * <p>The splits to be read are forwarded to the downstream {@link MultiTablesReadOperator} which
 * can have parallelism greater than one.
 *
- * <p>Currently, only dedicated compaction job for multi-tables rely on this monitor.
+ * <p>
Currently, the dedicated compaction of combine mode job for multi-tables with the fix and dynamic bucket rely on this monitor. */ public abstract class MultiTablesCompactorSourceFunction - extends RichSourceFunction> { + extends RuntimeTableMonitorSourceFunction> { private static final long serialVersionUID = 1L; private static final Logger LOG = LoggerFactory.getLogger(MultiTablesCompactorSourceFunction.class); - protected final Catalog.Loader catalogLoader; - protected final Pattern includingPattern; - protected final Pattern excludingPattern; - protected final Pattern databasePattern; - protected final boolean isStreaming; - protected final long monitorInterval; - - protected transient Catalog catalog; protected transient Map tablesMap; protected transient Map scansMap; @@ -81,78 +74,73 @@ public MultiTablesCompactorSourceFunction( Pattern databasePattern, boolean isStreaming, long monitorInterval) { - this.catalogLoader = catalogLoader; - this.includingPattern = includingPattern; - this.excludingPattern = excludingPattern; - this.databasePattern = databasePattern; - this.isStreaming = isStreaming; - this.monitorInterval = monitorInterval; + super( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + isStreaming, + monitorInterval); } - protected volatile boolean isRunning = true; - - protected transient SourceContext> ctx; - @Override public void open(Configuration parameters) throws Exception { + super.open(parameters); tablesMap = new HashMap<>(); scansMap = new HashMap<>(); - catalog = catalogLoader.load(); + } - updateTableMap(); + @Override + boolean hasScanned(Identifier identifier) { + return tablesMap.containsKey(identifier); } @Override - public void cancel() { - // this is to cover the case where cancel() is called before the run() - if (ctx != null) { - synchronized (ctx.getCheckpointLock()) { - isRunning = false; - } - } else { - isRunning = false; + void applyFileTable(FileStoreTable fileStoreTable, Identifier identifier) { + if (fileStoreTable.bucketMode() == BucketMode.UNAWARE) { + LOG.info( + String.format("the bucket mode of %s is unware. ", identifier.getFullName()) + + "currently, the table with unware bucket mode is not support in combined mode."); + return; } + + BucketsTable bucketsTable = + new BucketsTable(fileStoreTable, isStreaming, identifier.getDatabaseName()) + .copy(compactOptions(isStreaming)); + tablesMap.put(identifier, bucketsTable); + scansMap.put(identifier, bucketsTable.newReadBuilder().newStreamScan()); } - protected void updateTableMap() - throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException { - List databases = catalog.listDatabases(); - - for (String databaseName : databases) { - if (databasePattern.matcher(databaseName).matches()) { - List tables = catalog.listTables(databaseName); - for (String tableName : tables) { - Identifier identifier = Identifier.create(databaseName, tableName); - if (shouldCompactTable(identifier, includingPattern, excludingPattern) - && (!tablesMap.containsKey(identifier))) { - Table table = catalog.getTable(identifier); - if (!(table instanceof FileStoreTable)) { - LOG.error( - String.format( - "Only FileStoreTable supports compact action. The table type is '%s'.", - table.getClass().getName())); - continue; - } - FileStoreTable fileStoreTable = (FileStoreTable) table; - if (fileStoreTable.bucketMode() == BucketMode.UNAWARE) { - LOG.info( - String.format( - "the bucket mode of %s is unware. 
", - identifier.getFullName()) - + "currently, the table with unware bucket mode is not support in combined mode."); - continue; - } - BucketsTable bucketsTable = - new BucketsTable( - fileStoreTable, - isStreaming, - identifier.getDatabaseName()) - .copy(compactOptions(isStreaming)); - tablesMap.put(identifier, bucketsTable); - scansMap.put(identifier, bucketsTable.newReadBuilder().newStreamScan()); - } + @Nullable + @Override + public Boolean execute() throws Exception { + boolean isEmpty; + synchronized (ctx.getCheckpointLock()) { + if (!isRunning) { + return null; + } + + // check for new tables + updateTableMap(); + + try { + // batch mode do not need check for new tables + List> splits = new ArrayList<>(); + for (Map.Entry entry : scansMap.entrySet()) { + Identifier identifier = entry.getKey(); + StreamTableScan scan = entry.getValue(); + splits.addAll( + scan.plan().splits().stream() + .map(split -> new Tuple2<>(split, identifier.getFullName())) + .collect(Collectors.toList())); } + isEmpty = splits.isEmpty(); + splits.forEach(ctx::collect); + } catch (EndOfScanException esf) { + LOG.info("Catching EndOfStreamException, the stream is finished."); + return null; } } + return isEmpty; } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesStreamingCompactorSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesStreamingCompactorSourceFunction.java index 61e085f328a95..bebfd6e1358c8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesStreamingCompactorSourceFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesStreamingCompactorSourceFunction.java @@ -19,12 +19,9 @@ package org.apache.paimon.flink.source.operator; import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.utils.JavaTypeInfo; import org.apache.paimon.table.source.DataSplit; -import org.apache.paimon.table.source.EndOfScanException; import org.apache.paimon.table.source.Split; -import org.apache.paimon.table.source.StreamTableScan; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -39,11 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; import java.util.regex.Pattern; -import java.util.stream.Collectors; /** It is responsible for monitoring compactor source in streaming mode. 
*/ public class MultiTablesStreamingCompactorSourceFunction @@ -67,43 +60,9 @@ public MultiTablesStreamingCompactorSourceFunction( monitorInterval); } - @SuppressWarnings("BusyWait") @Override public void run(SourceContext> ctx) throws Exception { - this.ctx = ctx; - while (isRunning) { - boolean isEmpty; - synchronized (ctx.getCheckpointLock()) { - if (!isRunning) { - return; - } - try { - // check for new tables - updateTableMap(); - - List> splits = new ArrayList<>(); - for (Map.Entry entry : scansMap.entrySet()) { - Identifier identifier = entry.getKey(); - StreamTableScan scan = entry.getValue(); - splits.addAll( - scan.plan().splits().stream() - .map(split -> new Tuple2<>(split, identifier.getFullName())) - .collect(Collectors.toList())); - } - - isEmpty = splits.isEmpty(); - splits.forEach(ctx::collect); - - } catch (EndOfScanException esf) { - LOG.info("Catching EndOfStreamException, the stream is finished."); - return; - } - } - - if (isEmpty) { - Thread.sleep(monitorInterval); - } - } + incrementMonitor(ctx); } public static DataStream buildSource( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/RuntimeTableMonitorSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/RuntimeTableMonitorSourceFunction.java new file mode 100644 index 0000000000000..e9cf12db7d52e --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/RuntimeTableMonitorSourceFunction.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source.operator; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.BucketMode; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.regex.Pattern; + +import static org.apache.paimon.flink.utils.MultiTablesCompactorUtil.shouldCompactTable; + +/** + * This is the single (non-parallel) monitoring task, it is responsible for the new Paimon table. 
+ */ +public abstract class RuntimeTableMonitorSourceFunction extends RichSourceFunction { + + private static final long serialVersionUID = 1L; + + private static final Logger LOG = + LoggerFactory.getLogger(RuntimeTableMonitorSourceFunction.class); + + protected final Catalog.Loader catalogLoader; + protected final Pattern includingPattern; + protected final Pattern excludingPattern; + protected final Pattern databasePattern; + protected final long monitorInterval; + protected final boolean isStreaming; + + protected transient Catalog catalog; + + public RuntimeTableMonitorSourceFunction( + Catalog.Loader catalogLoader, + Pattern includingPattern, + Pattern excludingPattern, + Pattern databasePattern, + boolean isStreaming, + long monitorInterval) { + this.catalogLoader = catalogLoader; + this.includingPattern = includingPattern; + this.excludingPattern = excludingPattern; + this.databasePattern = databasePattern; + this.monitorInterval = monitorInterval; + this.isStreaming = isStreaming; + } + + protected volatile boolean isRunning = true; + + protected transient SourceContext ctx; + + @Override + public void open(Configuration parameters) throws Exception { + catalog = catalogLoader.load(); + } + + @Override + public void cancel() { + // this is to cover the case where cancel() is called before the run() + if (ctx != null) { + synchronized (ctx.getCheckpointLock()) { + isRunning = false; + } + } else { + isRunning = false; + } + } + + protected void updateTableMap() + throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException { + List databases = catalog.listDatabases(); + + for (String databaseName : databases) { + if (databasePattern.matcher(databaseName).matches()) { + List tables = catalog.listTables(databaseName); + for (String tableName : tables) { + Identifier identifier = Identifier.create(databaseName, tableName); + if (shouldCompactTable(identifier, includingPattern, excludingPattern) + && (!hasScanned(identifier))) { + Table table = catalog.getTable(identifier); + if (!(table instanceof FileStoreTable)) { + LOG.error( + String.format( + "Only FileStoreTable supports compact action. The table type is '%s'.", + table.getClass().getName())); + continue; + } + + FileStoreTable fileStoreTable = (FileStoreTable) table; + if (fileStoreTable.bucketMode() == BucketMode.UNAWARE) { + LOG.info( + String.format( + "the bucket mode of %s is unware. ", + identifier.getFullName()) + + "currently, the table with unware bucket mode is not support in combined mode."); + continue; + } + + applyFileTable(fileStoreTable, identifier); + } + } + } + } + } + + abstract boolean hasScanned(Identifier identifier); + + abstract void applyFileTable(FileStoreTable fileStoreTable, Identifier identifier); + + @SuppressWarnings("BusyWait") + public void incrementMonitor(SourceContext ctx) throws Exception { + this.ctx = ctx; + while (isRunning) { + Boolean isEmpty = execute(); + if (isEmpty == null) return; + if (isEmpty) { + Thread.sleep(monitorInterval); + } + } + } + + public void batchMonitor(SourceContext ctx) throws Exception { + this.ctx = ctx; + if (isRunning) { + Boolean isEmpty = execute(); + if (isEmpty == null) return; + if (isEmpty) { + throw new Exception( + "No file were collected. 
Please ensure there are tables detected after pattern matching"); + } + } + } + + abstract Boolean execute() throws Exception; +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/UnawareTablesBatchCompactorSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/UnawareTablesBatchCompactorSourceFunction.java new file mode 100644 index 0000000000000..1f8e2ce77ffe5 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/UnawareTablesBatchCompactorSourceFunction.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source.operator; + +import org.apache.paimon.append.AppendOnlyCompactionTask; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.flink.sink.CompactionTaskTypeInfo; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.transformations.PartitionTransformation; +import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; + +import java.util.regex.Pattern; + +/** + * It is responsible for the batch compactor source of the table of unaware bucket in combined mode. 
+ */ +public class UnawareTablesBatchCompactorSourceFunction extends UnawareTablesSourceFunction { + public UnawareTablesBatchCompactorSourceFunction( + Catalog.Loader catalogLoader, + Pattern includingPattern, + Pattern excludingPattern, + Pattern databasePattern, + long monitorInterval) { + super( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + false, + monitorInterval); + } + + @Override + public void run(SourceContext sourceContext) throws Exception { + this.batchMonitor(sourceContext); + } + + public static DataStream buildSource( + StreamExecutionEnvironment env, + String name, + Catalog.Loader catalogLoader, + Pattern includingPattern, + Pattern excludingPattern, + Pattern databasePattern, + long monitorInterval) { + UnawareTablesBatchCompactorSourceFunction function = + new UnawareTablesBatchCompactorSourceFunction( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + monitorInterval); + StreamSource + sourceOperator = new StreamSource<>(function); + CompactionTaskTypeInfo compactionTaskTypeInfo = new CompactionTaskTypeInfo(); + SingleOutputStreamOperator source = + new DataStreamSource<>( + env, + compactionTaskTypeInfo, + sourceOperator, + false, + name, + Boundedness.BOUNDED) + .forceNonParallel(); + + PartitionTransformation transformation = + new PartitionTransformation<>( + source.getTransformation(), new RebalancePartitioner<>()); + + return new DataStream<>(env, transformation); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/UnawareTablesSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/UnawareTablesSourceFunction.java new file mode 100644 index 0000000000000..1e193b7813a4e --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/UnawareTablesSourceFunction.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.source.operator; + +import org.apache.paimon.append.AppendOnlyCompactionTask; +import org.apache.paimon.append.AppendOnlyTableCompactionCoordinator; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.sink.UnawareCombineCompactionWorkerOperator; +import org.apache.paimon.table.BucketMode; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.EndOfScanException; + +import org.apache.flink.configuration.Configuration; +import org.jetbrains.annotations.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; + +/** + * This is the single (non-parallel) monitoring task , it is responsible for the dedicated + * compaction of combine mode job for multi-tables with the unaware bucket. + * + *

<ol>
+ *   <li>Monitoring snapshots of the Paimon table and the new Paimon table.
+ *   <li>Creating the {@link AppendOnlyCompactionTask} corresponding to the incremental files.
+ *   <li>Assigning them to downstream tasks for further processing.
+ * </ol>
+ *
+ * <p>The {@link AppendOnlyCompactionTask}s to be read are forwarded to the downstream {@link
+ * UnawareCombineCompactionWorkerOperator}, which can have parallelism greater than one.
+ *
+ * <p>
Currently, only the dedicated compaction of combine mode job for multi-tables with the fix and + * dynamic bucket rely on this monitor. + */ +public abstract class UnawareTablesSourceFunction + extends RuntimeTableMonitorSourceFunction { + private static final long serialVersionUID = 1L; + + private static final Logger LOG = + LoggerFactory.getLogger(UnawareTablesSourceFunction.class); + + protected transient Map tablesMap; + + public UnawareTablesSourceFunction( + Catalog.Loader catalogLoader, + Pattern includingPattern, + Pattern excludingPattern, + Pattern databasePattern, + boolean isStreaming, + long monitorInterval) { + super( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + isStreaming, + monitorInterval); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + tablesMap = new HashMap<>(); + updateTableMap(); + } + + @Override + boolean hasScanned(Identifier identifier) { + return tablesMap.containsKey(identifier); + } + + @Override + void applyFileTable(FileStoreTable fileStoreTable, Identifier identifier) { + if (fileStoreTable.bucketMode() == BucketMode.UNAWARE) { + tablesMap.put( + identifier, + new AppendOnlyTableCompactionCoordinator(fileStoreTable, isStreaming)); + } + } + + @Nullable + @Override + public Boolean execute() throws Exception { + boolean isEmpty; + try { + if (!isRunning) { + return null; + } + + updateTableMap(); + // do scan and plan action, emit append-only compaction tasks. + List tasks = new ArrayList<>(); + for (Map.Entry tableIdAndCoordinator : + tablesMap.entrySet()) { + Identifier tableId = tableIdAndCoordinator.getKey(); + AppendOnlyTableCompactionCoordinator compactionCoordinator = + tableIdAndCoordinator.getValue(); + compactionCoordinator.run().stream() + .map( + task -> + new AppendOnlyCompactionTask( + task.partition(), task.compactBefore(), tableId)) + .forEach(tasks::add); + } + + isEmpty = tasks.isEmpty(); + tasks.forEach(ctx::collect); + } catch (EndOfScanException esf) { + LOG.info("Catching EndOfStreamException, the stream is finished."); + return null; + } + return isEmpty; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/UnwareTablesStreamingCompactorSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/UnwareTablesStreamingCompactorSourceFunction.java new file mode 100644 index 0000000000000..2597fb8bff5ce --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/UnwareTablesStreamingCompactorSourceFunction.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.source.operator; + +import org.apache.paimon.append.AppendOnlyCompactionTask; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.flink.sink.CompactionTaskTypeInfo; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.StreamSource; + +import java.util.regex.Pattern; + +/** + * It is responsible for monitoring compactor source in stream mode for the table of unaware bucket. + */ +public class UnwareTablesStreamingCompactorSourceFunction extends UnawareTablesSourceFunction { + public UnwareTablesStreamingCompactorSourceFunction( + Catalog.Loader catalogLoader, + Pattern includingPattern, + Pattern excludingPattern, + Pattern databasePattern, + long monitorInterval) { + super( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + true, + monitorInterval); + } + + @Override + public void run(SourceContext sourceContext) throws Exception { + this.incrementMonitor(sourceContext); + } + + public static DataStream buildSource( + StreamExecutionEnvironment env, + String name, + Catalog.Loader catalogLoader, + Pattern includingPattern, + Pattern excludingPattern, + Pattern databasePattern, + long monitorInterval) { + + UnwareTablesStreamingCompactorSourceFunction function = + new UnwareTablesStreamingCompactorSourceFunction( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + monitorInterval); + StreamSource + sourceOperator = new StreamSource<>(function); + boolean isParallel = false; + CompactionTaskTypeInfo compactionTaskTypeInfo = new CompactionTaskTypeInfo(); + return new DataStreamSource<>( + env, + compactionTaskTypeInfo, + sourceOperator, + isParallel, + name, + Boundedness.CONTINUOUS_UNBOUNDED) + .forceNonParallel() + .rebalance(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/UnwaredSingleCompactionWorkerOperatorTest.java similarity index 93% rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/UnwaredSingleCompactionWorkerOperatorTest.java index ad4a2a3131764..b222332af4261 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/UnwaredSingleCompactionWorkerOperatorTest.java @@ -42,14 +42,14 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -/** Tests for {@link AppendOnlyTableCompactionWorkerOperator}. */ -public class AppendOnlyTableCompactionWorkerOperatorTest extends TableTestBase { +/** Tests for {@link UnawareDividedCompactionWorkerOperator}. 
*/ +public class UnwaredSingleCompactionWorkerOperatorTest extends TableTestBase { @Test public void testAsyncCompactionWorks() throws Exception { createTableDefault(); - AppendOnlyTableCompactionWorkerOperator workerOperator = - new AppendOnlyTableCompactionWorkerOperator(getTableDefault(), "user"); + UnawareDividedCompactionWorkerOperator workerOperator = + new UnawareDividedCompactionWorkerOperator(getTableDefault(), "user"); // write 200 files List commitMessages = writeDataDefault(200, 20); @@ -79,7 +79,7 @@ public void testAsyncCompactionWorks() throws Exception { if (now - timeStart > timeout && committables.size() != 4) { throw new RuntimeException( "Timeout waiting for compaction, maybe some error happens in " - + AppendOnlyTableCompactionWorkerOperator.class + + UnawareDividedCompactionWorkerOperator.class .getName()); } Thread.sleep(1_000L); @@ -100,8 +100,8 @@ public void testAsyncCompactionWorks() throws Exception { @Test public void testAsyncCompactionFileDeletedWhenShutdown() throws Exception { createTableDefault(); - AppendOnlyTableCompactionWorkerOperator workerOperator = - new AppendOnlyTableCompactionWorkerOperator(getTableDefault(), "user"); + UnawareDividedCompactionWorkerOperator workerOperator = + new UnawareDividedCompactionWorkerOperator(getTableDefault(), "user"); // write 200 files List commitMessages = writeDataDefault(200, 40); @@ -145,7 +145,7 @@ public void testAsyncCompactionFileDeletedWhenShutdown() throws Exception { } // shut down worker operator - workerOperator.shutdown(); + workerOperator.close(); // wait the last runnable in thread pool to stop Thread.sleep(2_000); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java index 56e9374d52146..ee7f299cc5c47 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java @@ -168,7 +168,7 @@ public void testBatchRead(boolean defaultOptions) throws Exception { monitorInterval) .withContinuousMode(false) .withEnv(env) - .build(); + .buildForMultiBucketTableSource(); CloseableIterator it = source.executeAndCollect(); List actual = new ArrayList<>(); while (it.hasNext()) { @@ -264,7 +264,7 @@ public void testStreamingRead(boolean defaultOptions) throws Exception { monitorInterval) .withContinuousMode(true) .withEnv(env) - .build(); + .buildForMultiBucketTableSource(); CloseableIterator it = compactorSource.executeAndCollect(); List actual = new ArrayList<>(); @@ -433,7 +433,7 @@ public void testIncludeAndExcludeTableRead(boolean defaultOptions) throws Except monitorInterval) .withContinuousMode(true) .withEnv(env) - .build(); + .buildForMultiBucketTableSource(); CloseableIterator it = compactorSource.executeAndCollect(); List actual = new ArrayList<>();