From f6274efa06356620250a3d6ce717d104fd5356c5 Mon Sep 17 00:00:00 2001
From: wgcn <1026688210@qq.com>
Date: Tue, 6 Feb 2024 15:54:05 +0800
Subject: [PATCH] refactor: extract the combined-mode compaction source into
 reusable scanner and table-scan-logic classes

---
 .../flink/compact/AbstractTableScanLogic.java |  95 ++++++++++++++
 .../flink/compact/BatchTableScanner.java      |  45 +++++++
 .../flink/compact/CompactionTableScanner.java |  39 ++++++
 .../MultiBucketTableCompactionLogic.java}     | 101 +++++----------
 .../flink/compact/StreamingTableScanner.java  |  49 ++++++++
 .../UnawareCompactionLogic.java}              |  88 ++++---------
 .../org/apache/paimon/flink/source/Base.java  | 101 +++++++++++++++
 .../source/operator/BatchMultiFunction.java   |  16 +--
 .../source/operator/BatchUnawareFunction.java |  11 +-
 .../CombineModeCompactorSourceFunction.java   | 118 +++++-------------
 .../operator/MultiTablesReadOperator.java     |   2 +-
 .../operator/StreamingMultiFunction.java      |  16 ++-
 .../operator/StreamingUnawareFunction.java    |  12 +-
 13 files changed, 451 insertions(+), 242 deletions(-)
 create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AbstractTableScanLogic.java
 create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/BatchTableScanner.java
 create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/CompactionTableScanner.java
 rename paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/{source/operator/MultiBucketTablesFunction.java => compact/MultiBucketTableCompactionLogic.java} (66%)
 create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/StreamingTableScanner.java
 rename paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/{source/operator/UnawareBucketTablesFunction.java => compact/UnawareCompactionLogic.java} (59%)
 create mode 100644 paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/Base.java
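The new pieces compose as follows: each CombineModeCompactorSourceFunction subclass builds a
CompactionTableScanner in open() by pairing a scan strategy (BatchTableScanner or
StreamingTableScanner) with a bucket-mode-specific TableScanLogic
(MultiBucketTableCompactionLogic or UnawareCompactionLogic), and run() simply delegates to
scanner.scan(ctx). A minimal wiring sketch, illustrative only and mirroring
StreamingMultiFunction.open() further down; the local variable names are assumptions:

    AtomicBoolean isRunning = new AtomicBoolean(true);
    CompactionTableScanner.TableScanLogic<Tuple2<Split, String>> logic =
            new MultiBucketTableCompactionLogic(
                    catalogLoader,
                    includingPattern,
                    excludingPattern,
                    databasePattern,
                    true, // isStreaming
                    isRunning);
    CompactionTableScanner<Tuple2<Split, String>> scanner =
            new StreamingTableScanner<>(monitorInterval, logic, isRunning);
    // later, inside SourceFunction.run(SourceContext<Tuple2<Split, String>> ctx):
    scanner.scan(ctx);
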
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AbstractTableScanLogic.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AbstractTableScanLogic.java
new file mode 100644
index 0000000000000..fc2ca96850b06
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AbstractTableScanLogic.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
+
+import static org.apache.paimon.flink.utils.MultiTablesCompactorUtil.shouldCompactTable;
+
+/** Base catalog-scanning logic shared by the combined-mode compaction sources. */
+public abstract class AbstractTableScanLogic<T>
+        implements CompactionTableScanner.TableScanLogic<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(AbstractTableScanLogic.class);
+
+    protected final Catalog.Loader catalogLoader;
+    protected final Pattern includingPattern;
+    protected final Pattern excludingPattern;
+    protected final Pattern databasePattern;
+
+    protected transient Catalog catalog;
+
+    protected AtomicBoolean isRunning;
+    protected boolean isStreaming;
+
+    public AbstractTableScanLogic(
+            Catalog.Loader catalogLoader,
+            Pattern includingPattern,
+            Pattern excludingPattern,
+            Pattern databasePattern,
+            AtomicBoolean isRunning,
+            boolean isStreaming) {
+        this.catalogLoader = catalogLoader;
+        this.catalog = catalogLoader.load();
+        this.includingPattern = includingPattern;
+        this.excludingPattern = excludingPattern;
+        this.databasePattern = databasePattern;
+        this.isRunning = isRunning;
+        this.isStreaming = isStreaming;
+    }
+
+    protected void updateTableMap()
+            throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
+        List<String> databases = catalog.listDatabases();
+
+        for (String databaseName : databases) {
+            if (databasePattern.matcher(databaseName).matches()) {
+                List<String> tables = catalog.listTables(databaseName);
+                for (String tableName : tables) {
+                    Identifier identifier = Identifier.create(databaseName, tableName);
+                    if (shouldCompactTable(identifier, includingPattern, excludingPattern)
+                            && (!tableScanned(identifier))) {
+                        Table table = catalog.getTable(identifier);
+                        if (!(table instanceof FileStoreTable)) {
+                            LOG.error(
+                                    String.format(
+                                            "Only FileStoreTable supports compact action. The table type is '%s'.",
+                                            table.getClass().getName()));
+                            continue;
+                        }
+
+                        // Each concrete scan logic decides how (and whether) to register
+                        // the table, depending on its bucket mode.
+                        addScanTable((FileStoreTable) table, identifier);
+                    }
+                }
+            }
+        }
+    }
+}
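The discovery loop above matches database names against databasePattern and table names against
includingPattern/excludingPattern (via shouldCompactTable) before registering a table with
addScanTable. A small illustration with plain java.util.regex patterns; the database and table
names here are made up:

    // Hypothetical filters: compact every table in db1 except those ending in _tmp.
    Pattern databasePattern = Pattern.compile("db1");
    Pattern includingPattern = Pattern.compile(".*");
    Pattern excludingPattern = Pattern.compile(".*_tmp");
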
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/BatchTableScanner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/BatchTableScanner.java
new file mode 100644
index 0000000000000..2e68ec0e32b72
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/BatchTableScanner.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/** A {@link CompactionTableScanner} that scans exactly once, for batch compaction jobs. */
+public class BatchTableScanner<T> implements CompactionTableScanner<T> {
+
+    private final AtomicBoolean isRunning;
+    private final TableScanLogic<T> tableScanLogic;
+
+    public BatchTableScanner(AtomicBoolean isRunning, TableScanLogic<T> tableScanLogic) {
+        this.isRunning = isRunning;
+        this.tableScanLogic = tableScanLogic;
+    }
+
+    @Override
+    public void scan(SourceFunction.SourceContext<T> ctx) throws Exception {
+        if (isRunning.get()) {
+            Boolean isEmpty = tableScanLogic.collectFiles(ctx);
+            if (isEmpty == null) {
+                return;
+            }
+            if (isEmpty) {
+                throw new Exception(
+                        "No files were collected. Please ensure there are tables detected after pattern matching.");
+            }
+        }
+    }
+}
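The batch scanner above and the StreamingTableScanner added later in this patch interpret the
tri-state result of TableScanLogic.collectFiles(ctx) the same way; an editorial summary of the
behavior, not new code in the patch:

    Boolean isEmpty = tableScanLogic.collectFiles(ctx);
    // null  -> the source was cancelled; stop scanning
    // true  -> nothing new was found this round:
    //          batch fails fast, streaming sleeps monitorInterval and retries
    // false -> splits or compaction tasks were emitted to ctx
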
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/CompactionTableScanner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/CompactionTableScanner.java
new file mode 100644
index 0000000000000..c1a26b4801973
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/CompactionTableScanner.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.table.FileStoreTable;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+/** Scans tables for the combined-mode compaction sources and emits the results to the context. */
+public interface CompactionTableScanner<T> {
+
+    void scan(SourceFunction.SourceContext<T> ctx) throws Exception;
+
+    /** The pluggable, bucket-mode-specific scan logic driven by a scanner. */
+    interface TableScanLogic<T> {
+
+        Boolean collectFiles(SourceFunction.SourceContext<T> ctx)
+                throws Catalog.TableNotExistException, Catalog.DatabaseNotExistException;
+
+        boolean tableScanned(Identifier identifier);
+
+        void addScanTable(FileStoreTable fileStoreTable, Identifier identifier);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiBucketTablesFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiBucketTableCompactionLogic.java
similarity index 66%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiBucketTablesFunction.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiBucketTableCompactionLogic.java
index fe3621a83f6aa..c5caa2c8293b6 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiBucketTablesFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/MultiBucketTableCompactionLogic.java
@@ -16,8 +16,10 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.source.operator;
+package org.apache.paimon.flink.compact;
 
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.table.BucketMode;
@@ -26,10 +28,6 @@
 import org.apache.paimon.table.source.Split;
 import org.apache.paimon.table.source.StreamTableScan;
 import org.apache.paimon.table.system.BucketsTable;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,86 +35,28 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.flink.utils.MultiTablesCompactorUtil.compactOptions;
 
-/**
- * This is the single (non-parallel) monitoring task, it is responsible for:
- *
- * <pre>
- *   1. Monitoring snapshots of the Paimon table and the new Paimon table
- *   2. Creating the Tuple2<{@link Split}, String> splits corresponding to the incremental files.
- *   3. Assigning them to downstream tasks for further processing.
- * </pre>
- *
- * <p>The splits to be read are forwarded to the downstream {@link MultiTablesReadOperator} which
- * can have parallelism greater than one.
- *
- * <p>Currently, the dedicated combine mode compaction of job for multi-tables with multi bucket
- * rely on this monitor.
- */
-public abstract class MultiBucketTablesFunction
-        extends CombineModeCompactorSourceFunction<Tuple2<Split, String>> {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final Logger LOG =
-            LoggerFactory.getLogger(MultiBucketTablesFunction.class);
-
+/** The {@link CompactionTableScanner.TableScanLogic} for tables with multiple buckets. */
+public class MultiBucketTableCompactionLogic
+        extends AbstractTableScanLogic<Tuple2<Split, String>> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MultiBucketTableCompactionLogic.class);
+
     protected transient Map<Identifier, BucketsTable> tablesMap;
     protected transient Map<Identifier, StreamTableScan> scansMap;
 
-    public MultiBucketTablesFunction(
-            Catalog.Loader catalogLoader,
-            Pattern includingPattern,
-            Pattern excludingPattern,
-            Pattern databasePattern,
-            boolean isStreaming,
-            long monitorInterval) {
-        super(
-                catalogLoader,
-                includingPattern,
-                excludingPattern,
-                databasePattern,
-                isStreaming,
-                monitorInterval);
-    }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
+    public MultiBucketTableCompactionLogic(
+            Catalog.Loader catalogLoader,
+            Pattern includingPattern,
+            Pattern excludingPattern,
+            Pattern databasePattern,
+            boolean isStreaming,
+            AtomicBoolean isRunning) {
+        super(catalogLoader, includingPattern, excludingPattern, databasePattern, isRunning, isStreaming);
         tablesMap = new HashMap<>();
         scansMap = new HashMap<>();
     }
 
     @Override
-    boolean hasScanned(Identifier identifier) {
-        return tablesMap.containsKey(identifier);
-    }
-
-    @Override
-    void applyFileTable(FileStoreTable fileStoreTable, Identifier identifier) {
-        if (fileStoreTable.bucketMode() == BucketMode.UNAWARE) {
-            LOG.info(
-                    String.format("the bucket mode of %s is unware. ", identifier.getFullName())
-                            + "currently, the table with unware bucket mode is not support in combined mode.");
-            return;
-        }
-
-        BucketsTable bucketsTable =
-                new BucketsTable(fileStoreTable, isStreaming, identifier.getDatabaseName())
-                        .copy(compactOptions(isStreaming));
-        tablesMap.put(identifier, bucketsTable);
-        scansMap.put(identifier, bucketsTable.newReadBuilder().newStreamScan());
-    }
-
-    @Nullable
-    @Override
-    public Boolean execute() throws Exception {
+    public Boolean collectFiles(SourceFunction.SourceContext<Tuple2<Split, String>> ctx)
+            throws Catalog.TableNotExistException, Catalog.DatabaseNotExistException {
         boolean isEmpty;
         synchronized (ctx.getCheckpointLock()) {
-            if (!isRunning) {
+            if (!isRunning.get()) {
                 return null;
             }
@@ -143,4 +83,25 @@ public Boolean execute() throws Exception {
         }
         return isEmpty;
     }
+
+    @Override
+    public boolean tableScanned(Identifier identifier) {
+        return tablesMap.containsKey(identifier);
+    }
+
+    @Override
+    public void addScanTable(FileStoreTable fileStoreTable, Identifier identifier) {
+        // Unaware-bucket tables are handled by UnawareCompactionLogic instead.
+        if (fileStoreTable.bucketMode() == BucketMode.UNAWARE) {
+            LOG.info(
+                    String.format("The bucket mode of %s is unaware. ", identifier.getFullName())
+                            + "Currently, tables with the unaware bucket mode are not supported in combined mode.");
+            return;
+        }
+
+        BucketsTable bucketsTable =
+                new BucketsTable(fileStoreTable, isStreaming, identifier.getDatabaseName())
+                        .copy(compactOptions(isStreaming));
+        tablesMap.put(identifier, bucketsTable);
+        scansMap.put(identifier, bucketsTable.newReadBuilder().newStreamScan());
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/StreamingTableScanner.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/StreamingTableScanner.java
new file mode 100644
index 0000000000000..01760d5897de8
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/StreamingTableScanner.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.compact;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/** A {@link CompactionTableScanner} that keeps polling for new files in streaming jobs. */
+public class StreamingTableScanner<T> implements CompactionTableScanner<T> {
+
+    private final AtomicBoolean isRunning;
+
+    private final long monitorInterval;
+
+    private final TableScanLogic<T> tableScanLogic;
+
+    public StreamingTableScanner(
+            long monitorInterval, TableScanLogic<T> tableScanLogic, AtomicBoolean isRunning) {
+        this.monitorInterval = monitorInterval;
+        this.tableScanLogic = tableScanLogic;
+        this.isRunning = isRunning;
+    }
+
+    @SuppressWarnings("BusyWait")
+    @Override
+    public void scan(SourceFunction.SourceContext<T> ctx) throws Exception {
+        while (isRunning.get()) {
+            Boolean isEmpty = tableScanLogic.collectFiles(ctx);
+            if (isEmpty == null) {
+                return;
+            }
+            if (isEmpty) {
+                Thread.sleep(monitorInterval);
+            }
+        }
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/UnawareBucketTablesFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareCompactionLogic.java
similarity index 59%
rename from paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/UnawareBucketTablesFunction.java
rename to paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareCompactionLogic.java
index 59ef905e53766..f93b2bdf27edb 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/UnawareBucketTablesFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/UnawareCompactionLogic.java
@@ -16,19 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.flink.source.operator;
+package org.apache.paimon.flink.compact;
 
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.paimon.append.AppendOnlyCompactionTask;
 import org.apache.paimon.append.AppendOnlyTableCompactionCoordinator;
 import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.flink.sink.UnawareCombineCompactionWorkerOperator;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.EndOfScanException;
-
-import org.apache.flink.configuration.Configuration;
-import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,76 +33,24 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
-/**
- * This is the single (non-parallel) monitoring task , it is responsible for the dedicated
- * compaction of combine mode job for multi-tables with the unaware bucket.
- *
- * <pre>
- *   1. Monitoring snapshots of the Paimon table and the new Paimon table.
- *   2. Creating the {@link AppendOnlyCompactionTask} corresponding to the incremental files.
- *   3. Assigning them to downstream tasks for further processing.
- * </pre>
- *
- * <p>The {@link AppendOnlyCompactionTask} to be read are forwarded to the downstream {@link
- * UnawareCombineCompactionWorkerOperator} which can have parallelism greater than one.
- *
- * <p>Currently, only the dedicated compaction of combine mode job for multi-tables with the fix and
- * dynamic bucket rely on this monitor.
- */
-public abstract class UnawareBucketTablesFunction
-        extends CombineModeCompactorSourceFunction<AppendOnlyCompactionTask> {
-    private static final long serialVersionUID = 1L;
-
-    private static final Logger LOG =
-            LoggerFactory.getLogger(UnawareBucketTablesFunction.class);
+/** The {@link CompactionTableScanner.TableScanLogic} for tables with the unaware bucket mode. */
+public class UnawareCompactionLogic extends AbstractTableScanLogic<AppendOnlyCompactionTask> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(UnawareCompactionLogic.class);
 
     protected transient Map<Identifier, AppendOnlyTableCompactionCoordinator> tablesMap;
 
-    public UnawareBucketTablesFunction(
-            Catalog.Loader catalogLoader,
-            Pattern includingPattern,
-            Pattern excludingPattern,
-            Pattern databasePattern,
-            boolean isStreaming,
-            long monitorInterval) {
-        super(
-                catalogLoader,
-                includingPattern,
-                excludingPattern,
-                databasePattern,
-                isStreaming,
-                monitorInterval);
-    }
-
-    @Override
-    public void open(Configuration parameters) throws Exception {
-        super.open(parameters);
+    public UnawareCompactionLogic(
+            Catalog.Loader catalogLoader,
+            Pattern includingPattern,
+            Pattern excludingPattern,
+            Pattern databasePattern,
+            boolean isStreaming,
+            AtomicBoolean isRunning) {
+        super(catalogLoader, includingPattern, excludingPattern, databasePattern, isRunning, isStreaming);
         tablesMap = new HashMap<>();
-        updateTableMap();
-    }
-
-    @Override
-    boolean hasScanned(Identifier identifier) {
-        return tablesMap.containsKey(identifier);
-    }
-
-    @Override
-    void applyFileTable(FileStoreTable fileStoreTable, Identifier identifier) {
-        if (fileStoreTable.bucketMode() == BucketMode.UNAWARE) {
-            tablesMap.put(
-                    identifier,
-                    new AppendOnlyTableCompactionCoordinator(fileStoreTable, isStreaming));
-        }
     }
 
-    @Nullable
     @Override
-    public Boolean execute() throws Exception {
+    public Boolean collectFiles(SourceFunction.SourceContext<AppendOnlyCompactionTask> ctx)
+            throws Catalog.TableNotExistException, Catalog.DatabaseNotExistException {
         boolean isEmpty;
         try {
-            if (!isRunning) {
+            if (!isRunning.get()) {
                 return null;
             }
@@ -133,4 +78,19 @@ public Boolean execute() throws Exception {
         }
         return isEmpty;
     }
+
+    @Override
+    public boolean tableScanned(Identifier identifier) {
+        return tablesMap.containsKey(identifier);
+    }
+
+    @Override
+    public void addScanTable(FileStoreTable fileStoreTable, Identifier identifier) {
+        // Only unaware-bucket tables are coordinated here; other bucket modes are
+        // handled by MultiBucketTableCompactionLogic.
+        if (fileStoreTable.bucketMode() == BucketMode.UNAWARE) {
+            tablesMap.put(
+                    identifier,
+                    new AppendOnlyTableCompactionCoordinator(fileStoreTable, isStreaming));
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/Base.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/Base.java
new file mode 100644
index 0000000000000..ae95bdc442545
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/Base.java
@@ -0,0 +1,101 @@
+package org.apache.paimon.flink.source;
+
+// The interface that must be implemented (the Flink source contract).
+interface FlinkSourceFunction {
+    void run();
+}
+
+/**
+ * A design sketch: the stream/batch driving loops live in the base class, while the common scan
+ * logic is supplied by subclasses via {@link #execute()}.
+ */
+public abstract class Base {
+
+    // used in streamExecute()
+    private int xxx;
+    private int yyy;
+
+    // used in batchExecute()
+    private int zzz;
+    private int aaa;
+
+    abstract void execute();
+
+    public void streamExecute() {
+        // do the stream-specific logic,
+        // then invoke the common logic execute()
+        execute();
+    }
+
+    public void batchExecute() {
+        // do the batch-specific logic,
+        // then invoke the common logic execute()
+        execute();
+    }
+
+    abstract void shouldExecute();
+}
+
+abstract class Base2 extends Base {
+
+    @Override
+    public void execute() {
+        // the common logic, variant 1
+    }
+}
+
+abstract class Base3 extends Base {
+
+    @Override
+    public void execute() {
+        // the common logic, variant 2
+    }
+}
+
+class StreamExecutor2 extends Base2 implements FlinkSourceFunction {
+
+    @Override
+    public void run() {
+        streamExecute();
+    }
+
+    @Override
+    void shouldExecute() {
+        // do some check
+    }
+}
+
+class StreamExecutor3 extends Base3 implements FlinkSourceFunction {
+
+    @Override
+    public void run() {
+        streamExecute();
+    }
+
+    @Override
+    void shouldExecute() {
+        // do some check
+    }
+}
+
+class BatchExecutor2 extends Base2 implements FlinkSourceFunction {
+
+    @Override
+    public void run() {
+        batchExecute();
+    }
+
+    @Override
+    void shouldExecute() {
+        // do some check
+    }
+}
+
+class BatchExecutor3 extends Base3 implements FlinkSourceFunction {
+
+    @Override
+    public void run() {
+        batchExecute();
+    }
+
+    @Override
+    void shouldExecute() {
+        // do some check
+    }
+}
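Base.java above is a design note rather than production code: Base plays the role of
CombineModeCompactorSourceFunction, Base2/Base3 stand in for the two TableScanLogic
implementations, and the stream/batch executors correspond to StreamingTableScanner and
BatchTableScanner. In the real classes, the sketch's shouldExecute() check becomes the shared
AtomicBoolean isRunning flag, roughly (illustrative only):

    AtomicBoolean isRunning = new AtomicBoolean(true); // owned by the source function
    // scanner driving loop:    while (isRunning.get()) { ... }
    // per-round scan logic:    if (!isRunning.get()) { return null; }
    // SourceFunction.cancel(): isRunning.set(false);
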
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/BatchMultiFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/BatchMultiFunction.java
index 8a8babbdd2290..289e4992e6b29 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/BatchMultiFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/BatchMultiFunction.java
@@ -18,7 +18,10 @@
 
 package org.apache.paimon.flink.source.operator;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.compact.BatchTableScanner;
+import org.apache.paimon.flink.compact.MultiBucketTableCompactionLogic;
 import org.apache.paimon.flink.utils.JavaTypeInfo;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.Split;
@@ -33,16 +36,12 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.table.data.RowData;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.regex.Pattern;
 
 /** It is responsible for monitoring compactor source in batch mode. */
-public class BatchMultiFunction extends MultiBucketTablesFunction {
-
-    private static final Logger LOG =
-            LoggerFactory.getLogger(BatchMultiFunction.class);
+public class BatchMultiFunction
+        extends CombineModeCompactorSourceFunction<Tuple2<Split, String>> {
 
     public BatchMultiFunction(
             Catalog.Loader catalogLoader,
@@ -60,8 +59,11 @@
     }
 
     @Override
-    public void run(SourceContext<Tuple2<Split, String>> ctx) throws Exception {
-        batchMonitor(ctx);
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+
+        MultiBucketTableCompactionLogic multiBucketTableCompactionLogic =
+                new MultiBucketTableCompactionLogic(
+                        catalogLoader,
+                        includingPattern,
+                        excludingPattern,
+                        databasePattern,
+                        isStreaming,
+                        isRunning);
+        this.compactionTableScanner =
+                new BatchTableScanner<>(isRunning, multiBucketTableCompactionLogic);
     }
 
     public static DataStream<RowData> buildSource(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/BatchUnawareFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/BatchUnawareFunction.java
index a7b9d6cc4e275..cf9da924f02a2 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/BatchUnawareFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/BatchUnawareFunction.java
@@ -18,8 +18,11 @@
 
 package org.apache.paimon.flink.source.operator;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.paimon.append.AppendOnlyCompactionTask;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.compact.BatchTableScanner;
+import org.apache.paimon.flink.compact.UnawareCompactionLogic;
 import org.apache.paimon.flink.sink.CompactionTaskTypeInfo;
 
 import org.apache.flink.api.connector.source.Boundedness;
@@ -36,7 +39,7 @@
 /**
  * It is responsible for the batch compactor source of the table of unaware bucket in combined
  * mode.
  */
-public class BatchUnawareFunction extends UnawareBucketTablesFunction {
+public class BatchUnawareFunction
+        extends CombineModeCompactorSourceFunction<AppendOnlyCompactionTask> {
     public BatchUnawareFunction(
             Catalog.Loader catalogLoader,
             Pattern includingPattern,
@@ -53,8 +56,10 @@
     }
 
     @Override
-    public void run(SourceContext<AppendOnlyCompactionTask> sourceContext) throws Exception {
-        this.batchMonitor(sourceContext);
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+
+        UnawareCompactionLogic unawareCompactionLogic =
+                new UnawareCompactionLogic(
+                        catalogLoader,
+                        includingPattern,
+                        excludingPattern,
+                        databasePattern,
+                        isStreaming,
+                        isRunning);
+        this.compactionTableScanner = new BatchTableScanner<>(isRunning, unawareCompactionLogic);
     }
 
     public static DataStream<AppendOnlyCompactionTask> buildSource(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombineModeCompactorSourceFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombineModeCompactorSourceFunction.java
index 5c62c51cfc5ba..341727af352f4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombineModeCompactorSourceFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/CombineModeCompactorSourceFunction.java
@@ -18,31 +18,33 @@
 
 package org.apache.paimon.flink.source.operator;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.Identifier;
-import org.apache.paimon.table.BucketMode;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.Table;
+import org.apache.paimon.flink.compact.CompactionTableScanner;
 
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Pattern;
 
-import static org.apache.paimon.flink.utils.MultiTablesCompactorUtil.shouldCompactTable;
-
 /**
+ * This is the single (non-parallel) monitoring task, it is responsible for:
+ *
+ * <pre>
+ *   1. Monitoring snapshots of the Paimon table.
+ *   2. Creating the splits or compaction tasks corresponding to the incremental files.
+ *   3. Assigning them to downstream tasks for further processing.
+ * </pre>
+ *
+ * <p>The splits to be read are forwarded to the downstream {@link MultiTablesReadOperator} which
+ * can have parallelism greater than one.
+ *
+ * <p>Currently, only the dedicated compaction jobs for multi-tables rely on this monitor.
- * This is the single (non-parallel) monitoring task, it is responsible for the new Paimon table.
  */
 public abstract class CombineModeCompactorSourceFunction<T> extends RichSourceFunction<T> {
 
-    private static final long serialVersionUID = 1L;
-
-    private static final Logger LOG =
-            LoggerFactory.getLogger(CombineModeCompactorSourceFunction.class);
+    private static final long serialVersionUID = 2L;
 
     protected final Catalog.Loader catalogLoader;
     protected final Pattern includingPattern;
@@ -51,7 +53,11 @@
     protected final long monitorInterval;
     protected final boolean isStreaming;
 
-    protected transient Catalog catalog;
+    protected final AtomicBoolean isRunning = new AtomicBoolean(true);
+
+    protected transient SourceContext<T> ctx;
+
+    protected transient CompactionTableScanner<T> compactionTableScanner;
 
     public CombineModeCompactorSourceFunction(
             Catalog.Loader catalogLoader,
@@ -68,13 +74,15 @@
         this.isStreaming = isStreaming;
     }
 
-    protected volatile boolean isRunning = true;
-
-    protected transient SourceContext<T> ctx;
-
     @Override
     public void open(Configuration parameters) throws Exception {
-        catalog = catalogLoader.load();
+        // Subclasses create their CompactionTableScanner here, after calling super.open().
+    }
+
+    @Override
+    public void run(SourceContext<T> sourceContext) throws Exception {
+        this.ctx = sourceContext;
+        compactionTableScanner.scan(ctx);
     }
 
     @Override
@@ -82,77 +90,11 @@
     public void cancel() {
         // this is to cover the case where cancel() is called before the run()
         if (ctx != null) {
             synchronized (ctx.getCheckpointLock()) {
-                isRunning = false;
+                isRunning.set(false);
             }
         } else {
-            isRunning = false;
+            isRunning.set(false);
         }
     }
-
-    protected void updateTableMap()
-            throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
-        List<String> databases = catalog.listDatabases();
-
-        for (String databaseName : databases) {
-            if (databasePattern.matcher(databaseName).matches()) {
-                List<String> tables = catalog.listTables(databaseName);
-                for (String tableName : tables) {
-                    Identifier identifier = Identifier.create(databaseName, tableName);
-                    if (shouldCompactTable(identifier, includingPattern, excludingPattern)
-                            && (!hasScanned(identifier))) {
-                        Table table = catalog.getTable(identifier);
-                        if (!(table instanceof FileStoreTable)) {
-                            LOG.error(
-                                    String.format(
-                                            "Only FileStoreTable supports compact action. The table type is '%s'.",
-                                            table.getClass().getName()));
-                            continue;
-                        }
-
-                        FileStoreTable fileStoreTable = (FileStoreTable) table;
-                        if (fileStoreTable.bucketMode() == BucketMode.UNAWARE) {
-                            LOG.info(
-                                    String.format(
-                                                    "the bucket mode of %s is unware. ",
-                                                    identifier.getFullName())
-                                            + "currently, the table with unware bucket mode is not support in combined mode.");
-                            continue;
-                        }
-
-                        applyFileTable(fileStoreTable, identifier);
-                    }
-                }
-            }
-        }
-    }
-
-    abstract boolean hasScanned(Identifier identifier);
-
-    abstract void applyFileTable(FileStoreTable fileStoreTable, Identifier identifier);
-
-    @SuppressWarnings("BusyWait")
-    public void incrementMonitor(SourceContext<T> ctx) throws Exception {
-        this.ctx = ctx;
-        while (isRunning) {
-            Boolean isEmpty = execute();
-            if (isEmpty == null) return;
-            if (isEmpty) {
-                Thread.sleep(monitorInterval);
-            }
-        }
-    }
-
-    public void batchMonitor(SourceContext<T> ctx) throws Exception {
-        this.ctx = ctx;
-        if (isRunning) {
-            Boolean isEmpty = execute();
-            if (isEmpty == null) return;
-            if (isEmpty) {
-                throw new Exception(
-                        "No file were collected. Please ensure there are tables detected after pattern matching");
-            }
-        }
-    }
-
-    abstract Boolean execute() throws Exception;
 }
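With the base class above, every concrete source shares one lifecycle: open() wires a scanner to
a scan logic, run() hands the Flink SourceContext to scanner.scan(), and cancel() flips the
shared flag. A minimal subclass sketch under the signatures introduced in this patch; the class
name is hypothetical:

    public class MyUnawareCompactorSource
            extends CombineModeCompactorSourceFunction<AppendOnlyCompactionTask> {

        public MyUnawareCompactorSource(
                Catalog.Loader catalogLoader,
                Pattern includingPattern,
                Pattern excludingPattern,
                Pattern databasePattern,
                boolean isStreaming,
                long monitorInterval) {
            super(catalogLoader, includingPattern, excludingPattern,
                    databasePattern, isStreaming, monitorInterval);
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            this.compactionTableScanner =
                    new BatchTableScanner<>(
                            isRunning,
                            new UnawareCompactionLogic(
                                    catalogLoader,
                                    includingPattern,
                                    excludingPattern,
                                    databasePattern,
                                    isStreaming,
                                    isRunning));
        }
    }
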
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
index b94631a00a3f1..07a2d2ea59fcc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/MultiTablesReadOperator.java
@@ -45,7 +45,7 @@
 /**
  * The operator that reads the Tuple2<{@link Split}, String> received from the preceding {@link
  * BatchMultiFunction} or {@link StreamingMultiFunction}.
- * Contrary to the {@link MultiBucketTablesFunction} which has a parallelism of 1, this
+ * Unlike the {@link CombineModeCompactorSourceFunction}, which has a parallelism of 1, this
  * operator can have DOP > 1.
  */
 public class MultiTablesReadOperator extends AbstractStreamOperator<RowData>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/StreamingMultiFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/StreamingMultiFunction.java
index c5dc2c3a678a3..b5b5a3ba93617 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/StreamingMultiFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/StreamingMultiFunction.java
@@ -18,7 +18,11 @@
 
 package org.apache.paimon.flink.source.operator;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.compact.MultiBucketTableCompactionLogic;
+import org.apache.paimon.flink.compact.StreamingTableScanner;
 import org.apache.paimon.flink.utils.JavaTypeInfo;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.Split;
@@ -40,10 +44,7 @@
 
-/** It is responsible for monitoring compactor source in batch mode. */
+/** It is responsible for monitoring compactor source in stream mode. */
 public class StreamingMultiFunction
-        extends MultiBucketTablesFunction {
-
-    private static final Logger LOG =
-            LoggerFactory.getLogger(StreamingMultiFunction.class);
+        extends CombineModeCompactorSourceFunction<Tuple2<Split, String>> {
 
     public StreamingMultiFunction(
             Catalog.Loader catalogLoader,
@@ -61,8 +62,11 @@
     }
 
     @Override
-    public void run(SourceContext<Tuple2<Split, String>> ctx) throws Exception {
-        incrementMonitor(ctx);
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+
+        MultiBucketTableCompactionLogic multiBucketTableCompactionLogic =
+                new MultiBucketTableCompactionLogic(
+                        catalogLoader,
+                        includingPattern,
+                        excludingPattern,
+                        databasePattern,
+                        isStreaming,
+                        isRunning);
+        this.compactionTableScanner =
+                new StreamingTableScanner<>(monitorInterval, multiBucketTableCompactionLogic, isRunning);
     }
 
     public static DataStream<RowData> buildSource(
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/StreamingUnawareFunction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/StreamingUnawareFunction.java
index 56b907def5ab8..80293b5e8f84c 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/StreamingUnawareFunction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/operator/StreamingUnawareFunction.java
@@ -18,8 +18,11 @@
 
 package org.apache.paimon.flink.source.operator;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.paimon.append.AppendOnlyCompactionTask;
 import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.flink.compact.StreamingTableScanner;
+import org.apache.paimon.flink.compact.UnawareCompactionLogic;
 import org.apache.paimon.flink.sink.CompactionTaskTypeInfo;
 
 import org.apache.flink.api.connector.source.Boundedness;
@@ -33,7 +36,7 @@
 /**
  * It is responsible for monitoring compactor source in stream mode for the table of unaware
  * bucket.
  */
-public class StreamingUnawareFunction extends UnawareBucketTablesFunction {
+public class StreamingUnawareFunction
+        extends CombineModeCompactorSourceFunction<AppendOnlyCompactionTask> {
     public StreamingUnawareFunction(
             Catalog.Loader catalogLoader,
             Pattern includingPattern,
@@ -50,8 +53,11 @@
     }
 
     @Override
-    public void run(SourceContext<AppendOnlyCompactionTask> sourceContext) throws Exception {
-        this.incrementMonitor(sourceContext);
+    public void open(Configuration parameters) throws Exception {
+        super.open(parameters);
+
+        UnawareCompactionLogic unawareCompactionLogic =
+                new UnawareCompactionLogic(
+                        catalogLoader,
+                        includingPattern,
+                        excludingPattern,
+                        databasePattern,
+                        isStreaming,
+                        isRunning);
+        this.compactionTableScanner =
+                new StreamingTableScanner<>(monitorInterval, unawareCompactionLogic, isRunning);
     }
 
     public static DataStream<AppendOnlyCompactionTask> buildSource(