diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AbstractTableScanLogic.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AbstractTableScanLogic.java new file mode 100644 index 0000000000000..3fc1af1273d98 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AbstractTableScanLogic.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.compact; + +import org.apache.paimon.append.AppendOnlyCompactionTask; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.Table; +import org.apache.paimon.table.source.Split; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Pattern; + +import static org.apache.paimon.flink.utils.MultiTablesCompactorUtil.shouldCompactTable; + +/** + * This class is responsible for implementing the scanning logic for the table of different type + * buckets during compaction. + * + * @param the result of scanning file : + *
    + *
  1. {@link Tuple2< Split ,String>} for the table with multi buckets, such as dynamic or + * fixed bucket table. + *
  2. {@link AppendOnlyCompactionTask} for the table witch fixed single bucket ,such as + * unaware bucket table. + *
+ */ +public abstract class AbstractTableScanLogic { + private static final Logger LOG = LoggerFactory.getLogger(AbstractTableScanLogic.class); + protected final Catalog.Loader catalogLoader; + protected final Pattern includingPattern; + protected final Pattern excludingPattern; + protected final Pattern databasePattern; + + protected transient Catalog catalog; + + protected AtomicBoolean isRunning; + protected boolean isStreaming; + + public AbstractTableScanLogic( + Catalog.Loader catalogLoader, + Pattern includingPattern, + Pattern excludingPattern, + Pattern databasePattern, + boolean isStreaming, + AtomicBoolean isRunning) { + this.catalogLoader = catalogLoader; + catalog = catalogLoader.load(); + + this.includingPattern = includingPattern; + this.excludingPattern = excludingPattern; + this.databasePattern = databasePattern; + this.isRunning = isRunning; + this.isStreaming = isStreaming; + } + + protected void updateTableMap() + throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException { + List databases = catalog.listDatabases(); + + for (String databaseName : databases) { + if (databasePattern.matcher(databaseName).matches()) { + List tables = catalog.listTables(databaseName); + for (String tableName : tables) { + Identifier identifier = Identifier.create(databaseName, tableName); + if (shouldCompactTable(identifier, includingPattern, excludingPattern) + && (!checkTableScanned(identifier))) { + Table table = catalog.getTable(identifier); + if (!(table instanceof FileStoreTable)) { + LOG.error( + String.format( + "Only FileStoreTable supports compact action. The table type is '%s'.", + table.getClass().getName())); + continue; + } + + FileStoreTable fileStoreTable = (FileStoreTable) table; + addScanTable(fileStoreTable, identifier); + } + } + } + } + } + + abstract Boolean collectFiles(SourceFunction.SourceContext ctx) + throws Catalog.TableNotExistException, Catalog.DatabaseNotExistException; + + /** Check if table has been scanned. */ + abstract boolean checkTableScanned(Identifier identifier); + + /** Add the scan table to the table map. */ + abstract void addScanTable(FileStoreTable fileStoreTable, Identifier identifier); + + abstract String bucketType(); +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/BatchFileScanner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/BatchFileScanner.java new file mode 100644 index 0000000000000..a49a4c66da5de --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/BatchFileScanner.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.compact; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This class is responsible for scanning files that need to be compact by batch method {@link + * CompactionFileScanner}. + */ +public class BatchFileScanner extends CompactionFileScanner { + private static final Logger LOGGER = LoggerFactory.getLogger(BatchFileScanner.class); + + public BatchFileScanner(AtomicBoolean isRunning, AbstractTableScanLogic tableScanLogic) { + super(isRunning, tableScanLogic); + } + + @Override + public void scan(SourceFunction.SourceContext ctx) throws Exception { + if (isRunning.get()) { + Boolean isEmpty = tableScanLogic.collectFiles(ctx); + if (isEmpty == null) { + return; + } + if (isEmpty) { + // Currently, in the combined mode, there are two scan tasks for the table of two + // different bucket type (multi bucket & unaware bucket) running concurrently. + // There will be a situation that there is only one task compaction , therefore this + // should not be thrown exception here. + LOGGER.info( + "No file were collected for the table of {}", tableScanLogic.bucketType()); + } + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/CompactionFileScanner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/CompactionFileScanner.java new file mode 100644 index 0000000000000..6689080071d24 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/CompactionFileScanner.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.compact; + +import org.apache.paimon.append.AppendOnlyCompactionTask; +import org.apache.paimon.table.source.Split; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * The class is response for scanning the file which need compaction. + * + * @param the result of scanning file : + *
    + *
  1. {@link Split} for the table with multi buckets, such as dynamic or fixed bucket table. + *
  2. {@link AppendOnlyCompactionTask} for the table witch fixed single bucket ,such as + * unaware bucket table. + *
+ */ +public abstract class CompactionFileScanner { + protected final AtomicBoolean isRunning; + + protected final AbstractTableScanLogic tableScanLogic; + + public CompactionFileScanner( + AtomicBoolean isRunning, AbstractTableScanLogic tableScanLogic) { + this.isRunning = isRunning; + this.tableScanLogic = tableScanLogic; + } + + public abstract void scan(SourceFunction.SourceContext ctx) throws Exception; +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiBucketTableScanLogic.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiBucketTableScanLogic.java new file mode 100644 index 0000000000000..4248f39e32098 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiBucketTableScanLogic.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.compact; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.BucketMode; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.EndOfScanException; +import org.apache.paimon.table.source.Split; +import org.apache.paimon.table.source.StreamTableScan; +import org.apache.paimon.table.system.BucketsTable; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +import static org.apache.paimon.flink.utils.MultiTablesCompactorUtil.compactOptions; + +/** + * This class is responsible for implementing the scanning logic {@link AbstractTableScanLogic} for + * the table with multi bucket such as dynamic or fixed bucket table. + */ +public class MultiBucketTableScanLogic extends AbstractTableScanLogic> { + private static final Logger LOG = LoggerFactory.getLogger(MultiBucketTableScanLogic.class); + protected transient Map tablesMap; + protected transient Map scansMap; + + public MultiBucketTableScanLogic( + Catalog.Loader catalogLoader, + Pattern includingPattern, + Pattern excludingPattern, + Pattern databasePattern, + boolean isStreaming, + AtomicBoolean isRunning) { + super( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + isStreaming, + isRunning); + tablesMap = new HashMap<>(); + scansMap = new HashMap<>(); + } + + @Override + public Boolean collectFiles(SourceFunction.SourceContext> ctx) + throws Catalog.TableNotExistException, Catalog.DatabaseNotExistException { + boolean isEmpty; + synchronized (ctx.getCheckpointLock()) { + if (!isRunning.get()) { + return null; + } + + // check for new tables + updateTableMap(); + + try { + List> splits = new ArrayList<>(); + for (Map.Entry entry : scansMap.entrySet()) { + Identifier identifier = entry.getKey(); + StreamTableScan scan = entry.getValue(); + splits.addAll( + scan.plan().splits().stream() + .map(split -> new Tuple2<>(split, identifier.getFullName())) + .collect(Collectors.toList())); + } + isEmpty = splits.isEmpty(); + splits.forEach(ctx::collect); + } catch (EndOfScanException esf) { + LOG.info("Catching EndOfStreamException, the stream is finished."); + return null; + } + } + return isEmpty; + } + + @Override + public boolean checkTableScanned(Identifier identifier) { + return tablesMap.containsKey(identifier); + } + + @Override + public void addScanTable(FileStoreTable fileStoreTable, Identifier identifier) { + if (fileStoreTable.bucketMode() != BucketMode.UNAWARE) { + BucketsTable bucketsTable = + new BucketsTable(fileStoreTable, isStreaming, identifier.getDatabaseName()) + .copy(compactOptions(isStreaming)); + tablesMap.put(identifier, bucketsTable); + scansMap.put(identifier, bucketsTable.newReadBuilder().newStreamScan()); + } + } + + @Override + String bucketType() { + return "multi-bucket"; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/StreamingFileScanner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/StreamingFileScanner.java new file mode 100644 index 0000000000000..838163365d9d7 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/StreamingFileScanner.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.compact; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * This class is responsible for scanning files that need to be compact by stream method {@link + * CompactionFileScanner}. + */ +public class StreamingFileScanner extends CompactionFileScanner { + + private final long monitorInterval; + + public StreamingFileScanner( + long monitorInterval, + AbstractTableScanLogic tableScanLogic, + AtomicBoolean isRunning) { + super(isRunning, tableScanLogic); + this.monitorInterval = monitorInterval; + } + + @SuppressWarnings("BusyWait") + @Override + public void scan(SourceFunction.SourceContext ctx) throws Exception { + while (isRunning.get()) { + Boolean isEmpty = tableScanLogic.collectFiles(ctx); + if (isEmpty == null) { + return; + } + if (isEmpty) { + Thread.sleep(monitorInterval); + } + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketTableScanLogic.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketTableScanLogic.java new file mode 100644 index 0000000000000..b5b6827e7428a --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareBucketTableScanLogic.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.compact; + +import org.apache.paimon.append.AppendOnlyCompactionTask; +import org.apache.paimon.append.AppendOnlyTableCompactionCoordinator; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.catalog.Identifier; +import org.apache.paimon.table.BucketMode; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.source.EndOfScanException; + +import org.apache.flink.streaming.api.functions.source.SourceFunction; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Pattern; + +/** + * This class is responsible for implementing the scanning logic {@link AbstractTableScanLogic} for + * the table with fix single bucket such as unaware bucket table. + */ +public class UnawareBucketTableScanLogic extends AbstractTableScanLogic { + private static final Logger LOG = LoggerFactory.getLogger(UnawareBucketTableScanLogic.class); + + protected transient Map tablesMap; + + public UnawareBucketTableScanLogic( + Catalog.Loader catalogLoader, + Pattern includingPattern, + Pattern excludingPattern, + Pattern databasePattern, + boolean isStreaming, + AtomicBoolean isRunning) { + super( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + isStreaming, + isRunning); + tablesMap = new HashMap<>(); + } + + @Override + public Boolean collectFiles(SourceFunction.SourceContext ctx) + throws Catalog.TableNotExistException, Catalog.DatabaseNotExistException { + boolean isEmpty; + try { + if (!isRunning.get()) { + return null; + } + + updateTableMap(); + // do scan and plan action, emit append-only compaction tasks. + List tasks = new ArrayList<>(); + for (Map.Entry tableIdAndCoordinator : + tablesMap.entrySet()) { + Identifier tableId = tableIdAndCoordinator.getKey(); + AppendOnlyTableCompactionCoordinator compactionCoordinator = + tableIdAndCoordinator.getValue(); + compactionCoordinator.run().stream() + .map( + task -> + new AppendOnlyCompactionTask( + task.partition(), task.compactBefore(), tableId)) + .forEach(tasks::add); + } + + isEmpty = tasks.isEmpty(); + tasks.forEach(ctx::collect); + } catch (EndOfScanException esf) { + LOG.info("Catching EndOfStreamException, the stream is finished."); + return null; + } + return isEmpty; + } + + @Override + public boolean checkTableScanned(Identifier identifier) { + return tablesMap.containsKey(identifier); + } + + @Override + public void addScanTable(FileStoreTable fileStoreTable, Identifier identifier) { + if (fileStoreTable.bucketMode() == BucketMode.UNAWARE) { + tablesMap.put( + identifier, + new AppendOnlyTableCompactionCoordinator(fileStoreTable, isStreaming)); + } + } + + @Override + String bucketType() { + return "unaware-bucket"; + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedBatchMultiSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedBatchMultiSourceFunction.java new file mode 100644 index 0000000000000..4bf1481dce6d0 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedBatchMultiSourceFunction.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source.operator; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.flink.compact.AbstractTableScanLogic; +import org.apache.paimon.flink.compact.BatchFileScanner; +import org.apache.paimon.flink.compact.MultiBucketTableScanLogic; +import org.apache.paimon.flink.utils.JavaTypeInfo; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.Split; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.table.data.RowData; + +import java.util.regex.Pattern; + +/** It is responsible for monitoring compactor source of multi bucket table in batch mode. */ +public class CombinedBatchMultiSourceFunction + extends CombinedCompactorSourceFunction> { + + public CombinedBatchMultiSourceFunction( + Catalog.Loader catalogLoader, + Pattern includingPattern, + Pattern excludingPattern, + Pattern databasePattern, + long monitorInterval) { + super( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + false, + monitorInterval); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + AbstractTableScanLogic> multiBucketTableScanLogic = + new MultiBucketTableScanLogic( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + isStreaming, + isRunning); + this.compactionFileScanner = new BatchFileScanner<>(isRunning, multiBucketTableScanLogic); + } + + public static DataStream buildSource( + StreamExecutionEnvironment env, + String name, + TypeInformation typeInfo, + Catalog.Loader catalogLoader, + Pattern includingPattern, + Pattern excludingPattern, + Pattern databasePattern, + long monitorInterval) { + CombinedBatchMultiSourceFunction function = + new CombinedBatchMultiSourceFunction( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + monitorInterval); + StreamSource, ?> sourceOperator = new StreamSource<>(function); + TupleTypeInfo> tupleTypeInfo = + new TupleTypeInfo<>( + new JavaTypeInfo<>(Split.class), BasicTypeInfo.STRING_TYPE_INFO); + return new DataStreamSource<>( + env, tupleTypeInfo, sourceOperator, false, name, Boundedness.BOUNDED) + .forceNonParallel() + .partitionCustom( + (key, numPartitions) -> key % numPartitions, + split -> ((DataSplit) split.f0).bucket()) + .transform(name, typeInfo, new MultiTablesReadOperator(catalogLoader, false)); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedBatchUnawareSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedBatchUnawareSourceFunction.java new file mode 100644 index 0000000000000..54c21bb52b03a --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedBatchUnawareSourceFunction.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source.operator; + +import org.apache.paimon.append.AppendOnlyCompactionTask; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.flink.compact.AbstractTableScanLogic; +import org.apache.paimon.flink.compact.BatchFileScanner; +import org.apache.paimon.flink.compact.UnawareBucketTableScanLogic; +import org.apache.paimon.flink.sink.CompactionTaskTypeInfo; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.streaming.api.transformations.PartitionTransformation; +import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner; + +import java.util.regex.Pattern; + +/** + * It is responsible for the batch compactor source of the table with unaware bucket in combined + * mode. + */ +public class CombinedBatchUnawareSourceFunction + extends CombinedCompactorSourceFunction { + public CombinedBatchUnawareSourceFunction( + Catalog.Loader catalogLoader, + Pattern includingPattern, + Pattern excludingPattern, + Pattern databasePattern, + long monitorInterval) { + super( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + false, + monitorInterval); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + AbstractTableScanLogic unawareBucketTableScanLogic = + new UnawareBucketTableScanLogic( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + isStreaming, + isRunning); + this.compactionFileScanner = new BatchFileScanner<>(isRunning, unawareBucketTableScanLogic); + } + + public static DataStream buildSource( + StreamExecutionEnvironment env, + String name, + Catalog.Loader catalogLoader, + Pattern includingPattern, + Pattern excludingPattern, + Pattern databasePattern, + long monitorInterval) { + CombinedBatchUnawareSourceFunction function = + new CombinedBatchUnawareSourceFunction( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + monitorInterval); + StreamSource sourceOperator = + new StreamSource<>(function); + CompactionTaskTypeInfo compactionTaskTypeInfo = new CompactionTaskTypeInfo(); + SingleOutputStreamOperator source = + new DataStreamSource<>( + env, + compactionTaskTypeInfo, + sourceOperator, + false, + name, + Boundedness.BOUNDED) + .forceNonParallel(); + + PartitionTransformation transformation = + new PartitionTransformation<>( + source.getTransformation(), new RebalancePartitioner<>()); + + return new DataStream<>(env, transformation); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java new file mode 100644 index 0000000000000..c1026a7198daf --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedCompactorSourceFunction.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source.operator; + +import org.apache.paimon.append.AppendOnlyCompactionTask; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.flink.compact.CompactionFileScanner; +import org.apache.paimon.table.source.Split; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.functions.source.RichSourceFunction; + +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.regex.Pattern; + +/** + * This is the single (non-parallel) monitoring task, it is responsible for: + * + *
    + *
  1. Monitoring snapshots of the Paimon table. + *
  2. Creating the splits {@link Split} or compaction task {@link AppendOnlyCompactionTask} + * corresponding to the incremental files + *
  3. Assigning them to downstream tasks for further processing. + *
+ * + *

The splits to be read are forwarded to the downstream {@link MultiTablesReadOperator} which + * can have parallelism greater than one. + * + *

Currently, only dedicated compaction job for multi-tables rely on this monitor. This is the + * single (non-parallel) monitoring task, it is responsible for the new Paimon table. + */ +public abstract class CombinedCompactorSourceFunction extends RichSourceFunction { + + private static final long serialVersionUID = 2L; + + protected final Catalog.Loader catalogLoader; + protected final Pattern includingPattern; + protected final Pattern excludingPattern; + protected final Pattern databasePattern; + protected final long monitorInterval; + protected final boolean isStreaming; + + protected AtomicBoolean isRunning; + + protected transient SourceContext ctx; + + protected transient CompactionFileScanner compactionFileScanner; + + public CombinedCompactorSourceFunction( + Catalog.Loader catalogLoader, + Pattern includingPattern, + Pattern excludingPattern, + Pattern databasePattern, + boolean isStreaming, + long monitorInterval) { + this.catalogLoader = catalogLoader; + this.includingPattern = includingPattern; + this.excludingPattern = excludingPattern; + this.databasePattern = databasePattern; + this.monitorInterval = monitorInterval; + this.isStreaming = isStreaming; + } + + @Override + public void open(Configuration parameters) throws Exception { + isRunning = new AtomicBoolean(true); + } + + @Override + public void run(SourceContext sourceContext) throws Exception { + this.ctx = sourceContext; + compactionFileScanner.scan(ctx); + } + + @Override + public void cancel() { + // this is to cover the case where cancel() is called before the run() + if (ctx != null) { + synchronized (ctx.getCheckpointLock()) { + isRunning.set(false); + } + } else { + isRunning.set(false); + } + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedStreamingMultiSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedStreamingMultiSourceFunction.java new file mode 100644 index 0000000000000..6394fc0b3f960 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedStreamingMultiSourceFunction.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source.operator; + +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.flink.compact.AbstractTableScanLogic; +import org.apache.paimon.flink.compact.MultiBucketTableScanLogic; +import org.apache.paimon.flink.compact.StreamingFileScanner; +import org.apache.paimon.flink.utils.JavaTypeInfo; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.Split; + +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TupleTypeInfo; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.StreamSource; +import org.apache.flink.table.data.RowData; + +import java.util.regex.Pattern; + +/** It is responsible for monitoring compactor source of multi bucket table in stream mode. */ +public class CombinedStreamingMultiSourceFunction + extends CombinedCompactorSourceFunction> { + + public CombinedStreamingMultiSourceFunction( + Catalog.Loader catalogLoader, + Pattern includingPattern, + Pattern excludingPattern, + Pattern databasePattern, + long monitorInterval) { + super( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + true, + monitorInterval); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + AbstractTableScanLogic> multiBucketTableScanLogic = + new MultiBucketTableScanLogic( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + isStreaming, + isRunning); + this.compactionFileScanner = + new StreamingFileScanner<>(monitorInterval, multiBucketTableScanLogic, isRunning); + } + + public static DataStream buildSource( + StreamExecutionEnvironment env, + String name, + TypeInformation typeInfo, + Catalog.Loader catalogLoader, + Pattern includingPattern, + Pattern excludingPattern, + Pattern databasePattern, + long monitorInterval) { + + CombinedStreamingMultiSourceFunction function = + new CombinedStreamingMultiSourceFunction( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + monitorInterval); + StreamSource, ?> sourceOperator = new StreamSource<>(function); + boolean isParallel = false; + TupleTypeInfo> tupleTypeInfo = + new TupleTypeInfo<>( + new JavaTypeInfo<>(Split.class), BasicTypeInfo.STRING_TYPE_INFO); + return new DataStreamSource<>( + env, + tupleTypeInfo, + sourceOperator, + isParallel, + name, + Boundedness.CONTINUOUS_UNBOUNDED) + .forceNonParallel() + .partitionCustom( + (key, numPartitions) -> key % numPartitions, + split -> ((DataSplit) split.f0).bucket()) + .transform(name, typeInfo, new MultiTablesReadOperator(catalogLoader, true)); + } +} diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedStreamingUnawareSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedStreamingUnawareSourceFunction.java new file mode 100644 index 0000000000000..314da634df013 --- /dev/null +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombinedStreamingUnawareSourceFunction.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.source.operator; + +import org.apache.paimon.append.AppendOnlyCompactionTask; +import org.apache.paimon.catalog.Catalog; +import org.apache.paimon.flink.compact.AbstractTableScanLogic; +import org.apache.paimon.flink.compact.StreamingFileScanner; +import org.apache.paimon.flink.compact.UnawareBucketTableScanLogic; +import org.apache.paimon.flink.sink.CompactionTaskTypeInfo; + +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.StreamSource; + +import java.util.regex.Pattern; + +/** + * It is responsible for monitoring compactor source in stream mode for the table of unaware bucket. + */ +public class CombinedStreamingUnawareSourceFunction + extends CombinedCompactorSourceFunction { + public CombinedStreamingUnawareSourceFunction( + Catalog.Loader catalogLoader, + Pattern includingPattern, + Pattern excludingPattern, + Pattern databasePattern, + long monitorInterval) { + super( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + true, + monitorInterval); + } + + @Override + public void open(Configuration parameters) throws Exception { + super.open(parameters); + + AbstractTableScanLogic unwareBucketTableScanLogic = + new UnawareBucketTableScanLogic( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + isStreaming, + isRunning); + this.compactionFileScanner = + new StreamingFileScanner<>(monitorInterval, unwareBucketTableScanLogic, isRunning); + } + + public static DataStream buildSource( + StreamExecutionEnvironment env, + String name, + Catalog.Loader catalogLoader, + Pattern includingPattern, + Pattern excludingPattern, + Pattern databasePattern, + long monitorInterval) { + + CombinedStreamingUnawareSourceFunction function = + new CombinedStreamingUnawareSourceFunction( + catalogLoader, + includingPattern, + excludingPattern, + databasePattern, + monitorInterval); + StreamSource + sourceOperator = new StreamSource<>(function); + boolean isParallel = false; + CompactionTaskTypeInfo compactionTaskTypeInfo = new CompactionTaskTypeInfo(); + return new DataStreamSource<>( + env, + compactionTaskTypeInfo, + sourceOperator, + isParallel, + name, + Boundedness.CONTINUOUS_UNBOUNDED) + .forceNonParallel() + .rebalance(); + } +}