diff --git a/paimon-core/src/main/java/org/apache/paimon/append/MultiTableAppendOnlyCompactionTask.java b/paimon-core/src/main/java/org/apache/paimon/append/MultiTableAppendOnlyCompactionTask.java
new file mode 100644
index 000000000000..1d8290f72bc9
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/append/MultiTableAppendOnlyCompactionTask.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.append;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+
+import java.util.List;
+import java.util.Objects;
+
+/** Compaction task for multiple tables. */
+public class MultiTableAppendOnlyCompactionTask extends AppendOnlyCompactionTask {
+    private final Identifier tableIdentifier;
+
+    public MultiTableAppendOnlyCompactionTask(
+            BinaryRow partition, List<DataFileMeta> files, Identifier identifier) {
+        super(partition, files);
+        this.tableIdentifier = identifier;
+    }
+
+    public Identifier tableIdentifier() {
+        return tableIdentifier;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(partition(), compactBefore(), compactAfter(), tableIdentifier);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        MultiTableAppendOnlyCompactionTask that = (MultiTableAppendOnlyCompactionTask) o;
+        return Objects.equals(partition(), that.partition())
+                && Objects.equals(compactBefore(), that.compactBefore())
+                && Objects.equals(compactAfter(), that.compactAfter())
+                && Objects.equals(tableIdentifier, that.tableIdentifier);
+    }
+}
diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java
index d4185f2c049d..15eb31b002e8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java
@@ -19,6 +19,8 @@
 package org.apache.paimon.catalog;
 
 import org.apache.paimon.annotation.Public;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.StringUtils;
 
 import java.io.Serializable;
@@ -109,4 +111,8 @@ public int hashCode() {
     public String toString() {
         return "Identifier{" + "database='" + database + '\'' + ", table='" + table + '\'' + '}';
     }
+
+    public static RowType schema() {
+        return RowType.builder().fields(DataTypes.STRING(), DataTypes.STRING()).build();
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/io/IdentifierSerializer.java b/paimon-core/src/main/java/org/apache/paimon/io/IdentifierSerializer.java
new file mode 100644
index 000000000000..8ff39f099460
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/io/IdentifierSerializer.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.utils.ObjectSerializer;
+
+/** Serializer for {@link Identifier}. */
+public class IdentifierSerializer extends ObjectSerializer<Identifier> {
+
+    public IdentifierSerializer() {
+        super(Identifier.schema());
+    }
+
+    @Override
+    public InternalRow toRow(Identifier record) {
+        return GenericRow.of(
+                BinaryString.fromString(record.getDatabaseName()),
+                BinaryString.fromString(record.getObjectName()));
+    }
+
+    @Override
+    public Identifier fromRow(InternalRow rowData) {
+        String databaseName = rowData.isNullAt(0) ? null : rowData.getString(0).toString();
+        String tableName = rowData.isNullAt(1) ? null : rowData.getString(1).toString();
+        return Identifier.create(databaseName, tableName);
+    }
+}
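For reference, the serializer above round-trips an `Identifier` through the two-string row defined by `Identifier.schema()`. A minimal sketch of that round trip (not part of the patch; it assumes paimon-core's in-memory `DataOutputSerializer` and `DataInputDeserializer` views):

```java
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.io.DataInputDeserializer;
import org.apache.paimon.io.DataOutputSerializer;
import org.apache.paimon.io.IdentifierSerializer;

public class IdentifierSerializerSketch {
    public static void main(String[] args) throws Exception {
        IdentifierSerializer serializer = new IdentifierSerializer();
        Identifier before = Identifier.create("my_db", "my_table");

        // Write the identifier as a two-field row into an in-memory output view.
        DataOutputSerializer out = new DataOutputSerializer(64);
        serializer.serialize(before, out);

        // Read it back from the serialized bytes.
        Identifier after =
                serializer.deserialize(new DataInputDeserializer(out.getCopyOfBuffer()));
        System.out.println(before.equals(after)); // true
    }
}
```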
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/MultiTableCompactionTaskSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/MultiTableCompactionTaskSerializer.java
new file mode 100644
index 000000000000..336064764585
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/MultiTableCompactionTaskSerializer.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.sink;
+
+import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask;
+import org.apache.paimon.data.serializer.VersionedSerializer;
+import org.apache.paimon.io.DataFileMetaSerializer;
+import org.apache.paimon.io.DataInputDeserializer;
+import org.apache.paimon.io.DataInputView;
+import org.apache.paimon.io.DataOutputView;
+import org.apache.paimon.io.DataOutputViewStreamWrapper;
+import org.apache.paimon.io.IdentifierSerializer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
+
+/** Serializer for {@link MultiTableAppendOnlyCompactionTask}. */
+public class MultiTableCompactionTaskSerializer
+        implements VersionedSerializer<MultiTableAppendOnlyCompactionTask> {
+
+    private static final int CURRENT_VERSION = 1;
+
+    private final DataFileMetaSerializer dataFileSerializer;
+
+    private final IdentifierSerializer identifierSerializer;
+
+    public MultiTableCompactionTaskSerializer() {
+        this.dataFileSerializer = new DataFileMetaSerializer();
+        this.identifierSerializer = new IdentifierSerializer();
+    }
+
+    @Override
+    public int getVersion() {
+        return CURRENT_VERSION;
+    }
+
+    @Override
+    public byte[] serialize(MultiTableAppendOnlyCompactionTask task) throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out);
+        serialize(task, view);
+        return out.toByteArray();
+    }
+
+    private void serialize(MultiTableAppendOnlyCompactionTask task, DataOutputView view)
+            throws IOException {
+        serializeBinaryRow(task.partition(), view);
+        dataFileSerializer.serializeList(task.compactBefore(), view);
+        identifierSerializer.serialize(task.tableIdentifier(), view);
+    }
+
+    @Override
+    public MultiTableAppendOnlyCompactionTask deserialize(int version, byte[] serialized)
+            throws IOException {
+        checkVersion(version);
+        DataInputDeserializer view = new DataInputDeserializer(serialized);
+        return deserialize(view);
+    }
+
+    private MultiTableAppendOnlyCompactionTask deserialize(DataInputView view) throws IOException {
+        return new MultiTableAppendOnlyCompactionTask(
+                deserializeBinaryRow(view),
+                dataFileSerializer.deserializeList(view),
+                identifierSerializer.deserialize(view));
+    }
+
+    public List<MultiTableAppendOnlyCompactionTask> deserializeList(int version, DataInputView view)
+            throws IOException {
+        checkVersion(version);
+        int length = view.readInt();
+        List<MultiTableAppendOnlyCompactionTask> list = new ArrayList<>(length);
+        for (int i = 0; i < length; i++) {
+            list.add(deserialize(view));
+        }
+        return list;
+    }
+
+    public void serializeList(List<MultiTableAppendOnlyCompactionTask> list, DataOutputView view)
+            throws IOException {
+        view.writeInt(list.size());
+        for (MultiTableAppendOnlyCompactionTask task : list) {
+            serialize(task, view);
+        }
+    }
+
+    private void checkVersion(int version) {
+        if (version != CURRENT_VERSION) {
+            throw new UnsupportedOperationException(
+                    "Expecting MultiTableCompactionTaskSerializer version to be "
+                            + CURRENT_VERSION
+                            + ", but found "
+                            + version
+                            + ".\nCompactionTask is not a compatible data structure. "
+                            + "Please restart the job afresh (do not recover from savepoint).");
+        }
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
index 4be85e5cbe8b..cc8fc98dd4e2 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
@@ -170,6 +170,10 @@ public void createTableDefault() throws Exception {
         catalog.createTable(identifier(), schemaDefault(), true);
     }
 
+    public void createTable(Identifier identifier) throws Exception {
+        catalog.createTable(identifier, schemaDefault(), false);
+    }
+
     protected void commitDefault(List<CommitMessage> messages) throws Exception {
         BatchTableCommit commit = getTableDefault().newBatchWriteBuilder().newCommit();
         commit.commit(messages);
@@ -177,16 +181,23 @@
     }
 
     protected List<CommitMessage> writeDataDefault(int size, int times) throws Exception {
+        return writeData(getTableDefault(), size, times);
+    }
+
+    protected List<CommitMessage> writeData(Table table, int size, int times) throws Exception {
         List<CommitMessage> messages = new ArrayList<>();
         for (int i = 0; i < times; i++) {
-            messages.addAll(writeOnce(getTableDefault(), i, size));
+            messages.addAll(writeOnce(table, i, size));
         }
-
         return messages;
     }
 
     public FileStoreTable getTableDefault() throws Exception {
-        return (FileStoreTable) catalog.getTable(identifier());
+        return getTable(identifier());
+    }
+
+    public FileStoreTable getTable(Identifier identifier) throws Exception {
+        return (FileStoreTable) catalog.getTable(identifier);
     }
 
     private List<CommitMessage> writeOnce(Table table, int time, int size) throws Exception {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/CompactionTaskSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/CompactionTaskSerializerTest.java
index 847ee99f2125..355cbf18a80a 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/sink/CompactionTaskSerializerTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/CompactionTaskSerializerTest.java
@@ -19,6 +19,8 @@
 package org.apache.paimon.table.sink;
 
 import org.apache.paimon.append.AppendOnlyCompactionTask;
+import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask;
+import org.apache.paimon.catalog.Identifier;
 
 import org.junit.jupiter.api.Test;
 
@@ -28,7 +30,7 @@
 import static org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest.row;
 import static org.assertj.core.api.Assertions.assertThat;
 
-/** Tests for {@link CompactionTaskSerializer}. */
+/** Tests for {@link CompactionTaskSerializer} and {@link MultiTableCompactionTaskSerializer}. */
 public class CompactionTaskSerializerTest {
 
     @Test
@@ -41,4 +43,19 @@ public void testCompactionTaskSerializer() throws IOException {
         AppendOnlyCompactionTask task1 = serializer.deserialize(serializer.getVersion(), bytes);
         assertThat(task).isEqualTo(task1);
     }
+
+    @Test
+    public void testMultiTableCompactionTaskSerializer() throws IOException {
+        MultiTableCompactionTaskSerializer serializer = new MultiTableCompactionTaskSerializer();
+        MultiTableAppendOnlyCompactionTask task =
+                new MultiTableAppendOnlyCompactionTask(
+                        row(0),
+                        randomNewFilesIncrement().newFiles(),
+                        Identifier.create("db", "table"));
+
+        byte[] bytes = serializer.serialize(task);
+        MultiTableAppendOnlyCompactionTask task1 =
+                serializer.deserialize(serializer.getVersion(), bytes);
+        assertThat(task).isEqualTo(task1);
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactor.java
new file mode 100644
index 000000000000..cabf94d86e67
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactor.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.append.AppendOnlyCompactionTask;
+import org.apache.paimon.flink.sink.Committable;
+import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.TableCommitImpl;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/** The compactor of unaware-bucket tables, which executes {@link AppendOnlyCompactionTask}s. */
+public class UnawareBucketCompactor {
+    private final FileStoreTable table;
+    private final String commitUser;
+
+    private final transient AppendOnlyFileStoreWrite write;
+
+    protected final transient Queue<Future<CommitMessage>> result;
+
+    private final transient Supplier<ExecutorService> compactExecutorSupplier;
+
+    public UnawareBucketCompactor(
+            FileStoreTable table,
+            String commitUser,
+            Supplier<ExecutorService> lazyCompactExecutor) {
+        this.table = table;
+        this.commitUser = commitUser;
+        this.write = (AppendOnlyFileStoreWrite) table.store().newWrite(commitUser);
+        this.result = new LinkedList<>();
+        this.compactExecutorSupplier = lazyCompactExecutor;
+    }
+
+    public void processElement(AppendOnlyCompactionTask task) throws Exception {
+        result.add(compactExecutorSupplier.get().submit(() -> task.doCompact(write)));
+    }
+
+    public void close() throws Exception {
+        shutdown();
+    }
+
+    @VisibleForTesting
+    void shutdown() throws Exception {
+
+        List<CommitMessage> messages = new ArrayList<>();
+        for (Future<CommitMessage> resultFuture : result) {
+            if (!resultFuture.isDone()) {
+                // the later tasks should be stopped running
+                break;
+            }
+            try {
+                messages.add(resultFuture.get());
+            } catch (Exception exception) {
+                // exception should already be handled
+            }
+        }
+        if (messages.isEmpty()) {
+            return;
+        }
+
+        try (TableCommitImpl tableCommit = table.newCommit(commitUser)) {
+            tableCommit.abort(messages);
+        }
+    }
+
+    public List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
+            throws IOException {
+        List<CommitMessage> tempList = new ArrayList<>();
+        try {
+            while (!result.isEmpty()) {
+                Future<CommitMessage> future = result.peek();
+                if (!future.isDone() && !waitCompaction) {
+                    break;
+                }
+                result.poll();
+                tempList.add(future.get());
+            }
+            return tempList.stream()
+                    .map(s -> new Committable(checkpointId, Committable.Kind.FILE, s))
+                    .collect(Collectors.toList());
+        } catch (InterruptedException e) {
+            throw new RuntimeException("Interrupted while waiting for tasks to finish.", e);
+        } catch (Exception e) {
+            throw new RuntimeException("Encountered an error while doing compaction.", e);
+        }
+    }
+
+    public Iterable<Future<CommitMessage>> result() {
+        return result;
+    }
+}
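How the helper is meant to be driven (a sketch under stated assumptions: the `FileStoreTable` comes from a catalog, and in the operators the executor supplier is shared rather than created per call). `prepareCommit(false, ...)` only drains futures that are already done; `waitCompaction=true` blocks until every queued task finishes:

```java
import org.apache.paimon.append.AppendOnlyCompactionTask;
import org.apache.paimon.flink.compact.UnawareBucketCompactor;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.table.FileStoreTable;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompactorDriverSketch {

    // Submits one task asynchronously and blocks until its Committable is ready.
    static List<Committable> compactOnce(
            FileStoreTable table, AppendOnlyCompactionTask task, long checkpointId)
            throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        UnawareBucketCompactor compactor =
                new UnawareBucketCompactor(table, "commit-user", () -> pool);
        try {
            compactor.processElement(task); // enqueue; returns immediately
            // waitCompaction=true drains every pending future before returning
            return compactor.prepareCommit(true, checkpointId);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

On shutdown, `close()` aborts any finished-but-uncommitted results so their files are cleaned up rather than leaked.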
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
new file mode 100644
index 000000000000..882135c03a2b
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.append.AppendOnlyCompactionTask;
+import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.compact.UnawareBucketCompactor;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.ExecutorThreadFactory;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Operator to execute {@link AppendOnlyCompactionTask}s passed in, supporting compaction of
+ * multiple unaware-bucket tables in combined mode.
+ */
+public class AppendOnlyMultiTableCompactionWorkerOperator
+        extends PrepareCommitOperator<MultiTableAppendOnlyCompactionTask, MultiTableCommittable> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(AppendOnlyMultiTableCompactionWorkerOperator.class);
+
+    private final String commitUser;
+    private final Catalog.Loader catalogLoader;
+
+    // support multi table compaction
+    private transient Map<Identifier, UnawareBucketCompactor> compactorContainer;
+
+    private transient ExecutorService lazyCompactExecutor;
+
+    private transient Catalog catalog;
+
+    public AppendOnlyMultiTableCompactionWorkerOperator(
+            Catalog.Loader catalogLoader, String commitUser, Options options) {
+        super(options);
+        this.commitUser = commitUser;
+        this.catalogLoader = catalogLoader;
+    }
+
+    @Override
+    public void open() throws Exception {
+        LOG.debug("Opened an append-only multi-table compaction worker.");
+        compactorContainer = new HashMap<>();
+        catalog = catalogLoader.load();
+    }
+
+    @Override
+    protected List<MultiTableCommittable> prepareCommit(boolean waitCompaction, long checkpointId)
+            throws IOException {
+        List<MultiTableCommittable> result = new ArrayList<>();
+        for (Map.Entry<Identifier, UnawareBucketCompactor> compactorWithTable :
+                compactorContainer.entrySet()) {
+            Identifier tableId = compactorWithTable.getKey();
+            UnawareBucketCompactor compactor = compactorWithTable.getValue();
+
+            for (Committable committable : compactor.prepareCommit(waitCompaction, checkpointId)) {
+                result.add(
+                        new MultiTableCommittable(
+                                tableId.getDatabaseName(),
+                                tableId.getObjectName(),
+                                committable.checkpointId(),
+                                committable.kind(),
+                                committable.wrappedCommittable()));
+            }
+        }
+
+        return result;
+    }
+
+    @Override
+    public void processElement(StreamRecord<MultiTableAppendOnlyCompactionTask> element)
+            throws Exception {
+        Identifier identifier = element.getValue().tableIdentifier();
+        compactorContainer
+                .computeIfAbsent(identifier, this::compactor)
+                .processElement(element.getValue());
+    }
+
+    private UnawareBucketCompactor compactor(Identifier tableId) {
+        try {
+            return new UnawareBucketCompactor(
+                    (FileStoreTable) catalog.getTable(tableId), commitUser, this::workerExecutor);
+        } catch (Catalog.TableNotExistException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private ExecutorService workerExecutor() {
+        if (lazyCompactExecutor == null) {
+            lazyCompactExecutor =
+                    Executors.newSingleThreadScheduledExecutor(
+                            new ExecutorThreadFactory(
+                                    Thread.currentThread().getName()
+                                            + "-append-only-compact-worker"));
+        }
+        return lazyCompactExecutor;
+    }
+
+    @Override
+    public void close() throws Exception {
+        if (lazyCompactExecutor != null) {
+            // ignore runnable tasks in queue
+            lazyCompactExecutor.shutdownNow();
+            if (!lazyCompactExecutor.awaitTermination(120, TimeUnit.SECONDS)) {
+                LOG.warn(
+                        "Executor shutdown timed out; some files may not be deleted correctly.");
+            }
+
+            for (UnawareBucketCompactor compactor : compactorContainer.values()) {
+                compactor.close();
+            }
+        }
+    }
+}
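A wiring sketch for the combined mode (illustrative, not part of the patch: the upstream task stream is assumed to be produced elsewhere, and `MultiTableCommittable`/`MultiTableCommittableTypeInfo` are assumed to exist in `org.apache.paimon.flink.sink` alongside this operator):

```java
package org.apache.paimon.flink.sink;

import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.options.Options;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

public class CombinedModeWiringSketch {

    // `tasks` carries one record per table/partition compaction unit; the operator
    // fans records out to a per-table UnawareBucketCompactor internally.
    static SingleOutputStreamOperator<MultiTableCommittable> wire(
            DataStream<MultiTableAppendOnlyCompactionTask> tasks, Catalog.Loader catalogLoader) {
        return tasks.transform(
                "Compaction Worker",
                new MultiTableCommittableTypeInfo(), // assumption: committable type info
                new AppendOnlyMultiTableCompactionWorkerOperator(
                        catalogLoader, "commit-user", new Options()));
    }
}
```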
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java
similarity index 57%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java
index 0860ccb3ac8b..8b41d77598fc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java
@@ -20,12 +20,11 @@
 
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.append.AppendOnlyCompactionTask;
+import org.apache.paimon.flink.compact.UnawareBucketCompactor;
 import org.apache.paimon.flink.source.BucketUnawareCompactSource;
-import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.utils.ExecutorThreadFactory;
 
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -33,34 +32,30 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 /**
  * Operator to execute {@link AppendOnlyCompactionTask} passed from {@link
- * BucketUnawareCompactSource}.
+ * BucketUnawareCompactSource} for compacting single unaware bucket tables in divided mode.
  */
-public class AppendOnlyTableCompactionWorkerOperator
+public class AppendOnlySingleTableCompactionWorkerOperator
         extends PrepareCommitOperator<AppendOnlyCompactionTask, Committable> {
 
     private static final Logger LOG =
-            LoggerFactory.getLogger(AppendOnlyTableCompactionWorkerOperator.class);
+            LoggerFactory.getLogger(AppendOnlySingleTableCompactionWorkerOperator.class);
 
     private final FileStoreTable table;
     private final String commitUser;
 
-    private transient AppendOnlyFileStoreWrite write;
+    private transient UnawareBucketCompactor unawareBucketCompactor;
+
     private transient ExecutorService lazyCompactExecutor;
-    private transient Queue<Future<CommitMessage>> result;
 
-    public AppendOnlyTableCompactionWorkerOperator(FileStoreTable table, String commitUser) {
+    public AppendOnlySingleTableCompactionWorkerOperator(FileStoreTable table, String commitUser) {
         super(Options.fromMap(table.options()));
         this.table = table;
         this.commitUser = commitUser;
@@ -68,44 +63,25 @@ public AppendOnlyTableCompactionWorkerOperator(FileStoreTable table, String comm
     }
 
     @VisibleForTesting
     Iterable<Future<CommitMessage>> result() {
-        return result;
+        return unawareBucketCompactor.result();
    }
 
     @Override
     public void open() throws Exception {
         LOG.debug("Opened a append-only table compaction worker.");
-        this.write = (AppendOnlyFileStoreWrite) table.store().newWrite(commitUser);
-        this.result = new LinkedList<>();
+        this.unawareBucketCompactor =
+                new UnawareBucketCompactor(table, commitUser, this::workerExecutor);
     }
 
     @Override
     protected List<Committable> prepareCommit(boolean waitCompaction, long checkpointId)
             throws IOException {
-        List<CommitMessage> tempList = new ArrayList<>();
-        try {
-            while (!result.isEmpty()) {
-                Future<CommitMessage> future = result.peek();
-                if (!future.isDone() && !waitCompaction) {
-                    break;
-                }
-
-                result.poll();
-                tempList.add(future.get());
-            }
-            return tempList.stream()
-                    .map(s -> new Committable(checkpointId, Committable.Kind.FILE, s))
-                    .collect(Collectors.toList());
-        } catch (InterruptedException e) {
-            throw new RuntimeException("Interrupted while waiting tasks done.", e);
-        } catch (Exception e) {
-            throw new RuntimeException("Encountered an error while do compaction", e);
-        }
+        return this.unawareBucketCompactor.prepareCommit(waitCompaction, checkpointId);
     }
 
     @Override
     public void processElement(StreamRecord<AppendOnlyCompactionTask> element) throws Exception {
-        AppendOnlyCompactionTask task = element.getValue();
-        result.add(workerExecutor().submit(() -> task.doCompact(write)));
+        this.unawareBucketCompactor.processElement(element.getValue());
    }
 
     private ExecutorService workerExecutor() {
@@ -121,11 +97,6 @@ private ExecutorService workerExecutor() {
 
     @Override
     public void close() throws Exception {
-        shutdown();
-    }
-
-    @VisibleForTesting
-    void shutdown() throws Exception {
         if (lazyCompactExecutor != null) {
             // ignore runnable tasks in queue
             lazyCompactExecutor.shutdownNow();
@@ -133,25 +104,7 @@ void shutdown() throws Exception {
             if (!lazyCompactExecutor.awaitTermination(120, TimeUnit.SECONDS)) {
                 LOG.warn(
                         "Executors shutdown timeout, there may be some files aren't deleted correctly");
             }
-            List<CommitMessage> messages = new ArrayList<>();
-            for (Future<CommitMessage> resultFuture : result) {
-                if (!resultFuture.isDone()) {
-                    // the later tasks should be stopped running
-                    break;
-                }
-                try {
-                    messages.add(resultFuture.get());
-                } catch (Exception exception) {
-                    // exception should already be handled
-                }
-            }
-            if (messages.isEmpty()) {
-                return;
-            }
-
-            try (TableCommitImpl tableCommit = table.newCommit(commitUser)) {
-                tableCommit.abort(messages);
-            }
+            this.unawareBucketCompactor.close();
         }
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCompactionTaskTypeInfo.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCompactionTaskTypeInfo.java
new file mode 100644
index 000000000000..8c2ec7b7a0e2
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCompactionTaskTypeInfo.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask;
+import org.apache.paimon.flink.VersionedSerializerWrapper;
+import org.apache.paimon.table.sink.MultiTableCompactionTaskSerializer;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy;
+
+/** Type information of {@link MultiTableAppendOnlyCompactionTask}. */
+public class MultiTableCompactionTaskTypeInfo
+        extends TypeInformation<MultiTableAppendOnlyCompactionTask> {
+
+    @Override
+    public boolean isBasicType() {
+        return false;
+    }
+
+    @Override
+    public boolean isTupleType() {
+        return false;
+    }
+
+    @Override
+    public int getArity() {
+        return 1;
+    }
+
+    @Override
+    public int getTotalFields() {
+        return 1;
+    }
+
+    @Override
+    public Class<MultiTableAppendOnlyCompactionTask> getTypeClass() {
+        return MultiTableAppendOnlyCompactionTask.class;
+    }
+
+    @Override
+    public boolean isKeyType() {
+        return false;
+    }
+
+    @Override
+    public TypeSerializer<MultiTableAppendOnlyCompactionTask> createSerializer(
+            ExecutionConfig executionConfig) {
+        return new SimpleVersionedSerializerTypeSerializerProxy<MultiTableAppendOnlyCompactionTask>(
+                () -> new VersionedSerializerWrapper<>(new MultiTableCompactionTaskSerializer())) {
+            @Override
+            public MultiTableAppendOnlyCompactionTask copy(
+                    MultiTableAppendOnlyCompactionTask from) {
+                return from;
+            }
+
+            @Override
+            public MultiTableAppendOnlyCompactionTask copy(
+                    MultiTableAppendOnlyCompactionTask from,
+                    MultiTableAppendOnlyCompactionTask reuse) {
+                return from;
+            }
+        };
+    }
+
+    @Override
+    public String toString() {
+        return "MultiTableAppendOnlyCompactionTask";
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        // type-info equality is class-based: any two instances are interchangeable
+        return o instanceof MultiTableCompactionTaskTypeInfo;
+    }
+
+    @Override
+    public int hashCode() {
+        return 0;
+    }
+
+    @Override
+    public boolean canEqual(Object o) {
+        return o instanceof MultiTableCompactionTaskTypeInfo;
+    }
+}
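The custom `TypeInformation` keeps Flink from falling back to Kryo for task records; its serializer proxies the versioned serializer above, and `copy()` deliberately returns the same instance since tasks are treated as immutable once emitted. A quick property check (sketch):

```java
import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask;
import org.apache.paimon.flink.sink.MultiTableCompactionTaskTypeInfo;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;

public class TypeInfoSketch {
    public static void main(String[] args) {
        MultiTableCompactionTaskTypeInfo a = new MultiTableCompactionTaskTypeInfo();
        MultiTableCompactionTaskTypeInfo b = new MultiTableCompactionTaskTypeInfo();
        System.out.println(a.equals(b)); // true: equality is class-based

        // Delegates to MultiTableCompactionTaskSerializer via the versioned wrapper.
        TypeSerializer<MultiTableAppendOnlyCompactionTask> serializer =
                a.createSerializer(new ExecutionConfig());
    }
}
```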
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
index cf825cec7ba4..50f69345983d 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java
@@ -44,7 +44,7 @@ public static DataStreamSink<?> sink(
     @Override
     protected OneInputStreamOperator<AppendOnlyCompactionTask, Committable> createWriteOperator(
             StoreSinkWrite.Provider writeProvider, String commitUser) {
-        return new AppendOnlyTableCompactionWorkerOperator(table, commitUser);
+        return new AppendOnlySingleTableCompactionWorkerOperator(table, commitUser);
     }
 
     @Override
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperatorTest.java
new file mode 100644
index 000000000000..e549f04ec594
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperatorTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.TableTestBase;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.flink.sink.AppendOnlySingleTableCompactionWorkerOperatorTest.packTask;
+
+/** Tests for {@link AppendOnlyMultiTableCompactionWorkerOperator}. */
+public class AppendOnlyMultiTableCompactionWorkerOperatorTest extends TableTestBase {
+    private final String[] tables = {"a", "b"};
+
+    @Test
+    public void testAsyncCompactionWorks() throws Exception {
+
+        AppendOnlyMultiTableCompactionWorkerOperator workerOperator =
+                new AppendOnlyMultiTableCompactionWorkerOperator(
+                        () -> catalog, "user", new Options());
+
+        List<StreamRecord<MultiTableAppendOnlyCompactionTask>> records = new ArrayList<>();
+        // create table and write
+        for (String table : tables) {
+            Identifier identifier = identifier(table);
+            createTable(identifier);
+
+            // write 20 commits, one data file each
+            List<CommitMessage> commitMessages = writeData(getTable(identifier), 200, 20);
+
+            packTask(commitMessages, 5).stream()
+                    .map(
+                            task ->
+                                    new StreamRecord<>(
+                                            new MultiTableAppendOnlyCompactionTask(
+                                                    task.partition(),
+                                                    task.compactBefore(),
+                                                    identifier)))
+                    .forEach(records::add);
+        }
+
+        Assertions.assertThat(records.size()).isEqualTo(8);
+        workerOperator.open();
+
+        for (StreamRecord<MultiTableAppendOnlyCompactionTask> record : records) {
+            workerOperator.processElement(record);
+        }
+
+        List<MultiTableCommittable> committables = new ArrayList<>();
+        long timeStart = System.currentTimeMillis();
+        long timeout = 60_000L;
+
+        Assertions.assertThatCode(
+                        () -> {
+                            while (committables.size() != 8) {
+                                committables.addAll(
+                                        workerOperator.prepareCommit(false, Long.MAX_VALUE));
+
+                                long now = System.currentTimeMillis();
+                                if (now - timeStart > timeout && committables.size() != 8) {
+                                    throw new RuntimeException(
+                                            "Timeout waiting for compaction, maybe some error happens in "
+                                                    + AppendOnlySingleTableCompactionWorkerOperator
+                                                            .class
+                                                            .getName());
+                                }
+                                Thread.sleep(1_000L);
+                            }
+                        })
+                .doesNotThrowAnyException();
+        committables.forEach(
+                a ->
+                        Assertions.assertThat(
+                                        ((CommitMessageImpl) a.wrappedCommittable())
+                                                        .compactIncrement()
+                                                        .compactAfter()
+                                                        .size()
+                                                == 1)
+                                .isTrue());
+        Set<String> tableNames =
+                committables.stream()
+                        .map(MultiTableCommittable::getTable)
+                        .collect(Collectors.toSet());
+        Assertions.assertThat(tableNames).hasSameElementsAs(Arrays.asList(tables));
+    }
+}
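The expected count of 8 in the test above falls out of the write/pack arithmetic (assuming one data file per commit, which matches the single-table test's expectation of 4 committables from the same packing):

```java
// Sketch of the test arithmetic, not part of the patch:
int tables = 2;          // "a" and "b"
int commitsPerTable = 20; // writeData(table, 200, 20) -> 20 commits, 1 file each
int filesPerTask = 5;     // packTask(commitMessages, 5)

int tasksPerTable = commitsPerTable / filesPerTask; // 4
assert tables * tasksPerTable == 8;                 // records.size() and committables.size()
// Each task compacts 5 small files into 1, hence compactAfter().size() == 1 per committable.
```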
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java
similarity index 91%
rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java
rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java
index ad4a2a313176..4f459948c98c 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java
@@ -42,14 +42,14 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-/** Tests for {@link AppendOnlyTableCompactionWorkerOperator}. */
-public class AppendOnlyTableCompactionWorkerOperatorTest extends TableTestBase {
+/** Tests for {@link AppendOnlySingleTableCompactionWorkerOperator}. */
+public class AppendOnlySingleTableCompactionWorkerOperatorTest extends TableTestBase {
 
     @Test
     public void testAsyncCompactionWorks() throws Exception {
         createTableDefault();
-        AppendOnlyTableCompactionWorkerOperator workerOperator =
-                new AppendOnlyTableCompactionWorkerOperator(getTableDefault(), "user");
+        AppendOnlySingleTableCompactionWorkerOperator workerOperator =
+                new AppendOnlySingleTableCompactionWorkerOperator(getTableDefault(), "user");
 
         // write 200 files
         List<CommitMessage> commitMessages = writeDataDefault(200, 20);
@@ -79,7 +79,8 @@ public void testAsyncCompactionWorks() throws Exception {
                                 if (now - timeStart > timeout && committables.size() != 4) {
                                     throw new RuntimeException(
                                             "Timeout waiting for compaction, maybe some error happens in "
-                                                    + AppendOnlyTableCompactionWorkerOperator.class
+                                                    + AppendOnlySingleTableCompactionWorkerOperator
+                                                            .class
                                                             .getName());
                                 }
                                 Thread.sleep(1_000L);
@@ -100,8 +101,8 @@ public void testAsyncCompactionWorks() throws Exception {
     @Test
     public void testAsyncCompactionFileDeletedWhenShutdown() throws Exception {
         createTableDefault();
-        AppendOnlyTableCompactionWorkerOperator workerOperator =
-                new AppendOnlyTableCompactionWorkerOperator(getTableDefault(), "user");
+        AppendOnlySingleTableCompactionWorkerOperator workerOperator =
+                new AppendOnlySingleTableCompactionWorkerOperator(getTableDefault(), "user");
 
         // write 200 files
         List<CommitMessage> commitMessages = writeDataDefault(200, 40);
@@ -145,7 +146,7 @@ public void testAsyncCompactionFileDeletedWhenShutdown() throws Exception {
         }
 
         // shut down worker operator
-        workerOperator.shutdown();
+        workerOperator.close();
 
         // wait the last runnable in thread pool to stop
         Thread.sleep(2_000);
@@ -190,7 +191,8 @@ protected InternalRow dataDefault(int time, int size) {
         return GenericRow.of(RANDOM.nextInt(), RANDOM.nextLong(), randomString());
     }
 
-    private List<AppendOnlyCompactionTask> packTask(List<CommitMessage> messages, int fileSize) {
+    public static List<AppendOnlyCompactionTask> packTask(
+            List<CommitMessage> messages, int fileSize) {
         List<AppendOnlyCompactionTask> result = new ArrayList<>();
         List<DataFileMeta> metas =
                 messages.stream()