diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/ExceptionUtils.java b/paimon-common/src/main/java/org/apache/paimon/utils/ExceptionUtils.java index 0009d6e247fd..ad6990260813 100644 --- a/paimon-common/src/main/java/org/apache/paimon/utils/ExceptionUtils.java +++ b/paimon-common/src/main/java/org/apache/paimon/utils/ExceptionUtils.java @@ -24,6 +24,7 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.lang.reflect.Field; +import java.util.List; import java.util.Locale; import java.util.Optional; import java.util.concurrent.CompletionException; @@ -561,6 +562,20 @@ public static void checkInterrupted(Throwable e) { } } + public static void throwMultiException(List exceptions) throws Exception { + if (!exceptions.isEmpty()) { + if (exceptions.size() == 1) { + throw exceptions.get(0); + } else { + Exception compoundException = new Exception("Multi exceptions occurred"); + for (Exception e : exceptions) { + compoundException.addSuppressed(e); + } + throw compoundException; + } + } + } + // ------------------------------------------------------------------------ /** Private constructor to prevent instantiation. */ diff --git a/paimon-core/src/main/java/org/apache/paimon/append/MultiTableAppendOnlyCompactionTask.java b/paimon-core/src/main/java/org/apache/paimon/append/MultiTableAppendOnlyCompactionTask.java new file mode 100644 index 000000000000..bd63a8cee7fd --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/append/MultiTableAppendOnlyCompactionTask.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.append; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.io.DataFileMeta; + +import java.util.List; +import java.util.Objects; + +/** Compaction task for multi table . 
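+ * It extends {@link AppendOnlyCompactionTask} with the {@link Identifier} of the owning table, so
+ * that a single combined-mode compaction job can carry tasks for many tables in one stream.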
*/ +public class MultiTableAppendOnlyCompactionTask extends AppendOnlyCompactionTask { + + private final Identifier tableIdentifier; + + public MultiTableAppendOnlyCompactionTask( + BinaryRow partition, List files, Identifier identifier) { + super(partition, files); + this.tableIdentifier = identifier; + } + + public Identifier tableIdentifier() { + return tableIdentifier; + } + + public int hashCode() { + return Objects.hash(partition(), compactBefore(), compactAfter(), tableIdentifier); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + MultiTableAppendOnlyCompactionTask that = (MultiTableAppendOnlyCompactionTask) o; + return Objects.equals(partition(), that.partition()) + && Objects.equals(compactBefore(), that.compactBefore()) + && Objects.equals(compactAfter(), that.compactAfter()) + && Objects.equals(tableIdentifier, that.tableIdentifier); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java b/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java index d4185f2c049d..15eb31b002e8 100644 --- a/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java +++ b/paimon-core/src/main/java/org/apache/paimon/catalog/Identifier.java @@ -19,6 +19,8 @@ package org.apache.paimon.catalog; import org.apache.paimon.annotation.Public; +import org.apache.paimon.types.DataTypes; +import org.apache.paimon.types.RowType; import org.apache.paimon.utils.StringUtils; import java.io.Serializable; @@ -109,4 +111,8 @@ public int hashCode() { public String toString() { return "Identifier{" + "database='" + database + '\'' + ", table='" + table + '\'' + '}'; } + + public static RowType schema() { + return RowType.builder().fields(DataTypes.STRING(), DataTypes.STRING()).build(); + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/io/IdentifierSerializer.java b/paimon-core/src/main/java/org/apache/paimon/io/IdentifierSerializer.java new file mode 100644 index 000000000000..8ff39f099460 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/io/IdentifierSerializer.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.io; + +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.data.BinaryString; +import org.apache.paimon.data.GenericRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.utils.ObjectSerializer; + +/** Serializer for {@link Identifier}. 
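+ * An identifier is written as a two-field string row (database name, table name), matching
+ * {@link Identifier#schema()}.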
*/ +public class IdentifierSerializer extends ObjectSerializer { + + public IdentifierSerializer() { + super(Identifier.schema()); + } + + @Override + public InternalRow toRow(Identifier record) { + return GenericRow.of( + BinaryString.fromString(record.getDatabaseName()), + BinaryString.fromString(record.getObjectName())); + } + + @Override + public Identifier fromRow(InternalRow rowData) { + String databaseName = rowData.isNullAt(0) ? null : rowData.getString(0).toString(); + String tableName = rowData.isNullAt(1) ? null : rowData.getString(1).toString(); + return Identifier.create(databaseName, tableName); + } +} diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/MultiTableCompactionTaskSerializer.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/MultiTableCompactionTaskSerializer.java new file mode 100644 index 000000000000..d1b28af57927 --- /dev/null +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/MultiTableCompactionTaskSerializer.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.table.sink; + +import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask; +import org.apache.paimon.data.serializer.VersionedSerializer; +import org.apache.paimon.io.DataFileMetaSerializer; +import org.apache.paimon.io.DataInputDeserializer; +import org.apache.paimon.io.DataInputView; +import org.apache.paimon.io.DataOutputView; +import org.apache.paimon.io.DataOutputViewStreamWrapper; +import org.apache.paimon.io.IdentifierSerializer; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow; +import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow; + +/** Serializer for {@link MultiTableAppendOnlyCompactionTask}. 
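+ * A task is written as its partition row, the list of files to compact, and the table identifier,
+ * in that order. Rough round-trip sketch (as exercised in {@code CompactionTaskSerializerTest}):
+ *
+ * <pre>{@code
+ * MultiTableCompactionTaskSerializer serializer = new MultiTableCompactionTaskSerializer();
+ * byte[] bytes = serializer.serialize(task);
+ * MultiTableAppendOnlyCompactionTask copy = serializer.deserialize(serializer.getVersion(), bytes);
+ * }</pre>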
*/ +public class MultiTableCompactionTaskSerializer + implements VersionedSerializer { + + private static final int CURRENT_VERSION = 1; + + private final DataFileMetaSerializer dataFileSerializer; + + private final IdentifierSerializer identifierSerializer; + + public MultiTableCompactionTaskSerializer() { + this.dataFileSerializer = new DataFileMetaSerializer(); + this.identifierSerializer = new IdentifierSerializer(); + } + + @Override + public int getVersion() { + return CURRENT_VERSION; + } + + @Override + public byte[] serialize(MultiTableAppendOnlyCompactionTask task) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputViewStreamWrapper view = new DataOutputViewStreamWrapper(out); + serialize(task, view); + return out.toByteArray(); + } + + private void serialize(MultiTableAppendOnlyCompactionTask task, DataOutputView view) + throws IOException { + serializeBinaryRow(task.partition(), view); + dataFileSerializer.serializeList(task.compactBefore(), view); + identifierSerializer.serialize(task.tableIdentifier(), view); + } + + @Override + public MultiTableAppendOnlyCompactionTask deserialize(int version, byte[] serialized) + throws IOException { + checkVersion(version); + DataInputDeserializer view = new DataInputDeserializer(serialized); + return deserialize(view); + } + + private MultiTableAppendOnlyCompactionTask deserialize(DataInputView view) throws IOException { + return new MultiTableAppendOnlyCompactionTask( + deserializeBinaryRow(view), + dataFileSerializer.deserializeList(view), + identifierSerializer.deserialize(view)); + } + + public List deserializeList(int version, DataInputView view) + throws IOException { + checkVersion(version); + int length = view.readInt(); + List list = new ArrayList<>(length); + for (int i = 0; i < length; i++) { + list.add(deserialize(view)); + } + return list; + } + + public void serializeList(List list, DataOutputView view) + throws IOException { + view.writeInt(list.size()); + for (MultiTableAppendOnlyCompactionTask commitMessage : list) { + serialize(commitMessage, view); + } + } + + private void checkVersion(int version) { + if (version != CURRENT_VERSION) { + throw new UnsupportedOperationException( + "Expecting MultiTableCompactionTaskSerializer version to be " + + CURRENT_VERSION + + ", but found " + + version + + ".\nCompactionTask is not a compatible data structure. 
" + + "Please restart the job afresh (do not recover from savepoint)."); + } + } +} diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java index 4be85e5cbe8b..cc8fc98dd4e2 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java @@ -170,6 +170,10 @@ public void createTableDefault() throws Exception { catalog.createTable(identifier(), schemaDefault(), true); } + public void createTable(Identifier identifier) throws Exception { + catalog.createTable(identifier, schemaDefault(), false); + } + protected void commitDefault(List messages) throws Exception { BatchTableCommit commit = getTableDefault().newBatchWriteBuilder().newCommit(); commit.commit(messages); @@ -177,16 +181,23 @@ protected void commitDefault(List messages) throws Exception { } protected List writeDataDefault(int size, int times) throws Exception { + return writeData(getTableDefault(), size, times); + } + + protected List writeData(Table table, int size, int times) throws Exception { List messages = new ArrayList<>(); for (int i = 0; i < times; i++) { - messages.addAll(writeOnce(getTableDefault(), i, size)); + messages.addAll(writeOnce(table, i, size)); } - return messages; } public FileStoreTable getTableDefault() throws Exception { - return (FileStoreTable) catalog.getTable(identifier()); + return getTable(identifier()); + } + + public FileStoreTable getTable(Identifier identifier) throws Exception { + return (FileStoreTable) catalog.getTable(identifier); } private List writeOnce(Table table, int time, int size) throws Exception { diff --git a/paimon-core/src/test/java/org/apache/paimon/table/sink/CompactionTaskSerializerTest.java b/paimon-core/src/test/java/org/apache/paimon/table/sink/CompactionTaskSerializerTest.java index 847ee99f2125..355cbf18a80a 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/sink/CompactionTaskSerializerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/sink/CompactionTaskSerializerTest.java @@ -19,6 +19,8 @@ package org.apache.paimon.table.sink; import org.apache.paimon.append.AppendOnlyCompactionTask; +import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask; +import org.apache.paimon.catalog.Identifier; import org.junit.jupiter.api.Test; @@ -28,7 +30,7 @@ import static org.apache.paimon.mergetree.compact.MergeTreeCompactManagerTest.row; import static org.assertj.core.api.Assertions.assertThat; -/** Tests for {@link CompactionTaskSerializer}. */ +/** Tests for {@link CompactionTaskSerializer} and {@link MultiTableCompactionTaskSerializer}. 
*/ public class CompactionTaskSerializerTest { @Test @@ -41,4 +43,19 @@ public void testCompactionTaskSerializer() throws IOException { AppendOnlyCompactionTask task1 = serializer.deserialize(serializer.getVersion(), bytes); assertThat(task).isEqualTo(task1); } + + @Test + public void testMultiTableCompactionTaskSerializer() throws IOException { + MultiTableCompactionTaskSerializer serializer = new MultiTableCompactionTaskSerializer(); + MultiTableAppendOnlyCompactionTask task = + new MultiTableAppendOnlyCompactionTask( + row(0), + randomNewFilesIncrement().newFiles(), + Identifier.create("db", "table")); + + byte[] bytes = serializer.serialize(task); + MultiTableAppendOnlyCompactionTask task1 = + serializer.deserialize(serializer.getVersion(), bytes); + assertThat(task).isEqualTo(task1); + } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java index 8cbef4fb94de..051b2baab410 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactDatabaseAction.java @@ -19,15 +19,16 @@ package org.apache.paimon.flink.action; import org.apache.paimon.CoreOptions; +import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.catalog.Identifier; import org.apache.paimon.flink.FlinkConnectorOptions; import org.apache.paimon.flink.compact.UnawareBucketCompactionTopoBuilder; import org.apache.paimon.flink.sink.BucketsRowChannelComputer; +import org.apache.paimon.flink.sink.CombinedTableCompactorSink; import org.apache.paimon.flink.sink.CompactorSinkBuilder; -import org.apache.paimon.flink.sink.MultiTablesCompactorSink; +import org.apache.paimon.flink.source.CombinedTableCompactorSourceBuilder; import org.apache.paimon.flink.source.CompactorSourceBuilder; -import org.apache.paimon.flink.source.MultiTablesCompactorSourceBuilder; import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.Table; @@ -188,23 +189,34 @@ private void buildForCombinedMode() { ReadableConfig conf = env.getConfiguration(); boolean isStreaming = conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING; - // TODO: Currently, multi-tables compaction don't support tables which bucketmode is UNWARE. 
- MultiTablesCompactorSourceBuilder sourceBuilder = - new MultiTablesCompactorSourceBuilder( + CombinedTableCompactorSourceBuilder sourceBuilder = + new CombinedTableCompactorSourceBuilder( catalogLoader(), databasePattern, includingPattern, excludingPattern, tableOptions.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL).toMillis()); - DataStream source = - sourceBuilder.withEnv(env).withContinuousMode(isStreaming).build(); - DataStream partitioned = + // multi bucket table which has multi bucket in a partition like fix bucket and dynamic + // bucket + DataStream awareBucketTableSource = partition( - source, + sourceBuilder + .withEnv(env) + .withContinuousMode(isStreaming) + .buildAwareBucketTableSource(), new BucketsRowChannelComputer(), tableOptions.get(FlinkConnectorOptions.SINK_PARALLELISM)); - new MultiTablesCompactorSink(catalogLoader(), tableOptions).sinkFrom(partitioned); + + // unaware bucket table + DataStream unawareBucketTableSource = + sourceBuilder + .withEnv(env) + .withContinuousMode(isStreaming) + .buildForUnawareBucketsTableSource(); + + new CombinedTableCompactorSink(catalogLoader(), tableOptions) + .sinkFrom(awareBucketTableSource, unawareBucketTableSource); } private void buildForTraditionalCompaction( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiAwareBucketTableScan.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiAwareBucketTableScan.java new file mode 100644 index 000000000000..68d66ec5f9ee --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiAwareBucketTableScan.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.compact; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.BucketMode; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.source.StreamTableScan; +import org.apache.paimon.table.system.BucketsTable; + +import org.apache.flink.api.java.tuple.Tuple2; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.paimon.flink.utils.MultiTablesCompactorUtil.compactOptions; + +/** + * This class is responsible for implementing the scanning logic {@link MultiTableScanBase} for the + * table with multi bucket such as dynamic or fixed bucket table. 
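+ * Each discovered table is wrapped in a {@link BucketsTable}, and every planned {@link Split} is
+ * emitted together with the table's full name so downstream operators can route it.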
+ */ +public class MultiAwareBucketTableScan extends MultiTableScanBase> { + + protected transient Map tablesMap; + protected transient Map scansMap; + + public MultiAwareBucketTableScan( + Catalog.Loader catalogLoader, + Pattern includingPattern, + Pattern excludingPattern, + Pattern databasePattern, + boolean isStreaming, + AtomicBoolean isRunning) { + super( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + isStreaming, + isRunning); + tablesMap = new HashMap<>(); + scansMap = new HashMap<>(); + } + + @Override + List> doScan() { + List> splits = new ArrayList<>(); + for (Map.Entry entry : scansMap.entrySet()) { + Identifier identifier = entry.getKey(); + StreamTableScan scan = entry.getValue(); + splits.addAll( + scan.plan().splits().stream() + .map(split -> new Tuple2<>(split, identifier.getFullName())) + .collect(Collectors.toList())); + } + return splits; + } + + @Override + public boolean checkTableScanned(Identifier identifier) { + return tablesMap.containsKey(identifier); + } + + @Override + public void addScanTable(FileStoreTable fileStoreTable, Identifier identifier) { + if (fileStoreTable.bucketMode() != BucketMode.BUCKET_UNAWARE) { + BucketsTable bucketsTable = + new BucketsTable(fileStoreTable, isStreaming, identifier.getDatabaseName()) + .copy(compactOptions(isStreaming)); + tablesMap.put(identifier, bucketsTable); + scansMap.put(identifier, bucketsTable.newReadBuilder().newStreamScan()); + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java new file mode 100644 index 000000000000..05ff4ea9f4f2 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiTableScanBase.java @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.compact; + +import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.EndOfScanException; +import org.apache.paimon.table.source.Split; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Pattern; + +import static org.apache.paimon.flink.utils.MultiTablesCompactorUtil.shouldCompactTable; + +/** + * This class is responsible for implementing the scanning logic for the table of different type + * buckets during compaction. + * + * @param the result of scanning file : + *
    + *
  1. Tuple2<{@link Split}, String> for tables with multiple buckets, such as dynamic or fixed + * bucket tables. + *
  2. {@link MultiTableAppendOnlyCompactionTask} for tables with a fixed single bucket, + * such as unaware bucket tables. + *
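+ *
+ * <p>Rough usage sketch from a driving source function (parameter values are placeholders):
+ *
+ * <pre>{@code
+ * MultiTableScanBase<MultiTableAppendOnlyCompactionTask> scan =
+ *         new MultiUnawareBucketTableScan(
+ *                 catalogLoader, includingPattern, excludingPattern, databasePattern,
+ *                 isStreaming, isRunning);
+ * ScanResult result = scan.scanTable(ctx); // emits tasks through ctx.collect(...)
+ * }</pre>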
+ */ +public abstract class MultiTableScanBase { + + private static final Logger LOG = LoggerFactory.getLogger(MultiTableScanBase.class); + protected final Pattern includingPattern; + protected final Pattern excludingPattern; + protected final Pattern databasePattern; + + protected transient Catalog catalog; + + protected AtomicBoolean isRunning; + protected boolean isStreaming; + + public MultiTableScanBase( + Catalog.Loader catalogLoader, + Pattern includingPattern, + Pattern excludingPattern, + Pattern databasePattern, + boolean isStreaming, + AtomicBoolean isRunning) { + catalog = catalogLoader.load(); + + this.includingPattern = includingPattern; + this.excludingPattern = excludingPattern; + this.databasePattern = databasePattern; + this.isRunning = isRunning; + this.isStreaming = isStreaming; + } + + protected void updateTableMap() + throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException { + List databases = catalog.listDatabases(); + + for (String databaseName : databases) { + if (databasePattern.matcher(databaseName).matches()) { + List tables = catalog.listTables(databaseName); + for (String tableName : tables) { + Identifier identifier = Identifier.create(databaseName, tableName); + if (shouldCompactTable(identifier, includingPattern, excludingPattern) + && (!checkTableScanned(identifier))) { + Table table = catalog.getTable(identifier); + if (!(table instanceof FileStoreTable)) { + LOG.error( + String.format( + "Only FileStoreTable supports compact action. The table type is '%s'.", + table.getClass().getName())); + continue; + } + + FileStoreTable fileStoreTable = (FileStoreTable) table; + addScanTable(fileStoreTable, identifier); + } + } + } + } + } + + public ScanResult scanTable(SourceFunction.SourceContext ctx) + throws Catalog.TableNotExistException, Catalog.DatabaseNotExistException { + try { + if (!isRunning.get()) { + return ScanResult.FINISHED; + } + + updateTableMap(); + List tasks = doScan(); + + tasks.forEach(ctx::collect); + return tasks.isEmpty() ? ScanResult.IS_EMPTY : ScanResult.IS_NON_EMPTY; + } catch (EndOfScanException esf) { + LOG.info("Catching EndOfStreamException, the stream is finished."); + return ScanResult.FINISHED; + } + } + + abstract List doScan(); + + /** Check if table has been scanned. */ + abstract boolean checkTableScanned(Identifier identifier); + + /** Add the scan table to the table map. */ + abstract void addScanTable(FileStoreTable fileStoreTable, Identifier identifier); + + /** the result of table scanning. */ + public enum ScanResult { + FINISHED, + IS_EMPTY, + IS_NON_EMPTY + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java new file mode 100644 index 000000000000..a56109d3e5f4 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiUnawareBucketTableScan.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.compact; + +import org.apache.paimon.append.AppendOnlyTableCompactionCoordinator; +import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.BucketMode; +import org.apache.paimon.table.FileStoreTable; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Pattern; + +/** + * This class is responsible for implementing the scanning logic {@link MultiTableScanBase} for the + * table with fix single bucket such as unaware bucket table. + */ +public class MultiUnawareBucketTableScan + extends MultiTableScanBase { + + private static final Logger LOG = LoggerFactory.getLogger(MultiUnawareBucketTableScan.class); + + protected transient Map tablesMap; + + public MultiUnawareBucketTableScan( + Catalog.Loader catalogLoader, + Pattern includingPattern, + Pattern excludingPattern, + Pattern databasePattern, + boolean isStreaming, + AtomicBoolean isRunning) { + super( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + isStreaming, + isRunning); + tablesMap = new HashMap<>(); + } + + @Override + List doScan() { + // do scan and plan action, emit append-only compaction tasks. + List tasks = new ArrayList<>(); + for (Map.Entry tableIdAndCoordinator : + tablesMap.entrySet()) { + Identifier tableId = tableIdAndCoordinator.getKey(); + AppendOnlyTableCompactionCoordinator compactionCoordinator = + tableIdAndCoordinator.getValue(); + compactionCoordinator.run().stream() + .map( + task -> + new MultiTableAppendOnlyCompactionTask( + task.partition(), task.compactBefore(), tableId)) + .forEach(tasks::add); + } + return tasks; + } + + @Override + public boolean checkTableScanned(Identifier identifier) { + return tablesMap.containsKey(identifier); + } + + @Override + public void addScanTable(FileStoreTable fileStoreTable, Identifier identifier) { + if (fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE) { + tablesMap.put( + identifier, + new AppendOnlyTableCompactionCoordinator(fileStoreTable, isStreaming)); + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactor.java new file mode 100644 index 000000000000..1f05321f2fa7 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketCompactor.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.compact; + +import org.apache.paimon.annotation.VisibleForTesting; +import org.apache.paimon.append.AppendOnlyCompactionTask; +import org.apache.paimon.flink.sink.Committable; +import org.apache.paimon.operation.AppendOnlyFileStoreWrite; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.TableCommitImpl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.function.Supplier; +import java.util.stream.Collectors; + +/** The Compactor of unaware bucket table to execute {@link AppendOnlyCompactionTask}. */ +public class UnawareBucketCompactor { + + private final FileStoreTable table; + private final String commitUser; + + private final transient AppendOnlyFileStoreWrite write; + + protected final transient Queue> result; + + private final transient Supplier compactExecutorsupplier; + + public UnawareBucketCompactor( + FileStoreTable table, + String commitUser, + Supplier lazyCompactExecutor) { + this.table = table; + this.commitUser = commitUser; + this.write = (AppendOnlyFileStoreWrite) table.store().newWrite(commitUser); + this.result = new LinkedList<>(); + this.compactExecutorsupplier = lazyCompactExecutor; + } + + public void processElement(AppendOnlyCompactionTask task) throws Exception { + result.add(compactExecutorsupplier.get().submit(() -> task.doCompact(write))); + } + + public void close() throws Exception { + shutdown(); + } + + @VisibleForTesting + void shutdown() throws Exception { + + List messages = new ArrayList<>(); + for (Future resultFuture : result) { + if (!resultFuture.isDone()) { + // the later tasks should be stopped running + break; + } + try { + messages.add(resultFuture.get()); + } catch (Exception exception) { + // exception should already be handled + } + } + if (messages.isEmpty()) { + return; + } + + try (TableCommitImpl tableCommit = table.newCommit(commitUser)) { + tableCommit.abort(messages); + } + } + + public List prepareCommit(boolean waitCompaction, long checkpointId) + throws IOException { + List tempList = new ArrayList<>(); + try { + while (!result.isEmpty()) { + Future future = result.peek(); + if (!future.isDone() && !waitCompaction) { + break; + } + result.poll(); + tempList.add(future.get()); + } + return tempList.stream() + .map(s -> new Committable(checkpointId, Committable.Kind.FILE, s)) + .collect(Collectors.toList()); + } catch (InterruptedException e) { + throw new RuntimeException("Interrupted while waiting tasks done.", e); + } catch (Exception e) { + throw new RuntimeException("Encountered an error while do compaction", e); + } + } + + public Iterable> result() { + return result; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java new file mode 100644 index 000000000000..762e9a6df13f --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperator.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.append.AppendOnlyCompactionTask; +import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.compact.UnawareBucketCompactor; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.utils.ExceptionUtils; +import org.apache.paimon.utils.ExecutorThreadFactory; + +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +/** + * Operator to execute {@link AppendOnlyCompactionTask} passed for support compacting multi unaware + * bucket tables in combined mode. 
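+ * Incoming tasks are dispatched to one {@link UnawareBucketCompactor} per table, created lazily
+ * through the {@link Catalog} the first time a task for that table arrives.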
+ */ +public class AppendOnlyMultiTableCompactionWorkerOperator + extends PrepareCommitOperator { + + private static final Logger LOG = + LoggerFactory.getLogger(AppendOnlyMultiTableCompactionWorkerOperator.class); + + private final String commitUser; + private final Catalog.Loader catalogLoader; + + // support multi table compaction + private transient Map compactorContainer; + + private transient ExecutorService lazyCompactExecutor; + + private transient Catalog catalog; + + public AppendOnlyMultiTableCompactionWorkerOperator( + Catalog.Loader catalogLoader, String commitUser, Options options) { + super(options); + this.commitUser = commitUser; + this.catalogLoader = catalogLoader; + } + + @Override + public void open() throws Exception { + LOG.debug("Opened a append-only multi table compaction worker."); + compactorContainer = new HashMap<>(); + catalog = catalogLoader.load(); + } + + @Override + protected List prepareCommit(boolean waitCompaction, long checkpointId) + throws IOException { + List result = new ArrayList<>(); + for (Map.Entry compactorWithTable : + compactorContainer.entrySet()) { + Identifier tableId = compactorWithTable.getKey(); + UnawareBucketCompactor compactor = compactorWithTable.getValue(); + + for (Committable committable : compactor.prepareCommit(waitCompaction, checkpointId)) { + result.add( + new MultiTableCommittable( + tableId.getDatabaseName(), + tableId.getObjectName(), + committable.checkpointId(), + committable.kind(), + committable.wrappedCommittable())); + } + } + + return result; + } + + @Override + public void processElement(StreamRecord element) + throws Exception { + Identifier identifier = element.getValue().tableIdentifier(); + compactorContainer + .computeIfAbsent(identifier, this::compactor) + .processElement(element.getValue()); + } + + private UnawareBucketCompactor compactor(Identifier tableId) { + try { + return new UnawareBucketCompactor( + (FileStoreTable) catalog.getTable(tableId), commitUser, this::workerExecutor); + } catch (Catalog.TableNotExistException e) { + throw new RuntimeException(e); + } + } + + private ExecutorService workerExecutor() { + if (lazyCompactExecutor == null) { + lazyCompactExecutor = + Executors.newSingleThreadScheduledExecutor( + new ExecutorThreadFactory( + Thread.currentThread().getName() + + "-append-only-compact-worker")); + } + return lazyCompactExecutor; + } + + @Override + public void close() throws Exception { + List exceptions = new ArrayList<>(); + + if (lazyCompactExecutor != null) { + try { + lazyCompactExecutor.shutdownNow(); + if (!lazyCompactExecutor.awaitTermination(120, TimeUnit.SECONDS)) { + LOG.warn( + "Executors shutdown timeout, there may be some files that aren't deleted correctly"); + } + } catch (Exception e) { + LOG.warn( + String.format( + "Fail to stop the compaction executor. Reason: %s, please check the thread stack of append-only-compact-worker.", + e.getMessage()), + e); + exceptions.add(e); + } + + for (Map.Entry compactorEntry : + compactorContainer.entrySet()) { + try { + UnawareBucketCompactor compactor = compactorEntry.getValue(); + compactor.close(); + } catch (Exception e) { + Identifier id = compactorEntry.getKey(); + LOG.warn( + String.format( + "Fail to roll back the compactor of %s. 
Reason: %s", + id, e.getMessage()), + e); + exceptions.add(e); + } + } + } + + ExceptionUtils.throwMultiException(exceptions); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java similarity index 57% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java index 0860ccb3ac8b..8b41d77598fc 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperator.java @@ -20,12 +20,11 @@ import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.append.AppendOnlyCompactionTask; +import org.apache.paimon.flink.compact.UnawareBucketCompactor; import org.apache.paimon.flink.source.BucketUnawareCompactSource; -import org.apache.paimon.operation.AppendOnlyFileStoreWrite; import org.apache.paimon.options.Options; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.sink.CommitMessage; -import org.apache.paimon.table.sink.TableCommitImpl; import org.apache.paimon.utils.ExecutorThreadFactory; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -33,34 +32,30 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; -import java.util.Queue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; /** * Operator to execute {@link AppendOnlyCompactionTask} passed from {@link - * BucketUnawareCompactSource}. + * BucketUnawareCompactSource} for compacting single unaware bucket tables in divided mode. 
*/ -public class AppendOnlyTableCompactionWorkerOperator +public class AppendOnlySingleTableCompactionWorkerOperator extends PrepareCommitOperator { private static final Logger LOG = - LoggerFactory.getLogger(AppendOnlyTableCompactionWorkerOperator.class); + LoggerFactory.getLogger(AppendOnlySingleTableCompactionWorkerOperator.class); private final FileStoreTable table; private final String commitUser; - private transient AppendOnlyFileStoreWrite write; + private transient UnawareBucketCompactor unawareBucketCompactor; + private transient ExecutorService lazyCompactExecutor; - private transient Queue> result; - public AppendOnlyTableCompactionWorkerOperator(FileStoreTable table, String commitUser) { + public AppendOnlySingleTableCompactionWorkerOperator(FileStoreTable table, String commitUser) { super(Options.fromMap(table.options())); this.table = table; this.commitUser = commitUser; @@ -68,44 +63,25 @@ public AppendOnlyTableCompactionWorkerOperator(FileStoreTable table, String comm @VisibleForTesting Iterable> result() { - return result; + return unawareBucketCompactor.result(); } @Override public void open() throws Exception { LOG.debug("Opened a append-only table compaction worker."); - this.write = (AppendOnlyFileStoreWrite) table.store().newWrite(commitUser); - this.result = new LinkedList<>(); + this.unawareBucketCompactor = + new UnawareBucketCompactor(table, commitUser, this::workerExecutor); } @Override protected List prepareCommit(boolean waitCompaction, long checkpointId) throws IOException { - List tempList = new ArrayList<>(); - try { - while (!result.isEmpty()) { - Future future = result.peek(); - if (!future.isDone() && !waitCompaction) { - break; - } - - result.poll(); - tempList.add(future.get()); - } - return tempList.stream() - .map(s -> new Committable(checkpointId, Committable.Kind.FILE, s)) - .collect(Collectors.toList()); - } catch (InterruptedException e) { - throw new RuntimeException("Interrupted while waiting tasks done.", e); - } catch (Exception e) { - throw new RuntimeException("Encountered an error while do compaction", e); - } + return this.unawareBucketCompactor.prepareCommit(waitCompaction, checkpointId); } @Override public void processElement(StreamRecord element) throws Exception { - AppendOnlyCompactionTask task = element.getValue(); - result.add(workerExecutor().submit(() -> task.doCompact(write))); + this.unawareBucketCompactor.processElement(element.getValue()); } private ExecutorService workerExecutor() { @@ -121,11 +97,6 @@ private ExecutorService workerExecutor() { @Override public void close() throws Exception { - shutdown(); - } - - @VisibleForTesting - void shutdown() throws Exception { if (lazyCompactExecutor != null) { // ignore runnable tasks in queue lazyCompactExecutor.shutdownNow(); @@ -133,25 +104,7 @@ void shutdown() throws Exception { LOG.warn( "Executors shutdown timeout, there may be some files aren't deleted correctly"); } - List messages = new ArrayList<>(); - for (Future resultFuture : result) { - if (!resultFuture.isDone()) { - // the later tasks should be stopped running - break; - } - try { - messages.add(resultFuture.get()); - } catch (Exception exception) { - // exception should already be handled - } - } - if (messages.isEmpty()) { - return; - } - - try (TableCommitImpl tableCommit = table.newCommit(commitUser)) { - tableCommit.abort(messages); - } + this.unawareBucketCompactor.close(); } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java similarity index 69% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java index cfc1ec1cd286..ba7c1bb4413e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTablesCompactorSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CombinedTableCompactorSink.java @@ -19,6 +19,7 @@ package org.apache.paimon.flink.sink; import org.apache.paimon.CoreOptions; +import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.flink.VersionedSerializerWrapper; import org.apache.paimon.manifest.WrappedManifestCommittable; @@ -48,7 +49,7 @@ import static org.apache.paimon.flink.utils.ManagedMemoryUtils.declareManagedMemory; /** A sink for processing multi-tables in dedicated compaction job. */ -public class MultiTablesCompactorSink implements Serializable { +public class CombinedTableCompactorSink implements Serializable { private static final long serialVersionUID = 1L; private static final String WRITER_NAME = "Writer"; @@ -59,54 +60,75 @@ public class MultiTablesCompactorSink implements Serializable { private final Options options; - public MultiTablesCompactorSink(Catalog.Loader catalogLoader, Options options) { + public CombinedTableCompactorSink(Catalog.Loader catalogLoader, Options options) { this.catalogLoader = catalogLoader; this.ignorePreviousFiles = false; this.options = options; } - public DataStreamSink sinkFrom(DataStream input) { + public DataStreamSink sinkFrom( + DataStream awareBucketTableSource, + DataStream unawareBucketTableSource) { // This commitUser is valid only for new jobs. // After the job starts, this commitUser will be recorded into the states of write and // commit operators. // When the job restarts, commitUser will be recovered from states and this value is // ignored. 
String initialCommitUser = UUID.randomUUID().toString(); - return sinkFrom(input, initialCommitUser); + return sinkFrom(awareBucketTableSource, unawareBucketTableSource, initialCommitUser); } - public DataStreamSink sinkFrom(DataStream input, String initialCommitUser) { + public DataStreamSink sinkFrom( + DataStream awareBucketTableSource, + DataStream unawareBucketTableSource, + String initialCommitUser) { // do the actually writing action, no snapshot generated in this stage - SingleOutputStreamOperator written = - doWrite(input, initialCommitUser, input.getParallelism()); + DataStream written = + doWrite(awareBucketTableSource, unawareBucketTableSource, initialCommitUser); // commit the committable to generate a new snapshot return doCommit(written, initialCommitUser); } - public SingleOutputStreamOperator doWrite( - DataStream input, String commitUser, Integer parallelism) { - StreamExecutionEnvironment env = input.getExecutionEnvironment(); + public DataStream doWrite( + DataStream awareBucketTableSource, + DataStream unawareBucketTableSource, + String commitUser) { + StreamExecutionEnvironment env = awareBucketTableSource.getExecutionEnvironment(); boolean isStreaming = env.getConfiguration().get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING; - SingleOutputStreamOperator written = - input.transform( - WRITER_NAME, + SingleOutputStreamOperator multiBucketTableRewriter = + awareBucketTableSource + .transform( + String.format("%s-%s", "Multi-Bucket-Table", WRITER_NAME), new MultiTableCommittableTypeInfo(), - createWriteOperator( + combinedMultiComacptionWriteOperator( env.getCheckpointConfig(), isStreaming, commitUser)) - .setParallelism(parallelism == null ? input.getParallelism() : parallelism); + .setParallelism(awareBucketTableSource.getParallelism()); + + SingleOutputStreamOperator unawareBucketTableRewriter = + unawareBucketTableSource + .transform( + String.format("%s-%s", "Unaware-Bucket-Table", WRITER_NAME), + new MultiTableCommittableTypeInfo(), + new AppendOnlyMultiTableCompactionWorkerOperator( + catalogLoader, commitUser, options)) + .setParallelism(unawareBucketTableSource.getParallelism()); if (!isStreaming) { - assertBatchConfiguration(env, written.getParallelism()); + assertBatchConfiguration(env, multiBucketTableRewriter.getParallelism()); + assertBatchConfiguration(env, unawareBucketTableRewriter.getParallelism()); } if (options.get(SINK_USE_MANAGED_MEMORY)) { - declareManagedMemory(written, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY)); + declareManagedMemory( + multiBucketTableRewriter, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY)); + declareManagedMemory( + unawareBucketTableRewriter, options.get(SINK_MANAGED_WRITER_BUFFER_MEMORY)); } - return written; + return multiBucketTableRewriter.union(unawareBucketTableRewriter); } protected DataStreamSink doCommit( @@ -138,8 +160,9 @@ protected DataStreamSink doCommit( } // TODO:refactor FlinkSink to adopt this sink - protected OneInputStreamOperator createWriteOperator( - CheckpointConfig checkpointConfig, boolean isStreaming, String commitUser) { + protected OneInputStreamOperator + combinedMultiComacptionWriteOperator( + CheckpointConfig checkpointConfig, boolean isStreaming, String commitUser) { return new MultiTablesStoreCompactOperator( catalogLoader, commitUser, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCompactionTaskTypeInfo.java 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCompactionTaskTypeInfo.java new file mode 100644 index 000000000000..8c2ec7b7a0e2 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/MultiTableCompactionTaskTypeInfo.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask; +import org.apache.paimon.flink.VersionedSerializerWrapper; +import org.apache.paimon.table.sink.MultiTableCompactionTaskSerializer; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.io.SimpleVersionedSerializerTypeSerializerProxy; + +/** Type information of {@link MultiTableAppendOnlyCompactionTask}. */ +public class MultiTableCompactionTaskTypeInfo + extends TypeInformation { + @Override + public boolean isBasicType() { + return false; + } + + @Override + public boolean isTupleType() { + return false; + } + + @Override + public int getArity() { + return 1; + } + + @Override + public int getTotalFields() { + return 1; + } + + @Override + public Class getTypeClass() { + return MultiTableAppendOnlyCompactionTask.class; + } + + @Override + public boolean isKeyType() { + return false; + } + + @Override + public TypeSerializer createSerializer( + ExecutionConfig executionConfig) { + return new SimpleVersionedSerializerTypeSerializerProxy( + () -> new VersionedSerializerWrapper<>(new MultiTableCompactionTaskSerializer())) { + @Override + public MultiTableAppendOnlyCompactionTask copy( + MultiTableAppendOnlyCompactionTask from) { + return from; + } + + @Override + public MultiTableAppendOnlyCompactionTask copy( + MultiTableAppendOnlyCompactionTask from, + MultiTableAppendOnlyCompactionTask reuse) { + return from; + } + }; + } + + @Override + public String toString() { + return "MultiTableAppendOnlyCompactionTask"; + } + + @Override + public boolean equals(Object o) { + return o instanceof MultiTableAppendOnlyCompactionTask; + } + + @Override + public int hashCode() { + return 0; + } + + @Override + public boolean canEqual(Object o) { + return o instanceof MultiTableAppendOnlyCompactionTask; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java index 6785c53fb1a5..8891eba6315a 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java +++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java @@ -44,7 +44,7 @@ public static DataStreamSink sink( @Override protected OneInputStreamOperator createWriteOperator( StoreSinkWrite.Provider writeProvider, String commitUser) { - return new AppendOnlyTableCompactionWorkerOperator(table, commitUser); + return new AppendOnlySingleTableCompactionWorkerOperator(table, commitUser); } @Override diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java similarity index 61% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilder.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java index b631d3b7a037..6ee9d849af89 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilder.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CombinedTableCompactorSourceBuilder.java @@ -18,10 +18,13 @@ package org.apache.paimon.flink.source; +import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask; import org.apache.paimon.catalog.Catalog; import org.apache.paimon.flink.LogicalTypeConversion; -import org.apache.paimon.flink.source.operator.MultiTablesBatchCompactorSourceFunction; -import org.apache.paimon.flink.source.operator.MultiTablesStreamingCompactorSourceFunction; +import org.apache.paimon.flink.source.operator.CombinedAwareBatchSourceFunction; +import org.apache.paimon.flink.source.operator.CombinedAwareStreamingSourceFunction; +import org.apache.paimon.flink.source.operator.CombinedUnawareBatchSourceFunction; +import org.apache.paimon.flink.source.operator.CombinedUnawareStreamingSourceFunction; import org.apache.paimon.table.system.BucketsTable; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Preconditions; @@ -35,9 +38,9 @@ /** * source builder to build a Flink compactor source for multi-tables. This is for dedicated - * compactor jobs. + * compactor jobs in combined mode. 
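+ * It can build two sources: one of bucket rows for tables with aware (fixed or dynamic) buckets,
+ * and one of {@link MultiTableAppendOnlyCompactionTask}s for unaware-bucket tables.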
*/ -public class MultiTablesCompactorSourceBuilder { +public class CombinedTableCompactorSourceBuilder { private final Catalog.Loader catalogLoader; private final Pattern includingPattern; private final Pattern excludingPattern; @@ -47,7 +50,7 @@ public class MultiTablesCompactorSourceBuilder { private boolean isContinuous = false; private StreamExecutionEnvironment env; - public MultiTablesCompactorSourceBuilder( + public CombinedTableCompactorSourceBuilder( Catalog.Loader catalogLoader, Pattern databasePattern, Pattern includingPattern, @@ -60,23 +63,23 @@ public MultiTablesCompactorSourceBuilder( this.monitorInterval = monitorInterval; } - public MultiTablesCompactorSourceBuilder withContinuousMode(boolean isContinuous) { + public CombinedTableCompactorSourceBuilder withContinuousMode(boolean isContinuous) { this.isContinuous = isContinuous; return this; } - public MultiTablesCompactorSourceBuilder withEnv(StreamExecutionEnvironment env) { + public CombinedTableCompactorSourceBuilder withEnv(StreamExecutionEnvironment env) { this.env = env; return this; } - public DataStream build() { + public DataStream buildAwareBucketTableSource() { Preconditions.checkArgument(env != null, "StreamExecutionEnvironment should not be null."); RowType produceType = BucketsTable.getRowType(); if (isContinuous) { - return MultiTablesStreamingCompactorSourceFunction.buildSource( + return CombinedAwareStreamingSourceFunction.buildSource( env, - "MultiTables-StreamingCompactorSource", + "Combine-MultiBucketTables--StreamingCompactorSource", InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(produceType)), catalogLoader, includingPattern, @@ -84,15 +87,36 @@ public DataStream build() { databasePattern, monitorInterval); } else { - return MultiTablesBatchCompactorSourceFunction.buildSource( + return CombinedAwareBatchSourceFunction.buildSource( env, - "MultiTables-BatchCompactorSource", + "Combine-MultiBucketTables-BatchCompactorSource", InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(produceType)), catalogLoader, includingPattern, excludingPattern, + databasePattern); + } + } + + public DataStream buildForUnawareBucketsTableSource() { + Preconditions.checkArgument(env != null, "StreamExecutionEnvironment should not be null."); + if (isContinuous) { + return CombinedUnawareStreamingSourceFunction.buildSource( + env, + "Combined-UnawareBucketTables-StreamingCompactorSource", + catalogLoader, + includingPattern, + excludingPattern, databasePattern, monitorInterval); + } else { + return CombinedUnawareBatchSourceFunction.buildSource( + env, + "Combined-UnawareBucketTables-BatchCompactorSource", + catalogLoader, + includingPattern, + excludingPattern, + databasePattern); } } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesBatchCompactorSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java similarity index 54% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesBatchCompactorSourceFunction.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java index 3da9fb2e6a96..2d6423fced23 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesBatchCompactorSourceFunction.java +++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareBatchSourceFunction.java @@ -19,18 +19,18 @@ package org.apache.paimon.flink.source.operator; import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.compact.MultiAwareBucketTableScan; +import org.apache.paimon.flink.compact.MultiTableScanBase; import org.apache.paimon.flink.utils.JavaTypeInfo; import org.apache.paimon.table.source.DataSplit; -import org.apache.paimon.table.source.EndOfScanException; import org.apache.paimon.table.source.Split; -import org.apache.paimon.table.source.StreamTableScan; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -39,65 +39,54 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; import java.util.regex.Pattern; -import java.util.stream.Collectors; -/** It is responsible for monitoring compactor source in batch mode. */ -public class MultiTablesBatchCompactorSourceFunction extends MultiTablesCompactorSourceFunction { +import static org.apache.paimon.flink.compact.MultiTableScanBase.ScanResult.FINISHED; +import static org.apache.paimon.flink.compact.MultiTableScanBase.ScanResult.IS_EMPTY; - private static final Logger LOG = - LoggerFactory.getLogger(MultiTablesBatchCompactorSourceFunction.class); +/** It is responsible for monitoring compactor source of aware bucket table in batch mode. 
*/ +public class CombinedAwareBatchSourceFunction + extends CombinedCompactorSourceFunction> { - public MultiTablesBatchCompactorSourceFunction( + private static final Logger LOGGER = + LoggerFactory.getLogger(CombinedAwareBatchSourceFunction.class); + + private MultiTableScanBase> tableScan; + + public CombinedAwareBatchSourceFunction( Catalog.Loader catalogLoader, Pattern includingPattern, Pattern excludingPattern, - Pattern databasePattern, - long monitorInterval) { - super( - catalogLoader, - includingPattern, - excludingPattern, - databasePattern, - false, - monitorInterval); + Pattern databasePattern) { + super(catalogLoader, includingPattern, excludingPattern, databasePattern, false); } @Override - public void run(SourceContext> ctx) throws Exception { - this.ctx = ctx; - if (isRunning) { - boolean isEmpty; - synchronized (ctx.getCheckpointLock()) { - if (!isRunning) { - return; - } - try { - // batch mode do not need check for new tables - List> splits = new ArrayList<>(); - for (Map.Entry entry : scansMap.entrySet()) { - Identifier identifier = entry.getKey(); - StreamTableScan scan = entry.getValue(); - splits.addAll( - scan.plan().splits().stream() - .map(split -> new Tuple2<>(split, identifier.getFullName())) - .collect(Collectors.toList())); - } - - isEmpty = splits.isEmpty(); - splits.forEach(ctx::collect); + public void open(Configuration parameters) throws Exception { + super.open(parameters); + tableScan = + new MultiAwareBucketTableScan( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + isStreaming, + isRunning); + } - } catch (EndOfScanException esf) { - LOG.info("Catching EndOfStreamException, the stream is finished."); - return; - } + @Override + void scanTable() throws Exception { + if (isRunning.get()) { + MultiTableScanBase.ScanResult scanResult = tableScan.scanTable(ctx); + if (scanResult == FINISHED) { + return; } - if (isEmpty) { - throw new Exception( - "No splits were collected. Please ensure there are tables detected after pattern matching"); + if (scanResult == IS_EMPTY) { + // Currently, in the combined mode, there are two scan tasks for the table of two + // different bucket type (multi bucket & unaware bucket) running concurrently. + // There will be a situation that there is only one task compaction , therefore this + // should not be thrown exception here. 
+ LOGGER.info("No file were collected for the table of aware-bucket"); } } } @@ -109,15 +98,10 @@ public static DataStream buildSource( Catalog.Loader catalogLoader, Pattern includingPattern, Pattern excludingPattern, - Pattern databasePattern, - long monitorInterval) { - MultiTablesBatchCompactorSourceFunction function = - new MultiTablesBatchCompactorSourceFunction( - catalogLoader, - includingPattern, - excludingPattern, - databasePattern, - monitorInterval); + Pattern databasePattern) { + CombinedAwareBatchSourceFunction function = + new CombinedAwareBatchSourceFunction( + catalogLoader, includingPattern, excludingPattern, databasePattern); StreamSource, ?> sourceOperator = new StreamSource<>(function); TupleTypeInfo> tupleTypeInfo = new TupleTypeInfo<>( diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesStreamingCompactorSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java similarity index 59% rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesStreamingCompactorSourceFunction.java rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java index 61e085f328a9..4df38ef997ed 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesStreamingCompactorSourceFunction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedAwareStreamingSourceFunction.java @@ -19,88 +19,68 @@ package org.apache.paimon.flink.source.operator; import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.flink.compact.MultiAwareBucketTableScan; +import org.apache.paimon.flink.compact.MultiTableScanBase; import org.apache.paimon.flink.utils.JavaTypeInfo; import org.apache.paimon.table.source.DataSplit; -import org.apache.paimon.table.source.EndOfScanException; import org.apache.paimon.table.source.Split; -import org.apache.paimon.table.source.StreamTableScan; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.table.data.RowData; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; import java.util.regex.Pattern; -import java.util.stream.Collectors; -/** It is responsible for monitoring compactor source in streaming mode. 
*/ -public class MultiTablesStreamingCompactorSourceFunction - extends MultiTablesCompactorSourceFunction { +import static org.apache.paimon.flink.compact.MultiTableScanBase.ScanResult.FINISHED; +import static org.apache.paimon.flink.compact.MultiTableScanBase.ScanResult.IS_EMPTY; - private static final Logger LOG = - LoggerFactory.getLogger(MultiTablesStreamingCompactorSourceFunction.class); +/** It is responsible for monitoring compactor source of multi bucket table in stream mode. */ +public class CombinedAwareStreamingSourceFunction + extends CombinedCompactorSourceFunction> { - public MultiTablesStreamingCompactorSourceFunction( + private final long monitorInterval; + private transient MultiTableScanBase> tableScan; + + public CombinedAwareStreamingSourceFunction( Catalog.Loader catalogLoader, Pattern includingPattern, Pattern excludingPattern, Pattern databasePattern, long monitorInterval) { - super( - catalogLoader, - includingPattern, - excludingPattern, - databasePattern, - true, - monitorInterval); + super(catalogLoader, includingPattern, excludingPattern, databasePattern, true); + this.monitorInterval = monitorInterval; } - @SuppressWarnings("BusyWait") @Override - public void run(SourceContext> ctx) throws Exception { - this.ctx = ctx; - while (isRunning) { - boolean isEmpty; - synchronized (ctx.getCheckpointLock()) { - if (!isRunning) { - return; - } - try { - // check for new tables - updateTableMap(); - - List> splits = new ArrayList<>(); - for (Map.Entry entry : scansMap.entrySet()) { - Identifier identifier = entry.getKey(); - StreamTableScan scan = entry.getValue(); - splits.addAll( - scan.plan().splits().stream() - .map(split -> new Tuple2<>(split, identifier.getFullName())) - .collect(Collectors.toList())); - } - - isEmpty = splits.isEmpty(); - splits.forEach(ctx::collect); + public void open(Configuration parameters) throws Exception { + super.open(parameters); + tableScan = + new MultiAwareBucketTableScan( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + isStreaming, + isRunning); + } - } catch (EndOfScanException esf) { - LOG.info("Catching EndOfStreamException, the stream is finished."); - return; - } + @SuppressWarnings("BusyWait") + @Override + void scanTable() throws Exception { + while (isRunning.get()) { + MultiTableScanBase.ScanResult scanResult = tableScan.scanTable(ctx); + if (scanResult == FINISHED) { + return; } - - if (isEmpty) { + if (scanResult == IS_EMPTY) { Thread.sleep(monitorInterval); } } @@ -116,8 +96,8 @@ public static DataStream buildSource( Pattern databasePattern, long monitorInterval) { - MultiTablesStreamingCompactorSourceFunction function = - new MultiTablesStreamingCompactorSourceFunction( + CombinedAwareStreamingSourceFunction function = + new CombinedAwareStreamingSourceFunction( catalogLoader, includingPattern, excludingPattern, diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java new file mode 100644 index 000000000000..17ade3902e03 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source.operator; + +import org.apache.paimon.append.AppendOnlyCompactionTask; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.table.source.Split; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Pattern; + +/** + * This is the single (non-parallel) monitoring task, it is responsible for: + * + *
+ * 1. Monitoring snapshots of the Paimon table.
+ * 2. Creating the splits {@link Split} or compaction task {@link AppendOnlyCompactionTask}
+ *    corresponding to the incremental files
+ * 3. Assigning them to downstream tasks for further processing.
+ *
+ * The splits to be read are forwarded to the downstream {@link MultiTablesReadOperator} which
+ * can have parallelism greater than one.
+ *
Currently, only dedicated compaction job for multi-tables rely on this monitor. This is the + * single (non-parallel) monitoring task, it is responsible for the new Paimon table. + */ +public abstract class CombinedCompactorSourceFunction extends RichSourceFunction { + + private static final long serialVersionUID = 2L; + + protected final Catalog.Loader catalogLoader; + protected final Pattern includingPattern; + protected final Pattern excludingPattern; + protected final Pattern databasePattern; + protected final boolean isStreaming; + + protected transient AtomicBoolean isRunning; + protected transient SourceContext ctx; + + public CombinedCompactorSourceFunction( + Catalog.Loader catalogLoader, + Pattern includingPattern, + Pattern excludingPattern, + Pattern databasePattern, + boolean isStreaming) { + this.catalogLoader = catalogLoader; + this.includingPattern = includingPattern; + this.excludingPattern = excludingPattern; + this.databasePattern = databasePattern; + this.isStreaming = isStreaming; + } + + @Override + public void open(Configuration parameters) throws Exception { + isRunning = new AtomicBoolean(true); + } + + @Override + public void run(SourceContext sourceContext) throws Exception { + this.ctx = sourceContext; + scanTable(); + } + + @Override + public void cancel() { + // this is to cover the case where cancel() is called before the run() + if (ctx != null) { + synchronized (ctx.getCheckpointLock()) { + isRunning.set(false); + } + } else { + isRunning.set(false); + } + } + + abstract void scanTable() throws Exception; +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java new file mode 100644 index 000000000000..748ccebbe628 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareBatchSourceFunction.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.flink.source.operator; + +import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.flink.compact.MultiTableScanBase; +import org.apache.paimon.flink.compact.MultiUnawareBucketTableScan; +import org.apache.paimon.flink.sink.MultiTableCompactionTaskTypeInfo; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.transformations.PartitionTransformation; +import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.regex.Pattern; + +import static org.apache.paimon.flink.compact.MultiTableScanBase.ScanResult.FINISHED; +import static org.apache.paimon.flink.compact.MultiTableScanBase.ScanResult.IS_EMPTY; + +/** + * It is responsible for the batch compactor source of the table with unaware bucket in combined + * mode. + */ +public class CombinedUnawareBatchSourceFunction + extends CombinedCompactorSourceFunction { + + private static final Logger LOGGER = + LoggerFactory.getLogger(CombinedUnawareBatchSourceFunction.class); + private transient MultiTableScanBase tableScan; + + public CombinedUnawareBatchSourceFunction( + Catalog.Loader catalogLoader, + Pattern includingPattern, + Pattern excludingPattern, + Pattern databasePattern) { + super(catalogLoader, includingPattern, excludingPattern, databasePattern, false); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + tableScan = + new MultiUnawareBucketTableScan( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + isStreaming, + isRunning); + } + + @Override + void scanTable() throws Exception { + if (isRunning.get()) { + MultiTableScanBase.ScanResult scanResult = tableScan.scanTable(ctx); + if (scanResult == FINISHED) { + return; + } + if (scanResult == IS_EMPTY) { + // Currently, in the combined mode, there are two scan tasks for the table of two + // different bucket type (multi bucket & unaware bucket) running concurrently. + // There will be a situation that there is only one task compaction , therefore this + // should not be thrown exception here. 
+ LOGGER.info("No file were collected for the table of unaware-bucket"); + } + } + } + + public static DataStream buildSource( + StreamExecutionEnvironment env, + String name, + Catalog.Loader catalogLoader, + Pattern includingPattern, + Pattern excludingPattern, + Pattern databasePattern) { + CombinedUnawareBatchSourceFunction function = + new CombinedUnawareBatchSourceFunction( + catalogLoader, includingPattern, excludingPattern, databasePattern); + StreamSource + sourceOperator = new StreamSource<>(function); + MultiTableCompactionTaskTypeInfo compactionTaskTypeInfo = + new MultiTableCompactionTaskTypeInfo(); + + SingleOutputStreamOperator source = + new DataStreamSource<>( + env, + compactionTaskTypeInfo, + sourceOperator, + false, + name, + Boundedness.BOUNDED) + .forceNonParallel(); + + PartitionTransformation transformation = + new PartitionTransformation<>( + source.getTransformation(), new RebalancePartitioner<>()); + + return new DataStream<>(env, transformation); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java new file mode 100644 index 000000000000..3323af60198a --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedUnawareStreamingSourceFunction.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source.operator; + +import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.flink.compact.MultiTableScanBase; +import org.apache.paimon.flink.compact.MultiUnawareBucketTableScan; +import org.apache.paimon.flink.sink.MultiTableCompactionTaskTypeInfo; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.StreamSource; + +import java.util.regex.Pattern; + +import static org.apache.paimon.flink.compact.MultiTableScanBase.ScanResult.FINISHED; +import static org.apache.paimon.flink.compact.MultiTableScanBase.ScanResult.IS_EMPTY; + +/** + * It is responsible for monitoring compactor source in stream mode for the table of unaware bucket. 
+ */ +public class CombinedUnawareStreamingSourceFunction + extends CombinedCompactorSourceFunction { + + private final long monitorInterval; + private MultiTableScanBase tableScan; + + public CombinedUnawareStreamingSourceFunction( + Catalog.Loader catalogLoader, + Pattern includingPattern, + Pattern excludingPattern, + Pattern databasePattern, + long monitorInterval) { + super(catalogLoader, includingPattern, excludingPattern, databasePattern, true); + this.monitorInterval = monitorInterval; + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + tableScan = + new MultiUnawareBucketTableScan( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + isStreaming, + isRunning); + } + + @SuppressWarnings("BusyWait") + @Override + void scanTable() throws Exception { + while (isRunning.get()) { + MultiTableScanBase.ScanResult scanResult = tableScan.scanTable(ctx); + if (scanResult == FINISHED) { + return; + } + if (scanResult == IS_EMPTY) { + Thread.sleep(monitorInterval); + } + } + } + + public static DataStream buildSource( + StreamExecutionEnvironment env, + String name, + Catalog.Loader catalogLoader, + Pattern includingPattern, + Pattern excludingPattern, + Pattern databasePattern, + long monitorInterval) { + + CombinedUnawareStreamingSourceFunction function = + new CombinedUnawareStreamingSourceFunction( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + monitorInterval); + StreamSource + sourceOperator = new StreamSource<>(function); + boolean isParallel = false; + MultiTableCompactionTaskTypeInfo compactionTaskTypeInfo = + new MultiTableCompactionTaskTypeInfo(); + return new DataStreamSource<>( + env, + compactionTaskTypeInfo, + sourceOperator, + isParallel, + name, + Boundedness.CONTINUOUS_UNBOUNDED) + .forceNonParallel() + .rebalance(); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesCompactorSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesCompactorSourceFunction.java deleted file mode 100644 index 2e2e90ca9763..000000000000 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesCompactorSourceFunction.java +++ /dev/null @@ -1,158 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.paimon.flink.source.operator; - -import org.apache.paimon.catalog.Catalog; -import org.apache.paimon.catalog.Identifier; -import org.apache.paimon.table.BucketMode; -import org.apache.paimon.table.FileStoreTable; -import org.apache.paimon.table.Table; -import org.apache.paimon.table.source.Split; -import org.apache.paimon.table.source.StreamTableScan; -import org.apache.paimon.table.system.BucketsTable; - -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.source.RichSourceFunction; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.regex.Pattern; - -import static org.apache.paimon.flink.utils.MultiTablesCompactorUtil.compactOptions; -import static org.apache.paimon.flink.utils.MultiTablesCompactorUtil.shouldCompactTable; - -/** - * This is the single (non-parallel) monitoring task, it is responsible for: - * - *

- * 1. Monitoring snapshots of the Paimon table.
- * 2. Creating the Tuple2<{@link Split}, String> splits corresponding to the incremental files
- * 3. Assigning them to downstream tasks for further processing.
- *
- * The splits to be read are forwarded to the downstream {@link MultiTablesReadOperator} which
- * can have parallelism greater than one.
- *
Currently, only dedicated compaction job for multi-tables rely on this monitor. - */ -public abstract class MultiTablesCompactorSourceFunction - extends RichSourceFunction> { - - private static final long serialVersionUID = 1L; - - private static final Logger LOG = - LoggerFactory.getLogger(MultiTablesCompactorSourceFunction.class); - - protected final Catalog.Loader catalogLoader; - protected final Pattern includingPattern; - protected final Pattern excludingPattern; - protected final Pattern databasePattern; - protected final boolean isStreaming; - protected final long monitorInterval; - - protected transient Catalog catalog; - protected transient Map tablesMap; - protected transient Map scansMap; - - public MultiTablesCompactorSourceFunction( - Catalog.Loader catalogLoader, - Pattern includingPattern, - Pattern excludingPattern, - Pattern databasePattern, - boolean isStreaming, - long monitorInterval) { - this.catalogLoader = catalogLoader; - this.includingPattern = includingPattern; - this.excludingPattern = excludingPattern; - this.databasePattern = databasePattern; - this.isStreaming = isStreaming; - this.monitorInterval = monitorInterval; - } - - protected volatile boolean isRunning = true; - - protected transient SourceContext> ctx; - - @Override - public void open(Configuration parameters) throws Exception { - tablesMap = new HashMap<>(); - scansMap = new HashMap<>(); - catalog = catalogLoader.load(); - - updateTableMap(); - } - - @Override - public void cancel() { - // this is to cover the case where cancel() is called before the run() - if (ctx != null) { - synchronized (ctx.getCheckpointLock()) { - isRunning = false; - } - } else { - isRunning = false; - } - } - - protected void updateTableMap() - throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException { - List databases = catalog.listDatabases(); - - for (String databaseName : databases) { - if (databasePattern.matcher(databaseName).matches()) { - List tables = catalog.listTables(databaseName); - for (String tableName : tables) { - Identifier identifier = Identifier.create(databaseName, tableName); - if (shouldCompactTable(identifier, includingPattern, excludingPattern) - && (!tablesMap.containsKey(identifier))) { - Table table = catalog.getTable(identifier); - if (!(table instanceof FileStoreTable)) { - LOG.error( - String.format( - "Only FileStoreTable supports compact action. The table type is '%s'.", - table.getClass().getName())); - continue; - } - FileStoreTable fileStoreTable = (FileStoreTable) table; - if (fileStoreTable.bucketMode() == BucketMode.BUCKET_UNAWARE) { - LOG.info( - String.format( - "the bucket mode of %s is unware. 
", - identifier.getFullName()) - + "currently, the table with unware bucket mode is not support in combined mode."); - continue; - } - BucketsTable bucketsTable = - new BucketsTable( - fileStoreTable, - isStreaming, - identifier.getDatabaseName()) - .copy(compactOptions(isStreaming)); - tablesMap.put(identifier, bucketsTable); - scansMap.put(identifier, bucketsTable.newReadBuilder().newStreamScan()); - } - } - } - } - } -} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java index 8a4232255320..a5a17cd4405c 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java @@ -44,9 +44,9 @@ /** * The operator that reads the Tuple2<{@link Split}, String> received from the preceding {@link - * MultiTablesBatchCompactorSourceFunction} or {@link MultiTablesStreamingCompactorSourceFunction}. - * Contrary to the {@link MultiTablesCompactorSourceFunction} which has a parallelism of 1, this - * operator can have DOP > 1. + * CombinedAwareBatchSourceFunction} or {@link CombinedAwareStreamingSourceFunction}. Contrary to + * the {@link CombinedCompactorSourceFunction} which has a parallelism of 1, this operator can have + * DOP > 1. */ public class MultiTablesReadOperator extends AbstractStreamOperator implements OneInputStreamOperator, RowData> { diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java index 4fa1b3c53bd1..80b8e379840a 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactDatabaseActionITCase.java @@ -37,6 +37,8 @@ import org.apache.paimon.types.RowType; import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; + import org.apache.flink.core.execution.JobClient; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.junit.jupiter.api.Test; @@ -62,7 +64,7 @@ public class CompactDatabaseActionITCase extends CompactActionITCaseBase { private static final String[] DATABASE_NAMES = new String[] {"db1", "db2"}; - private static final String[] TABLE_NAMES = new String[] {"t1", "t2"}; + private static final String[] TABLE_NAMES = new String[] {"t1", "t2", "t3_unaware_bucket"}; private static final String[] New_DATABASE_NAMES = new String[] {"db3", "db4"}; private static final String[] New_TABLE_NAMES = new String[] {"t3", "t4"}; private static final RowType ROW_TYPE = @@ -88,25 +90,123 @@ private FileStoreTable createTable( return (FileStoreTable) catalog.getTable(identifier); } + @ParameterizedTest(name = "mode = {0}") + @ValueSource(strings = {"combined", "divided"}) + @Timeout(6000) + public void testStreamCompactForUnawareTable(String mode) throws Exception { + + // step0. 
create tables + Map tableToCompaction = new HashMap<>(); + for (String dbName : DATABASE_NAMES) { + for (String tableName : TABLE_NAMES) { + Map option = new HashMap<>(); + option.put(CoreOptions.WRITE_ONLY.key(), "true"); + List keys; + if (tableName.endsWith("unaware_bucket")) { + option.put("bucket", "-1"); + option.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2"); + option.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2"); + keys = Lists.newArrayList(); + FileStoreTable table = + createTable(dbName, tableName, Arrays.asList("dt", "hh"), keys, option); + tableToCompaction.put(Identifier.create(dbName, tableName), table); + } + } + } + + // step1. run streaming compaction task for tables + if (ThreadLocalRandom.current().nextBoolean()) { + StreamExecutionEnvironment env = + streamExecutionEnvironmentBuilder().streamingMode().build(); + createAction( + CompactDatabaseAction.class, + "compact_database", + "--warehouse", + warehouse, + "--mode", + mode) + .withStreamExecutionEnvironment(env) + .build(); + env.executeAsync(); + } else { + callProcedure(String.format("CALL sys.compact_database('', '%s')", mode), true, false); + } + + // step3. write datas to table wait for compaction + for (Map.Entry identifierFileStoreTableEntry : + tableToCompaction.entrySet()) { + FileStoreTable table = identifierFileStoreTableEntry.getValue(); + SnapshotManager snapshotManager = table.snapshotManager(); + StreamWriteBuilder streamWriteBuilder = + table.newStreamWriteBuilder().withCommitUser(commitUser); + write = streamWriteBuilder.newWrite(); + commit = streamWriteBuilder.newCommit(); + + writeData( + rowData(1, 100, 15, BinaryString.fromString("20221208")), + rowData(1, 100, 16, BinaryString.fromString("20221208")), + rowData(1, 100, 15, BinaryString.fromString("20221209"))); + + writeData( + rowData(2, 100, 15, BinaryString.fromString("20221208")), + rowData(2, 100, 16, BinaryString.fromString("20221208")), + rowData(2, 100, 15, BinaryString.fromString("20221209"))); + + Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId()); + assertThat(snapshot.id()).isEqualTo(2); + assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND); + write.close(); + commit.close(); + } + + for (Map.Entry identifierFileStoreTableEntry : + tableToCompaction.entrySet()) { + FileStoreTable table = identifierFileStoreTableEntry.getValue(); + SnapshotManager snapshotManager = table.snapshotManager(); + while (true) { + if (snapshotManager.latestSnapshotId() == 2) { + Thread.sleep(1000); + } else { + validateResult( + table, + ROW_TYPE, + table.newReadBuilder().newStreamScan(), + Arrays.asList( + "+I[1, 100, 15, 20221208]", + "+I[1, 100, 15, 20221209]", + "+I[1, 100, 16, 20221208]", + "+I[2, 100, 15, 20221208]", + "+I[2, 100, 15, 20221209]", + "+I[2, 100, 16, 20221208]"), + 60_000); + break; + } + } + } + } + @ParameterizedTest(name = "mode = {0}") @ValueSource(strings = {"divided", "combined"}) @Timeout(60) public void testBatchCompact(String mode) throws Exception { - Map options = new HashMap<>(); - options.put(CoreOptions.WRITE_ONLY.key(), "true"); - options.put("bucket", "1"); - List tables = new ArrayList<>(); for (String dbName : DATABASE_NAMES) { for (String tableName : TABLE_NAMES) { + Map option = new HashMap<>(); + option.put(CoreOptions.WRITE_ONLY.key(), "true"); + List keys; + if (tableName.endsWith("unaware_bucket")) { + option.put("bucket", "-1"); + option.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2"); + option.put(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "2"); + 
keys = Lists.newArrayList(); + } else { + option.put("bucket", "1"); + keys = Arrays.asList("dt", "hh", "k"); + } FileStoreTable table = - createTable( - dbName, - tableName, - Arrays.asList("dt", "hh"), - Arrays.asList("dt", "hh", "k"), - options); + createTable(dbName, tableName, Arrays.asList("dt", "hh"), keys, option); tables.add(table); SnapshotManager snapshotManager = table.snapshotManager(); StreamWriteBuilder streamWriteBuilder = diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperatorTest.java new file mode 100644 index 000000000000..e549f04ec594 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyMultiTableCompactionWorkerOperatorTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.sink; + +import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.options.Options; +import org.apache.paimon.table.TableTestBase; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; + +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.paimon.flink.sink.AppendOnlySingleTableCompactionWorkerOperatorTest.packTask; + +/** test for {@link AppendOnlyMultiTableCompactionWorkerOperator}. 
*/ +public class AppendOnlyMultiTableCompactionWorkerOperatorTest extends TableTestBase { + private final String[] tables = {"a", "b"}; + + @Test + public void testAsyncCompactionWorks() throws Exception { + + AppendOnlyMultiTableCompactionWorkerOperator workerOperator = + new AppendOnlyMultiTableCompactionWorkerOperator( + () -> catalog, "user", new Options()); + + List> records = new ArrayList<>(); + // create table and write + for (String table : tables) { + Identifier identifier = identifier(table); + createTable(identifier); + + // write 200 files + List commitMessages = writeData(getTable(identifier), 200, 20); + + packTask(commitMessages, 5).stream() + .map( + task -> + new StreamRecord<>( + new MultiTableAppendOnlyCompactionTask( + task.partition(), + task.compactBefore(), + identifier))) + .forEach(records::add); + } + + Assertions.assertThat(records.size()).isEqualTo(8); + workerOperator.open(); + + for (StreamRecord record : records) { + workerOperator.processElement(record); + } + + List committables = new ArrayList<>(); + Long timeStart = System.currentTimeMillis(); + long timeout = 60_000L; + + Assertions.assertThatCode( + () -> { + while (committables.size() != 8) { + committables.addAll( + workerOperator.prepareCommit(false, Long.MAX_VALUE)); + + Long now = System.currentTimeMillis(); + if (now - timeStart > timeout && committables.size() != 8) { + throw new RuntimeException( + "Timeout waiting for compaction, maybe some error happens in " + + AppendOnlySingleTableCompactionWorkerOperator + .class + .getName()); + } + Thread.sleep(1_000L); + } + }) + .doesNotThrowAnyException(); + committables.forEach( + a -> + Assertions.assertThat( + ((CommitMessageImpl) a.wrappedCommittable()) + .compactIncrement() + .compactAfter() + .size() + == 1) + .isTrue()); + Set table = + committables.stream() + .map(MultiTableCommittable::getTable) + .collect(Collectors.toSet()); + Assertions.assertThat(table).hasSameElementsAs(Arrays.asList(tables)); + } +} diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java similarity index 91% rename from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java rename to paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java index ad4a2a313176..4f459948c98c 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/AppendOnlySingleTableCompactionWorkerOperatorTest.java @@ -42,14 +42,14 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -/** Tests for {@link AppendOnlyTableCompactionWorkerOperator}. */ -public class AppendOnlyTableCompactionWorkerOperatorTest extends TableTestBase { +/** Tests for {@link AppendOnlySingleTableCompactionWorkerOperator}. 
*/ +public class AppendOnlySingleTableCompactionWorkerOperatorTest extends TableTestBase { @Test public void testAsyncCompactionWorks() throws Exception { createTableDefault(); - AppendOnlyTableCompactionWorkerOperator workerOperator = - new AppendOnlyTableCompactionWorkerOperator(getTableDefault(), "user"); + AppendOnlySingleTableCompactionWorkerOperator workerOperator = + new AppendOnlySingleTableCompactionWorkerOperator(getTableDefault(), "user"); // write 200 files List commitMessages = writeDataDefault(200, 20); @@ -79,7 +79,8 @@ public void testAsyncCompactionWorks() throws Exception { if (now - timeStart > timeout && committables.size() != 4) { throw new RuntimeException( "Timeout waiting for compaction, maybe some error happens in " - + AppendOnlyTableCompactionWorkerOperator.class + + AppendOnlySingleTableCompactionWorkerOperator + .class .getName()); } Thread.sleep(1_000L); @@ -100,8 +101,8 @@ public void testAsyncCompactionWorks() throws Exception { @Test public void testAsyncCompactionFileDeletedWhenShutdown() throws Exception { createTableDefault(); - AppendOnlyTableCompactionWorkerOperator workerOperator = - new AppendOnlyTableCompactionWorkerOperator(getTableDefault(), "user"); + AppendOnlySingleTableCompactionWorkerOperator workerOperator = + new AppendOnlySingleTableCompactionWorkerOperator(getTableDefault(), "user"); // write 200 files List commitMessages = writeDataDefault(200, 40); @@ -145,7 +146,7 @@ public void testAsyncCompactionFileDeletedWhenShutdown() throws Exception { } // shut down worker operator - workerOperator.shutdown(); + workerOperator.close(); // wait the last runnable in thread pool to stop Thread.sleep(2_000); @@ -190,7 +191,8 @@ protected InternalRow dataDefault(int time, int size) { return GenericRow.of(RANDOM.nextInt(), RANDOM.nextLong(), randomString()); } - private List packTask(List messages, int fileSize) { + public static List packTask( + List messages, int fileSize) { List result = new ArrayList<>(); List metas = messages.stream() diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java index 5555856035f1..b53710dc69e7 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/MultiTablesCompactorSourceBuilderITCase.java @@ -65,7 +65,7 @@ import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow; import static org.assertj.core.api.Assertions.assertThat; -/** IT cases for {@link MultiTablesCompactorSourceBuilder}. */ +/** IT cases for {@link CombinedTableCompactorSourceBuilder}. 
*/ public class MultiTablesCompactorSourceBuilderITCase extends AbstractTestBase implements Serializable { private String warehouse; @@ -161,7 +161,7 @@ public void testBatchRead(boolean defaultOptions) throws Exception { .parallelism(ThreadLocalRandom.current().nextInt(2) + 1) .build(); DataStream source = - new MultiTablesCompactorSourceBuilder( + new CombinedTableCompactorSourceBuilder( catalogLoader(), Pattern.compile("db1|db2"), Pattern.compile(".*"), @@ -169,7 +169,7 @@ public void testBatchRead(boolean defaultOptions) throws Exception { monitorInterval) .withContinuousMode(false) .withEnv(env) - .build(); + .buildAwareBucketTableSource(); CloseableIterator it = source.executeAndCollect(); List actual = new ArrayList<>(); while (it.hasNext()) { @@ -258,7 +258,7 @@ public void testStreamingRead(boolean defaultOptions) throws Exception { StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().streamingMode().build(); DataStream compactorSource = - new MultiTablesCompactorSourceBuilder( + new CombinedTableCompactorSourceBuilder( catalogLoader(), Pattern.compile(".*"), Pattern.compile(".*"), @@ -266,7 +266,7 @@ public void testStreamingRead(boolean defaultOptions) throws Exception { monitorInterval) .withContinuousMode(true) .withEnv(env) - .build(); + .buildAwareBucketTableSource(); CloseableIterator it = compactorSource.executeAndCollect(); List actual = new ArrayList<>(); @@ -428,7 +428,7 @@ public void testIncludeAndExcludeTableRead(boolean defaultOptions) throws Except StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().streamingMode().build(); DataStream compactorSource = - new MultiTablesCompactorSourceBuilder( + new CombinedTableCompactorSourceBuilder( catalogLoader(), Pattern.compile(".*"), Pattern.compile("db1.+|db2.t1|db3.t1"), @@ -436,7 +436,7 @@ public void testIncludeAndExcludeTableRead(boolean defaultOptions) throws Except monitorInterval) .withContinuousMode(true) .withEnv(env) - .build(); + .buildAwareBucketTableSource(); CloseableIterator it = compactorSource.executeAndCollect(); List actual = new ArrayList<>();
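For reference, a minimal, self-contained sketch of the scan loop that the new combined source functions above follow: batch mode scans once and only logs an empty result (because the other bucket-type scan task may still produce work), while streaming mode sleeps for the monitor interval and retries until the scan reports FINISHED. The names in this sketch (ScanLoopSketch, TableScan) are illustrative and are not part of the patch.

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Illustrative only: mirrors the FINISHED / IS_EMPTY handling of the combined sources. */
class ScanLoopSketch {

    enum ScanResult { FINISHED, IS_EMPTY, SCANNED }

    interface TableScan {
        ScanResult scanTable(List<String> out);
    }

    // Batch mode: scan once; an empty result is only logged, not treated as an error,
    // since the aware-bucket and unaware-bucket scan tasks run concurrently.
    static void runBatch(TableScan scan, AtomicBoolean isRunning, List<String> out) {
        if (isRunning.get()) {
            ScanResult result = scan.scanTable(out);
            if (result == ScanResult.IS_EMPTY) {
                System.out.println("No files collected, nothing to compact for this bucket type.");
            }
        }
    }

    // Streaming mode: keep scanning until FINISHED, sleeping for the monitor interval
    // whenever a scan produces nothing.
    static void runStreaming(
            TableScan scan, AtomicBoolean isRunning, List<String> out, long monitorIntervalMs)
            throws InterruptedException {
        while (isRunning.get()) {
            ScanResult result = scan.scanTable(out);
            if (result == ScanResult.FINISHED) {
                return;
            }
            if (result == ScanResult.IS_EMPTY) {
                Thread.sleep(monitorIntervalMs);
            }
        }
    }
}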
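A sketch of how the renamed builder is intended to be wired in combined mode, assuming the constructor and builder methods exactly as added in this patch (catalog loader, database pattern, including pattern, excluding pattern, monitor interval). The catalog loader, pattern values, and the surrounding job setup are placeholders, not taken from the patch.

import java.util.regex.Pattern;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;

import org.apache.paimon.append.MultiTableAppendOnlyCompactionTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.source.CombinedTableCompactorSourceBuilder;

class CombinedCompactorWiringSketch {

    // catalogLoader is assumed to be supplied by the surrounding compact_database job.
    static void wire(StreamExecutionEnvironment env, Catalog.Loader catalogLoader) {
        CombinedTableCompactorSourceBuilder sourceBuilder =
                new CombinedTableCompactorSourceBuilder(
                                catalogLoader,
                                Pattern.compile("db.*"), // database pattern
                                Pattern.compile(".*"),   // including-table pattern
                                Pattern.compile(""),     // excluding-table pattern
                                10_000L)                 // monitor interval in ms
                        .withContinuousMode(true)
                        .withEnv(env);

        // Aware-bucket (multi bucket) tables: rows carrying Tuple2<Split, table name>,
        // consumed downstream by MultiTablesReadOperator.
        DataStream<RowData> awareBucketSource = sourceBuilder.buildAwareBucketTableSource();

        // Unaware-bucket tables: compaction tasks consumed downstream by
        // AppendOnlyMultiTableCompactionWorkerOperator.
        DataStream<MultiTableAppendOnlyCompactionTask> unawareBucketSource =
                sourceBuilder.buildForUnawareBucketsTableSource();
    }
}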
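For reference, the table configuration the new unaware-bucket test cases rely on, written out as plain option strings. The test uses the CoreOptions constants directly; the string keys below are my reading of those constants and are shown only to make the setup explicit.

import java.util.HashMap;
import java.util.Map;

class UnawareBucketCompactionOptionsSketch {

    // Options applied to the t3_unaware_bucket tables so that writers stay write-only
    // and the dedicated compactor job performs the compaction.
    static Map<String, String> options() {
        Map<String, String> options = new HashMap<>();
        options.put("write-only", "true");           // writers do not compact
        options.put("bucket", "-1");                 // unaware bucket mode
        options.put("compaction.min.file-num", "2"); // compact as soon as two small files exist
        options.put("compaction.max.file-num", "2");
        return options;
    }
}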