diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactor.java new file mode 100644 index 000000000000..cabf94d86e67 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactor.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.compact; + +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.append.AppendOnlyCompactionTask; +import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.operation.AppendOnlyFileStoreWrite; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.TableCommitImpl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** The Compactor of unaware bucket table to execute {@link AppendOnlyCompactionTask}. */ +public class UnawareBucketCompactor { + private final FileStoreTable table; + private final String commitUser; + + private final transient AppendOnlyFileStoreWrite write; + + protected final transient Queue> result; + + private final transient Supplier compactExecutorsupplier; + + public UnawareBucketCompactor( + FileStoreTable table, + String commitUser, + Supplier lazyCompactExecutor) { + this.table = table; + this.commitUser = commitUser; + this.write = (AppendOnlyFileStoreWrite) table.store().newWrite(commitUser); + this.result = new LinkedList<>(); + this.compactExecutorsupplier = lazyCompactExecutor; + } + + public void processElement(AppendOnlyCompactionTask task) throws Exception { + result.add(compactExecutorsupplier.get().submit(() -> task.doCompact(write))); + } + + public void close() throws Exception { + shutdown(); + } + + @VisibleForTesting + void shutdown() throws Exception { + + List messages = new ArrayList<>(); + for (Future resultFuture : result) { + if (!resultFuture.isDone()) { + // the later tasks should be stopped running + break; + } + try { + messages.add(resultFuture.get()); + } catch (Exception exception) { + // exception should already be handled + } + } + if (messages.isEmpty()) { + return; + } + + try (TableCommitImpl tableCommit = table.newCommit(commitUser)) { + tableCommit.abort(messages); + } + } + + public List prepareCommit(boolean waitCompaction, long checkpointId) + throws IOException { + List tempList = new ArrayList<>(); + try { + while (!result.isEmpty()) { + Future future = result.peek(); + if (!future.isDone() && !waitCompaction) { + break; + } + result.poll(); + tempList.add(future.get()); + } + return tempList.stream() + .map(s -> new Committable(checkpointId, Committable.Kind.FILE, s)) + .collect(Collectors.toList()); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while waiting tasks done.", e); + } catch (Exception e) { + throw new RuntimeException("Encountered an error while do compaction", e); + } + } + + public Iterable> result() { + return result; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java new file mode 100644 index 000000000000..a351d7a3e2fe --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.append.AppendOnlyCompactionTask; +import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.compact.UnawareBucketCompactor; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.ExecutorThreadFactory; + +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Operator to execute {@link AppendOnlyCompactionTask} passed for support compacting multi unaware + * bucket tables in combined mode. + */ +public class AppendOnlyMultiTableCompactionWorkerOperator + extends PrepareCommitOperator { + + private static final Logger LOG = + LoggerFactory.getLogger(AppendOnlyMultiTableCompactionWorkerOperator.class); + + private final String commitUser; + private final Catalog.Loader catalogLoader; + + // support multi table compaction + private transient Map compactionHelperContainer; + + private transient ExecutorService lazyCompactExecutor; + + private transient Catalog catalog; + + public AppendOnlyMultiTableCompactionWorkerOperator( + Catalog.Loader catalogLoader, String commitUser, Options options) { + super(options); + this.commitUser = commitUser; + this.catalogLoader = catalogLoader; + } + + @Override + public void open() throws Exception { + LOG.debug("Opened a append-only multi table compaction worker."); + compactionHelperContainer = new HashMap<>(); + catalog = catalogLoader.load(); + } + + @Override + protected List prepareCommit(boolean waitCompaction, long checkpointId) + throws IOException { + List result = new ArrayList<>(); + for (Map.Entry helperEntry : + compactionHelperContainer.entrySet()) { + Identifier tableId = helperEntry.getKey(); + UnawareBucketCompactor helper = helperEntry.getValue(); + + for (Committable committable : helper.prepareCommit(waitCompaction, checkpointId)) { + result.add( + new MultiTableCommittable( + tableId.getDatabaseName(), + tableId.getObjectName(), + committable.checkpointId(), + committable.kind(), + committable.wrappedCommittable())); + } + } + + return result; + } + + @Override + public void processElement(StreamRecord element) + throws Exception { + Identifier identifier = element.getValue().tableIdentifier(); + compactionHelperContainer + .computeIfAbsent(identifier, this::unwareBucketCompactionHelper) + .processElement(element.getValue()); + } + + private UnawareBucketCompactor unwareBucketCompactionHelper(Identifier tableId) { + try { + return new UnawareBucketCompactor( + (FileStoreTable) catalog.getTable(tableId), commitUser, this::workerExecutor); + } catch (Catalog.TableNotExistException e) { + throw new RuntimeException(e); + } + } + + private ExecutorService workerExecutor() { + if (lazyCompactExecutor == null) { + lazyCompactExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory( + Thread.currentThread().getName() + + "-append-only-compact-worker")); + } + return lazyCompactExecutor; + } + + @Override + public void close() throws Exception { + if (lazyCompactExecutor != null) { + // ignore runnable tasks in queue + lazyCompactExecutor.shutdownNow(); + if (!lazyCompactExecutor.awaitTermination(120, TimeUnit.SECONDS)) { + LOG.warn( + "Executors shutdown timeout, there may be some files aren't deleted correctly"); + } + + for (UnawareBucketCompactor helperEntry : compactionHelperContainer.values()) { + helperEntry.close(); + } + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java similarity index 57% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java index 0860ccb3ac8b..8b41d77598fc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java @@ -20,12 +20,11 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.append.AppendOnlyCompactionTask; +import org.apache.paimon.flink.compact.UnawareBucketCompactor; import org.apache.paimon.flink.source.BucketUnawareCompactSource; -import org.apache.paimon.operation.AppendOnlyFileStoreWrite; import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessage; -import org.apache.paimon.table.sink.TableCommitImpl; import org.apache.paimon.utils.ExecutorThreadFactory; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -33,34 +32,30 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; -import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; /** * Operator to execute {@link AppendOnlyCompactionTask} passed from {@link - * BucketUnawareCompactSource}. + * BucketUnawareCompactSource} for compacting single unaware bucket tables in divided mode. */ -public class AppendOnlyTableCompactionWorkerOperator +public class AppendOnlySingleTableCompactionWorkerOperator extends PrepareCommitOperator { private static final Logger LOG = - LoggerFactory.getLogger(AppendOnlyTableCompactionWorkerOperator.class); + LoggerFactory.getLogger(AppendOnlySingleTableCompactionWorkerOperator.class); private final FileStoreTable table; private final String commitUser; - private transient AppendOnlyFileStoreWrite write; + private transient UnawareBucketCompactor unawareBucketCompactor; + private transient ExecutorService lazyCompactExecutor; - private transient Queue> result; - public AppendOnlyTableCompactionWorkerOperator(FileStoreTable table, String commitUser) { + public AppendOnlySingleTableCompactionWorkerOperator(FileStoreTable table, String commitUser) { super(Options.fromMap(table.options())); this.table = table; this.commitUser = commitUser; @@ -68,44 +63,25 @@ public AppendOnlyTableCompactionWorkerOperator(FileStoreTable table, String comm @VisibleForTesting Iterable> result() { - return result; + return unawareBucketCompactor.result(); } @Override public void open() throws Exception { LOG.debug("Opened a append-only table compaction worker."); - this.write = (AppendOnlyFileStoreWrite) table.store().newWrite(commitUser); - this.result = new LinkedList<>(); + this.unawareBucketCompactor = + new UnawareBucketCompactor(table, commitUser, this::workerExecutor); } @Override protected List prepareCommit(boolean waitCompaction, long checkpointId) throws IOException { - List tempList = new ArrayList<>(); - try { - while (!result.isEmpty()) { - Future future = result.peek(); - if (!future.isDone() && !waitCompaction) { - break; - } - - result.poll(); - tempList.add(future.get()); - } - return tempList.stream() - .map(s -> new Committable(checkpointId, Committable.Kind.FILE, s)) - .collect(Collectors.toList()); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while waiting tasks done.", e); - } catch (Exception e) { - throw new RuntimeException("Encountered an error while do compaction", e); - } + return this.unawareBucketCompactor.prepareCommit(waitCompaction, checkpointId); } @Override public void processElement(StreamRecord element) throws Exception { - AppendOnlyCompactionTask task = element.getValue(); - result.add(workerExecutor().submit(() -> task.doCompact(write))); + this.unawareBucketCompactor.processElement(element.getValue()); } private ExecutorService workerExecutor() { @@ -121,11 +97,6 @@ private ExecutorService workerExecutor() { @Override public void close() throws Exception { - shutdown(); - } - - @VisibleForTesting - void shutdown() throws Exception { if (lazyCompactExecutor != null) { // ignore runnable tasks in queue lazyCompactExecutor.shutdownNow(); @@ -133,25 +104,7 @@ void shutdown() throws Exception { LOG.warn( "Executors shutdown timeout, there may be some files aren't deleted correctly"); } - List messages = new ArrayList<>(); - for (Future resultFuture : result) { - if (!resultFuture.isDone()) { - // the later tasks should be stopped running - break; - } - try { - messages.add(resultFuture.get()); - } catch (Exception exception) { - // exception should already be handled - } - } - if (messages.isEmpty()) { - return; - } - - try (TableCommitImpl tableCommit = table.newCommit(commitUser)) { - tableCommit.abort(messages); - } + this.unawareBucketCompactor.close(); } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java index cf825cec7ba4..50f69345983d 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java @@ -44,7 +44,7 @@ public static DataStreamSink sink( @Override protected OneInputStreamOperator createWriteOperator( StoreSinkWrite.Provider writeProvider, String commitUser) { - return new AppendOnlyTableCompactionWorkerOperator(table, commitUser); + return new AppendOnlySingleTableCompactionWorkerOperator(table, commitUser); } @Override diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java similarity index 93% rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java index ad4a2a313176..b22d794da9b0 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java @@ -42,14 +42,14 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -/** Tests for {@link AppendOnlyTableCompactionWorkerOperator}. */ -public class AppendOnlyTableCompactionWorkerOperatorTest extends TableTestBase { +/** Tests for {@link AppendOnlySingleTableCompactionWorkerOperator}. */ +public class AppendOnlySingleTableCompactionWorkerOperatorTest extends TableTestBase { @Test public void testAsyncCompactionWorks() throws Exception { createTableDefault(); - AppendOnlyTableCompactionWorkerOperator workerOperator = - new AppendOnlyTableCompactionWorkerOperator(getTableDefault(), "user"); + AppendOnlySingleTableCompactionWorkerOperator workerOperator = + new AppendOnlySingleTableCompactionWorkerOperator(getTableDefault(), "user"); // write 200 files List commitMessages = writeDataDefault(200, 20); @@ -79,7 +79,8 @@ public void testAsyncCompactionWorks() throws Exception { if (now - timeStart > timeout && committables.size() != 4) { throw new RuntimeException( "Timeout waiting for compaction, maybe some error happens in " - + AppendOnlyTableCompactionWorkerOperator.class + + AppendOnlySingleTableCompactionWorkerOperator + .class .getName()); } Thread.sleep(1_000L); @@ -100,8 +101,8 @@ public void testAsyncCompactionWorks() throws Exception { @Test public void testAsyncCompactionFileDeletedWhenShutdown() throws Exception { createTableDefault(); - AppendOnlyTableCompactionWorkerOperator workerOperator = - new AppendOnlyTableCompactionWorkerOperator(getTableDefault(), "user"); + AppendOnlySingleTableCompactionWorkerOperator workerOperator = + new AppendOnlySingleTableCompactionWorkerOperator(getTableDefault(), "user"); // write 200 files List commitMessages = writeDataDefault(200, 40); @@ -145,7 +146,7 @@ public void testAsyncCompactionFileDeletedWhenShutdown() throws Exception { } // shut down worker operator - workerOperator.shutdown(); + workerOperator.close(); // wait the last runnable in thread pool to stop Thread.sleep(2_000);