Skip to content

Commit

Permalink
refact the scan logic of combined mode compaction to reuse the logic …
Browse files Browse the repository at this point in the history
…of scan table and file.
  • Loading branch information
wg1026688210 committed Apr 10, 2024
1 parent 4953776 commit b54f138
Show file tree
Hide file tree
Showing 11 changed files with 1,051 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.compact;

import org.apache.paimon.append.AppendOnlyCompactionTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.Split;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;

import static org.apache.paimon.flink.utils.MultiTablesCompactorUtil.shouldCompactTable;

/**
* This class is responsible for implementing the scanning logic for the table of different type
* buckets during compaction.
*
* @param <T> the result of scanning file :
* <ol>
* <li>{@link Tuple2< Split ,String>} for the table with multi buckets, such as dynamic or
* fixed bucket table.
* <li>{@link AppendOnlyCompactionTask} for the table witch fixed single bucket ,such as
* unaware bucket table.
* </ol>
*/
public abstract class AbstractTableScanLogic<T> {
private static final Logger LOG = LoggerFactory.getLogger(AbstractTableScanLogic.class);
protected final Catalog.Loader catalogLoader;
protected final Pattern includingPattern;
protected final Pattern excludingPattern;
protected final Pattern databasePattern;

protected transient Catalog catalog;

protected AtomicBoolean isRunning;
protected boolean isStreaming;

public AbstractTableScanLogic(
Catalog.Loader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
Pattern databasePattern,
boolean isStreaming,
AtomicBoolean isRunning) {
this.catalogLoader = catalogLoader;
catalog = catalogLoader.load();

this.includingPattern = includingPattern;
this.excludingPattern = excludingPattern;
this.databasePattern = databasePattern;
this.isRunning = isRunning;
this.isStreaming = isStreaming;
}

protected void updateTableMap()
throws Catalog.DatabaseNotExistException, Catalog.TableNotExistException {
List<String> databases = catalog.listDatabases();

for (String databaseName : databases) {
if (databasePattern.matcher(databaseName).matches()) {
List<String> tables = catalog.listTables(databaseName);
for (String tableName : tables) {
Identifier identifier = Identifier.create(databaseName, tableName);
if (shouldCompactTable(identifier, includingPattern, excludingPattern)
&& (!checkTableScanned(identifier))) {
Table table = catalog.getTable(identifier);
if (!(table instanceof FileStoreTable)) {
LOG.error(
String.format(
"Only FileStoreTable supports compact action. The table type is '%s'.",
table.getClass().getName()));
continue;
}

FileStoreTable fileStoreTable = (FileStoreTable) table;
addScanTable(fileStoreTable, identifier);
}
}
}
}
}

abstract Boolean collectFiles(SourceFunction.SourceContext<T> ctx)
throws Catalog.TableNotExistException, Catalog.DatabaseNotExistException;

/** Check if table has been scanned. */
abstract boolean checkTableScanned(Identifier identifier);

/** Add the scan table to the table map. */
abstract void addScanTable(FileStoreTable fileStoreTable, Identifier identifier);

abstract String bucketType();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.compact;

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* This class is responsible for scanning files that need to be compact by batch method {@link
* CompactionFileScanner}.
*/
public class BatchFileScanner<T> extends CompactionFileScanner<T> {
private static final Logger LOGGER = LoggerFactory.getLogger(BatchFileScanner.class);

public BatchFileScanner(AtomicBoolean isRunning, AbstractTableScanLogic<T> tableScanLogic) {
super(isRunning, tableScanLogic);
}

@Override
public void scan(SourceFunction.SourceContext<T> ctx) throws Exception {
if (isRunning.get()) {
Boolean isEmpty = tableScanLogic.collectFiles(ctx);
if (isEmpty == null) {
return;
}
if (isEmpty) {
// Currently, in the combined mode, there are two scan tasks for the table of two
// different bucket type (multi bucket & unaware bucket) running concurrently.
// There will be a situation that there is only one task compaction , therefore this
// should not be thrown exception here.
LOGGER.info(
"No file were collected for the table of {}", tableScanLogic.bucketType());
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.compact;

import org.apache.paimon.append.AppendOnlyCompactionTask;
import org.apache.paimon.table.source.Split;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.concurrent.atomic.AtomicBoolean;

/**
* The class is response for scanning the file which need compaction.
*
* @param <T> the result of scanning file :
* <ol>
* <li>{@link Split} for the table with multi buckets, such as dynamic or fixed bucket table.
* <li>{@link AppendOnlyCompactionTask} for the table witch fixed single bucket ,such as
* unaware bucket table.
* </ol>
*/
public abstract class CompactionFileScanner<T> {
protected final AtomicBoolean isRunning;

protected final AbstractTableScanLogic<T> tableScanLogic;

public CompactionFileScanner(
AtomicBoolean isRunning, AbstractTableScanLogic<T> tableScanLogic) {
this.isRunning = isRunning;
this.tableScanLogic = tableScanLogic;
}

public abstract void scan(SourceFunction.SourceContext<T> ctx) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.compact;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.table.BucketMode;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.EndOfScanException;
import org.apache.paimon.table.source.Split;
import org.apache.paimon.table.source.StreamTableScan;
import org.apache.paimon.table.system.BucketsTable;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

import static org.apache.paimon.flink.utils.MultiTablesCompactorUtil.compactOptions;

/**
* This class is responsible for implementing the scanning logic {@link AbstractTableScanLogic} for
* the table with multi bucket such as dynamic or fixed bucket table.
*/
public class MultiBucketTableScanLogic extends AbstractTableScanLogic<Tuple2<Split, String>> {
private static final Logger LOG = LoggerFactory.getLogger(MultiBucketTableScanLogic.class);
protected transient Map<Identifier, BucketsTable> tablesMap;
protected transient Map<Identifier, StreamTableScan> scansMap;

public MultiBucketTableScanLogic(
Catalog.Loader catalogLoader,
Pattern includingPattern,
Pattern excludingPattern,
Pattern databasePattern,
boolean isStreaming,
AtomicBoolean isRunning) {
super(
catalogLoader,
includingPattern,
excludingPattern,
databasePattern,
isStreaming,
isRunning);
tablesMap = new HashMap<>();
scansMap = new HashMap<>();
}

@Override
public Boolean collectFiles(SourceFunction.SourceContext<Tuple2<Split, String>> ctx)
throws Catalog.TableNotExistException, Catalog.DatabaseNotExistException {
boolean isEmpty;
synchronized (ctx.getCheckpointLock()) {
if (!isRunning.get()) {
return null;
}

// check for new tables
updateTableMap();

try {
List<Tuple2<Split, String>> splits = new ArrayList<>();
for (Map.Entry<Identifier, StreamTableScan> entry : scansMap.entrySet()) {
Identifier identifier = entry.getKey();
StreamTableScan scan = entry.getValue();
splits.addAll(
scan.plan().splits().stream()
.map(split -> new Tuple2<>(split, identifier.getFullName()))
.collect(Collectors.toList()));
}
isEmpty = splits.isEmpty();
splits.forEach(ctx::collect);
} catch (EndOfScanException esf) {
LOG.info("Catching EndOfStreamException, the stream is finished.");
return null;
}
}
return isEmpty;
}

@Override
public boolean checkTableScanned(Identifier identifier) {
return tablesMap.containsKey(identifier);
}

@Override
public void addScanTable(FileStoreTable fileStoreTable, Identifier identifier) {
if (fileStoreTable.bucketMode() != BucketMode.UNAWARE) {
BucketsTable bucketsTable =
new BucketsTable(fileStoreTable, isStreaming, identifier.getDatabaseName())
.copy(compactOptions(isStreaming));
tablesMap.put(identifier, bucketsTable);
scansMap.put(identifier, bucketsTable.newReadBuilder().newStreamScan());
}
}

@Override
String bucketType() {
return "multi-bucket";
}
}
Loading

0 comments on commit b54f138

Please sign in to comment.