Skip to content

Commit

Permalink
[flink] Adjust MarkPartitionDoneAction methods
Browse files Browse the repository at this point in the history
  • Loading branch information
JingsongLi committed Jun 26, 2024
1 parent c1551a4 commit a4085ca
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 98 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,15 @@ public static <T extends Throwable> void closeAll(
}
}

/**
* Closes all {@link AutoCloseable} objects in the parameter quietly.
*
* <p><b>Important:</b> This method is expected to never throw an exception.
*/
public static void closeAllQuietly(Iterable<? extends AutoCloseable> closeables) {
closeables.forEach(IOUtils::closeQuietly);
}

/**
* Closes the given AutoCloseable.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/** Utils for file system. */
public class PartitionPathUtils {
Expand Down Expand Up @@ -89,6 +90,15 @@ public static String generatePartitionPath(LinkedHashMap<String, String> partiti
return suffixBuf.toString();
}

public static List<String> generatePartitionPaths(
List<Map<String, String>> partitions, RowType partitionType) {
return partitions.stream()
.map(
partition ->
PartitionPathUtils.generatePartitionPath(partition, partitionType))
.collect(Collectors.toList());
}

public static String generatePartitionPath(
Map<String, String> partitionSpec, RowType partitionType) {
LinkedHashMap<String, String> linkedPartitionSpec = new LinkedHashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,23 @@

package org.apache.paimon.flink.action;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.flink.sink.partition.PartitionMarkDoneAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.PartitionPathUtils;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.paimon.flink.sink.partition.PartitionMarkDone.closeActions;
import static org.apache.paimon.flink.sink.partition.PartitionMarkDone.getPartitionMarkDoneActions;
import static org.apache.paimon.flink.sink.partition.PartitionMarkDone.markDone;

/** Table partition mark done action for Flink. */
public class PartitionMarkDoneAction extends TableActionBase {
public class MarkPartitionDoneAction extends TableActionBase {

private final FileStoreTable fileStoreTable;
private final List<Map<String, String>> partitions;
private final CoreOptions coreOptions;
private final Options options;

public PartitionMarkDoneAction(
public MarkPartitionDoneAction(
String warehouse,
String databaseName,
String tableName,
Expand All @@ -55,29 +50,19 @@ public PartitionMarkDoneAction(

this.fileStoreTable = (FileStoreTable) table;
this.partitions = partitions;
this.coreOptions = fileStoreTable.coreOptions();
this.options = coreOptions.toConfiguration();
}

@Override
public void run() throws Exception {
List<org.apache.paimon.flink.sink.partition.PartitionMarkDoneAction> actions =
getPartitionMarkDoneActions(fileStoreTable, coreOptions, options);
List<PartitionMarkDoneAction> actions =
PartitionMarkDoneAction.createActions(fileStoreTable, fileStoreTable.coreOptions());

List<String> partitionPaths = getPartitionPaths(fileStoreTable, partitions);
List<String> partitionPaths =
PartitionPathUtils.generatePartitionPaths(
partitions, fileStoreTable.store().partitionType());

markDone(partitionPaths, actions);

closeActions(actions);
}

public static List<String> getPartitionPaths(
FileStoreTable fileStoreTable, List<Map<String, String>> partitions) {
return partitions.stream()
.map(
partition ->
PartitionPathUtils.generatePartitionPath(
partition, fileStoreTable.store().partitionType()))
.collect(Collectors.toList());
IOUtils.closeAllQuietly(actions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import java.util.Map;
import java.util.Optional;

/** Factory to create {@link PartitionMarkDoneAction}. */
public class PartitionMarkDoneActionFactory implements ActionFactory {
/** Factory to create {@link MarkPartitionDoneAction}. */
public class MarkPartitionDoneActionFactory implements ActionFactory {

public static final String IDENTIFIER = "mark_partition_done";

Expand All @@ -44,7 +44,7 @@ public Optional<Action> create(MultipleParameterToolAdapter params) {
Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);

return Optional.of(
new PartitionMarkDoneAction(
new MarkPartitionDoneAction(
tablePath.f0, tablePath.f1, tablePath.f2, partitions, catalogConfig));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,17 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.Options;
import org.apache.paimon.flink.sink.partition.PartitionMarkDoneAction;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.PartitionPathUtils;

import org.apache.flink.table.procedure.ProcedureContext;

import java.io.IOException;
import java.util.List;

import static org.apache.paimon.flink.action.PartitionMarkDoneAction.getPartitionPaths;
import static org.apache.paimon.flink.sink.partition.PartitionMarkDone.closeActions;
import static org.apache.paimon.flink.sink.partition.PartitionMarkDone.getPartitionMarkDoneActions;
import static org.apache.paimon.flink.sink.partition.PartitionMarkDone.markDone;
import static org.apache.paimon.utils.ParameterUtils.getPartitions;
import static org.apache.paimon.utils.Preconditions.checkArgument;
Expand All @@ -44,7 +43,7 @@
* CALL sys.mark_partition_done('tableId', 'partition1', 'partition2', ...)
* </code></pre>
*/
public class PartitionMarkDoneProcedure extends ProcedureBase {
public class MarkPartitionDoneProcedure extends ProcedureBase {

public static final String IDENTIFIER = "mark_partition_done";

Expand All @@ -64,17 +63,16 @@ public String[] call(

FileStoreTable fileStoreTable = (FileStoreTable) table;
CoreOptions coreOptions = fileStoreTable.coreOptions();
Options options = coreOptions.toConfiguration();

List<org.apache.paimon.flink.sink.partition.PartitionMarkDoneAction> actions =
getPartitionMarkDoneActions(fileStoreTable, coreOptions, options);
List<PartitionMarkDoneAction> actions =
PartitionMarkDoneAction.createActions(fileStoreTable, coreOptions);

List<String> partitionPaths =
getPartitionPaths(fileStoreTable, getPartitions(partitionStrings));
PartitionPathUtils.generatePartitionPaths(
getPartitions(partitionStrings), fileStoreTable.store().partitionType());

markDone(partitionPaths, actions);

closeActions(actions);
IOUtils.closeAllQuietly(actions);

return new String[] {"Success"};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.PartitionPathUtils;

Expand All @@ -35,18 +35,12 @@
import java.io.Closeable;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_IDLE_TIME_TO_DONE;
import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_ACTION;
import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_WHEN_END_INPUT;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/** Mark partition done. */
public class PartitionMarkDone implements Closeable {
Expand Down Expand Up @@ -79,31 +73,11 @@ public static PartitionMarkDone create(
PartitionMarkDoneTrigger.create(coreOptions, isRestored, stateStore);

List<PartitionMarkDoneAction> actions =
getPartitionMarkDoneActions(table, coreOptions, options);
PartitionMarkDoneAction.createActions(table, coreOptions);

return new PartitionMarkDone(partitionComputer, trigger, actions);
}

public static List<PartitionMarkDoneAction> getPartitionMarkDoneActions(
FileStoreTable fileStoreTable, CoreOptions coreOptions, Options options) {
return Arrays.asList(options.get(PARTITION_MARK_DONE_ACTION).split(",")).stream()
.map(
action -> {
switch (action) {
case "success-file":
return new SuccessFileMarkDoneAction(
fileStoreTable.fileIO(), fileStoreTable.location());
case "done-partition":
return new AddDonePartitionAction(
createMetastoreClient(
fileStoreTable, coreOptions, options));
default:
throw new UnsupportedOperationException(action);
}
})
.collect(Collectors.toList());
}

private static boolean disablePartitionMarkDone(
boolean isStreaming, FileStoreTable table, Options options) {
boolean partitionMarkDoneWhenEndInput = options.get(PARTITION_MARK_DONE_WHEN_END_INPUT);
Expand All @@ -124,24 +98,6 @@ private static boolean disablePartitionMarkDone(
return false;
}

public static MetastoreClient createMetastoreClient(
FileStoreTable table, CoreOptions coreOptions, Options options) {
MetastoreClient.Factory metastoreClientFactory =
table.catalogEnvironment().metastoreClientFactory();

if (options.get(PARTITION_MARK_DONE_ACTION).contains("done-partition")) {
checkNotNull(
metastoreClientFactory,
"Cannot mark done partition for table without metastore.");
checkArgument(
coreOptions.partitionedTableInMetastore(),
"Table should enable %s",
METASTORE_PARTITIONED_TABLE.key());
}

return metastoreClientFactory.create();
}

public PartitionMarkDone(
InternalRowPartitionComputer partitionComputer,
PartitionMarkDoneTrigger trigger,
Expand Down Expand Up @@ -183,18 +139,12 @@ public static void markDone(List<String> partitions, List<PartitionMarkDoneActio
}
}

public static void closeActions(List<PartitionMarkDoneAction> actions) throws IOException {
for (PartitionMarkDoneAction action : actions) {
action.close();
}
}

public void snapshotState() throws Exception {
trigger.snapshotState();
}

@Override
public void close() throws IOException {
closeActions(actions);
IOUtils.closeAllQuietly(actions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,59 @@

package org.apache.paimon.flink.sink.partition;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.table.FileStoreTable;

import java.io.Closeable;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.METASTORE_PARTITIONED_TABLE;
import static org.apache.paimon.flink.FlinkConnectorOptions.PARTITION_MARK_DONE_ACTION;
import static org.apache.paimon.utils.Preconditions.checkArgument;
import static org.apache.paimon.utils.Preconditions.checkNotNull;

/** Action to mark partitions done. */
public interface PartitionMarkDoneAction extends Closeable {

void markDone(String partition) throws Exception;

static List<PartitionMarkDoneAction> createActions(
FileStoreTable fileStoreTable, CoreOptions options) {
return Arrays.asList(options.toConfiguration().get(PARTITION_MARK_DONE_ACTION).split(","))
.stream()
.map(
action -> {
switch (action) {
case "success-file":
return new SuccessFileMarkDoneAction(
fileStoreTable.fileIO(), fileStoreTable.location());
case "done-partition":
return new AddDonePartitionAction(
createMetastoreClient(fileStoreTable, options));
default:
throw new UnsupportedOperationException(action);
}
})
.collect(Collectors.toList());
}

static MetastoreClient createMetastoreClient(FileStoreTable table, CoreOptions options) {
MetastoreClient.Factory metastoreClientFactory =
table.catalogEnvironment().metastoreClientFactory();

if (options.toConfiguration().get(PARTITION_MARK_DONE_ACTION).contains("done-partition")) {
checkNotNull(
metastoreClientFactory,
"Cannot mark done partition for table without metastore.");
checkArgument(
options.partitionedTableInMetastore(),
"Table should enable %s",
METASTORE_PARTITIONED_TABLE.key());
}

return metastoreClientFactory.create();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ org.apache.paimon.flink.action.MigrateDatabaseActionFactory
org.apache.paimon.flink.action.RemoveOrphanFilesActionFactory
org.apache.paimon.flink.action.QueryServiceActionFactory
org.apache.paimon.flink.action.ExpirePartitionsActionFactory
org.apache.paimon.flink.action.PartitionMarkDoneActionFactory
org.apache.paimon.flink.action.MarkPartitionDoneActionFactory

### procedure factories
org.apache.paimon.flink.procedure.CompactDatabaseProcedure
Expand Down Expand Up @@ -58,4 +58,4 @@ org.apache.paimon.flink.procedure.privilege.RevokePrivilegeFromUserProcedure
org.apache.paimon.flink.procedure.RepairProcedure
org.apache.paimon.flink.procedure.ReplaceBranchProcedure
org.apache.paimon.flink.procedure.MergeBranchProcedure
org.apache.paimon.flink.procedure.PartitionMarkDoneProcedure
org.apache.paimon.flink.procedure.MarkPartitionDoneProcedure
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@

import static org.assertj.core.api.Assertions.assertThat;

/** IT cases for {@link PartitionMarkDoneAction}. */
public class PartitionMarkDoneActionITCase extends ActionITCaseBase {
/** IT cases for {@link MarkPartitionDoneAction}. */
public class MarkPartitionDoneActionITCase extends ActionITCaseBase {

private static final DataType[] FIELD_TYPES =
new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.STRING(), DataTypes.INT()};
Expand All @@ -54,7 +54,7 @@ public void testPartitionMarkDoneWithSinglePartitionKey(boolean hasPk) throws Ex
FileStoreTable table = prepareTable(hasPk);
if (ThreadLocalRandom.current().nextBoolean()) {
createAction(
PartitionMarkDoneAction.class,
MarkPartitionDoneAction.class,
"mark_partition_done",
"--warehouse",
warehouse,
Expand Down Expand Up @@ -84,7 +84,7 @@ public void testDropPartitionWithMultiplePartitionKey(boolean hasPk) throws Exce

if (ThreadLocalRandom.current().nextBoolean()) {
createAction(
PartitionMarkDoneAction.class,
MarkPartitionDoneAction.class,
"mark_partition_done",
"--warehouse",
warehouse,
Expand Down

0 comments on commit a4085ca

Please sign in to comment.