Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
wg1026688210 committed Feb 6, 2024
1 parent b5e0f2c commit f6274ef
Show file tree
Hide file tree
Showing 13 changed files with 451 additions and 242 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


package org.apache.paimon.flink.compact;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;

import static org.apache.paimon.flink.utils.MultiTablesCompactorUtil.shouldCompactTable;

/**
 * Base {@link CompactionTableScanner.TableScanLogic} that walks the catalog and registers every
 * table that should be compacted.
 *
 * <p>Subclasses decide how a table is tracked by implementing {@code tableScanned} and {@code
 * addScanTable}; this class only performs the discovery in {@link #updateTableMap()}.
 *
 * @param <T> type of the elements emitted to the source context by the concrete logic
 */
public abstract class AbstractTableScanLogic<T>
        implements CompactionTableScanner.TableScanLogic<T> {

    private static final Logger LOG = LoggerFactory.getLogger(AbstractTableScanLogic.class);

    protected final Catalog.Loader catalogLoader;
    protected final Pattern includingPattern;
    protected final Pattern excludingPattern;
    protected final Pattern databasePattern;

    // Loaded once in the constructor from catalogLoader.
    // NOTE(review): the field is transient; nothing visible here reloads it after
    // deserialization -- confirm instances are never shipped in serialized form.
    protected transient Catalog catalog;

    protected AtomicBoolean isRunning;
    protected boolean isStreaming;

    /**
     * Creates the scan logic and eagerly loads the catalog from {@code catalogLoader}.
     *
     * @param catalogLoader loader used to obtain the catalog that is listed for tables
     * @param includingPattern pattern handed to {@code shouldCompactTable} as the including pattern
     * @param excludingPattern pattern handed to {@code shouldCompactTable} as the excluding pattern
     * @param databasePattern pattern a database name must fully match to be inspected
     * @param isRunning flag stored for use by subclasses
     * @param isStreaming mode flag stored for use by subclasses
     */
    public AbstractTableScanLogic(
            Catalog.Loader catalogLoader,
            Pattern includingPattern,
            Pattern excludingPattern,
            Pattern databasePattern,
            AtomicBoolean isRunning,
            boolean isStreaming) {
        this.catalogLoader = catalogLoader;
        this.catalog = catalogLoader.load();

        this.includingPattern = includingPattern;
        this.excludingPattern = excludingPattern;
        this.databasePattern = databasePattern;
        this.isRunning = isRunning;
        this.isStreaming = isStreaming;
    }

    /**
     * Lists every database and table of the catalog and calls {@code addScanTable} for each table
     * that matches the patterns, has not been registered yet, is a {@link FileStoreTable} and does
     * not use {@link BucketMode#UNAWARE}. Tables that fail one of the last two checks are logged
     * and skipped.
     *
     * @throws Catalog.DatabaseNotExistException declared by the catalog listing calls
     * @throws Catalog.TableNotExistException declared by the catalog table lookup
     */
    protected void updateTableMap()
            throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
        for (String databaseName : catalog.listDatabases()) {
            if (!databasePattern.matcher(databaseName).matches()) {
                continue;
            }

            for (String tableName : catalog.listTables(databaseName)) {
                Identifier identifier = Identifier.create(databaseName, tableName);
                // Pattern filter is evaluated first; tableScanned is only consulted for matches.
                if (!shouldCompactTable(identifier, includingPattern, excludingPattern)
                        || tableScanned(identifier)) {
                    continue;
                }

                Table table = catalog.getTable(identifier);
                if (!(table instanceof FileStoreTable)) {
                    LOG.error(
                            "Only FileStoreTable supports compact action. The table type is '{}'.",
                            table.getClass().getName());
                    continue;
                }

                FileStoreTable fileStoreTable = (FileStoreTable) table;
                if (fileStoreTable.bucketMode() == BucketMode.UNAWARE) {
                    LOG.info(
                            "the bucket mode of {} is unware. currently, the table with unware bucket mode is not support in combined mode.",
                            identifier.getFullName());
                    continue;
                }

                addScanTable(fileStoreTable, identifier);
            }
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.compact;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.concurrent.atomic.AtomicBoolean;

/**
 * {@link CompactionTableScanner} for batch mode: runs the scan logic at most once per {@link
 * #scan} call and fails if that single pass reports that nothing was collected.
 *
 * @param <T> type of the elements emitted to the source context
 */
public class BatchTableScanner<T> implements CompactionTableScanner<T> {

    // Supplied by the caller; the previous field initializer was always overwritten by the
    // constructor, so the fields are now plain finals like in StreamingTableScanner.
    private final AtomicBoolean isRunning;
    private final TableScanLogic<T> tableScanLogic;

    /**
     * @param isRunning flag checked before the scan logic is invoked
     * @param tableScanLogic logic that collects the files and emits them to the source context
     */
    public BatchTableScanner(AtomicBoolean isRunning, TableScanLogic<T> tableScanLogic) {
        this.isRunning = isRunning;
        this.tableScanLogic = tableScanLogic;
    }

    /**
     * Invokes the scan logic once if the running flag is set.
     *
     * @param ctx source context handed through to the scan logic
     * @throws Exception if the scan logic reports an empty result, or if the scan logic itself
     *     throws
     */
    @Override
    public void scan(SourceFunction.SourceContext<T> ctx) throws Exception {
        if (!isRunning.get()) {
            return;
        }

        Boolean isEmpty = tableScanLogic.collectFiles(ctx);
        // A null result means the logic produced no verdict; nothing to check in that case.
        if (isEmpty == null) {
            return;
        }
        if (isEmpty) {
            throw new Exception(
                    "No file were collected. Please ensure there are tables detected after pattern matching");
        }
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.compact;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.FileStoreTable;

/**
 * Drives a table scan that emits elements of type {@code T} to a Flink source context.
 *
 * @param <T> type of the elements emitted to the source context
 */
public interface CompactionTableScanner<T> {

    /**
     * Runs the scan against the given source context.
     *
     * @param ctx source context that receives the emitted elements
     * @throws Exception if the scan fails
     */
    void scan(SourceFunction.SourceContext<T> ctx) throws Exception;

    /**
     * The per-table work a {@link CompactionTableScanner} delegates to.
     *
     * @param <T> type of the elements emitted to the source context
     */
    interface TableScanLogic<T> {

        /**
         * Collects files and emits them to {@code ctx}.
         *
         * @param ctx source context that receives the emitted elements
         * @return a tri-state result; the scanners in this package stop when it is {@code null}
         *     and treat {@code true} as "nothing was collected"
         * @throws Catalog.TableNotExistException if a table cannot be found in the catalog
         * @throws Catalog.DatabaseNotExistException if a database cannot be found in the catalog
         */
        Boolean collectFiles(SourceFunction.SourceContext<T> ctx)
                throws Catalog.TableNotExistException, Catalog.DatabaseNotExistException;

        /**
         * @param identifier table to look up
         * @return whether the table has already been registered with this logic
         */
        boolean tableScanned(Identifier identifier);

        /**
         * Registers a table with this logic.
         *
         * @param fileStoreTable table to register
         * @param identifier identifier of {@code fileStoreTable}
         */
        void addScanTable(FileStoreTable fileStoreTable, Identifier identifier);
    }
}

Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
* limitations under the License.
*/

package org.apache.paimon.flink.source.operator;
package org.apache.paimon.flink.compact;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.BucketMode;
Expand All @@ -26,97 +28,35 @@
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.system.BucketsTable;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.paimon.flink.utils.MultiTablesCompactorUtil.compactOptions;

/**
* This is the single (non-parallel) monitoring task, it is responsible for:
*
* <ol>
* <li>Monitoring snapshots of the Paimon table and the new Paimon table
* <li>Creating the Tuple2<{@link Split}, String> splits corresponding to the incremental files.
* <li>Assigning them to downstream tasks for further processing.
* </ol>
*
* <p>The splits to be read are forwarded to the downstream {@link MultiTablesReadOperator} which
* can have parallelism greater than one.
*
* <p>Currently, the dedicated combine mode compaction of job for multi-tables with multi bucket rely on this monitor.
*/
public abstract class MultiBucketTablesFunction
extends CombineModeCompactorSourceFunction<Tuple2<Split, String>> {

private static final long serialVersionUID = 1L;

private static final Logger LOG =
LoggerFactory.getLogger(MultiBucketTablesFunction.class);

public class MultiBucketTableCompactionLogic extends AbstractTableScanLogic<Tuple2<Split, String>> {
private static final Logger LOG = LoggerFactory.getLogger(MultiBucketTableCompactionLogic.class);
protected transient Map<Identifier, BucketsTable> tablesMap;
protected transient Map<Identifier, StreamTableScan> scansMap;

public MultiBucketTablesFunction(
Catalog.Loader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
Pattern databasePattern,
boolean isStreaming,
long monitorInterval) {
super(
catalogLoader,
includingPattern,
excludingPattern,
databasePattern,
isStreaming,
monitorInterval);
}

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
public MultiBucketTableCompactionLogic(Catalog.Loader catalogLoader, Pattern includingPattern, Pattern excludingPattern, Pattern databasePattern, boolean isStreaming, AtomicBoolean isRunning) {
super(catalogLoader, includingPattern, excludingPattern, databasePattern, isRunning, isStreaming);
tablesMap = new HashMap<>();
scansMap = new HashMap<>();
}

@Override
boolean hasScanned(Identifier identifier) {
return tablesMap.containsKey(identifier);
}

@Override
void applyFileTable(FileStoreTable fileStoreTable, Identifier identifier) {
if (fileStoreTable.bucketMode() == BucketMode.UNAWARE) {
LOG.info(
String.format("the bucket mode of %s is unware. ", identifier.getFullName())
+ "currently, the table with unware bucket mode is not support in combined mode.");
return;
}

BucketsTable bucketsTable =
new BucketsTable(fileStoreTable, isStreaming, identifier.getDatabaseName())
.copy(compactOptions(isStreaming));
tablesMap.put(identifier, bucketsTable);
scansMap.put(identifier, bucketsTable.newReadBuilder().newStreamScan());
}

@Nullable
@Override
public Boolean execute() throws Exception {
public Boolean collectFiles(SourceFunction.SourceContext<Tuple2<Split, String>> ctx) throws Catalog.TableNotExistException, Catalog.DatabaseNotExistException {
boolean isEmpty;
synchronized (ctx.getCheckpointLock()) {
if (!isRunning) {
if (!isRunning.get()) {
return null;
}

Expand All @@ -143,4 +83,25 @@ public Boolean execute() throws Exception {
}
return isEmpty;
}

/**
 * Reports whether {@code identifier} already has an entry in {@code tablesMap}.
 *
 * @param identifier table to look up
 * @return {@code true} if the table is already a key of {@code tablesMap}
 */
@Override
public boolean tableScanned(Identifier identifier) {
    final boolean alreadyRegistered = tablesMap.containsKey(identifier);
    return alreadyRegistered;
}

/**
 * Registers {@code fileStoreTable} for scanning by wrapping it in a {@link BucketsTable} and
 * creating a stream scan for it. Tables whose bucket mode is {@link BucketMode#UNAWARE} are
 * logged and ignored.
 *
 * @param fileStoreTable table to register
 * @param identifier identifier of {@code fileStoreTable}; used as the key of both maps
 */
@Override
public void addScanTable(FileStoreTable fileStoreTable, Identifier identifier) {
    if (fileStoreTable.bucketMode() == BucketMode.UNAWARE) {
        // Parameterized logging: the message is only assembled when INFO is enabled.
        LOG.info(
                "the bucket mode of {} is unware. currently, the table with unware bucket mode is not support in combined mode.",
                identifier.getFullName());
        return;
    }

    BucketsTable bucketsTable =
            new BucketsTable(fileStoreTable, isStreaming, identifier.getDatabaseName())
                    .copy(compactOptions(isStreaming));
    tablesMap.put(identifier, bucketsTable);
    scansMap.put(identifier, bucketsTable.newReadBuilder().newStreamScan());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.compact;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.concurrent.atomic.AtomicBoolean;

/**
 * {@link CompactionTableScanner} for streaming mode: keeps invoking the scan logic while the
 * running flag is set, pausing for the monitor interval whenever a pass collected nothing.
 *
 * @param <T> type of the elements emitted to the source context
 */
public class StreamingTableScanner<T> implements CompactionTableScanner<T> {

    private final TableScanLogic<T> tableScanLogic;
    private final AtomicBoolean isRunning;
    private final long monitorInterval;

    /**
     * @param monitorInterval value passed to {@link Thread#sleep(long)} after an empty pass
     * @param tableScanLogic logic that collects the files and emits them to the source context
     * @param isRunning flag checked before every pass
     */
    public StreamingTableScanner(
            long monitorInterval, TableScanLogic<T> tableScanLogic, AtomicBoolean isRunning) {
        this.tableScanLogic = tableScanLogic;
        this.isRunning = isRunning;
        this.monitorInterval = monitorInterval;
    }

    /**
     * Repeats the scan logic until the running flag is cleared or the logic returns {@code
     * null}.
     *
     * @param ctx source context handed through to the scan logic
     * @throws Exception if the scan logic throws or the sleep is interrupted
     */
    @SuppressWarnings("BusyWait")
    @Override
    public void scan(SourceFunction.SourceContext<T> ctx) throws Exception {
        while (isRunning.get()) {
            final Boolean nothingCollected = tableScanLogic.collectFiles(ctx);
            if (nothingCollected == null) {
                // No verdict from the logic: leave the loop, which ends the scan.
                break;
            }
            if (nothingCollected) {
                Thread.sleep(monitorInterval);
            }
        }
    }
}
Loading

0 comments on commit f6274ef

Please sign in to comment.