Skip to content

Commit

Permalink
[flink] Introduce PartitionMarkDone action and procedure (#3584)
Browse files Browse the repository at this point in the history
  • Loading branch information
wwj6591812 authored Jun 26, 2024
1 parent 83444c2 commit c1551a4
Show file tree
Hide file tree
Showing 8 changed files with 460 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.paimon.utils;

import org.apache.paimon.fs.Path;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;

import java.util.ArrayList;
import java.util.BitSet;
Expand Down Expand Up @@ -87,6 +89,22 @@ public static String generatePartitionPath(LinkedHashMap<String, String> partiti
return suffixBuf.toString();
}

public static String generatePartitionPath(
Map<String, String> partitionSpec, RowType partitionType) {
LinkedHashMap<String, String> linkedPartitionSpec = new LinkedHashMap<>();
List<DataField> fields = partitionType.getFields();

for (DataField dataField : fields) {
String partitionColumnName = dataField.name();
String partitionColumnValue = partitionSpec.get(partitionColumnName);
if (partitionColumnValue != null) {
linkedPartitionSpec.put(partitionColumnName, partitionColumnValue);
}
}

return generatePartitionPath(linkedPartitionSpec);
}

/**
* Generate all hierarchical paths from partition spec.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.utils.PartitionPathUtils;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.paimon.flink.sink.partition.PartitionMarkDone.closeActions;
import static org.apache.paimon.flink.sink.partition.PartitionMarkDone.getPartitionMarkDoneActions;
import static org.apache.paimon.flink.sink.partition.PartitionMarkDone.markDone;

/** Table partition mark done action for Flink. */
public class PartitionMarkDoneAction extends TableActionBase {

private final FileStoreTable fileStoreTable;
private final List<Map<String, String>> partitions;
private final CoreOptions coreOptions;
private final Options options;

public PartitionMarkDoneAction(
String warehouse,
String databaseName,
String tableName,
List<Map<String, String>> partitions,
Map<String, String> catalogConfig) {
super(warehouse, databaseName, tableName, catalogConfig);
if (!(table instanceof FileStoreTable)) {
throw new UnsupportedOperationException(
String.format(
"Only FileStoreTable supports mark_partition_done action. The table type is '%s'.",
table.getClass().getName()));
}

this.fileStoreTable = (FileStoreTable) table;
this.partitions = partitions;
this.coreOptions = fileStoreTable.coreOptions();
this.options = coreOptions.toConfiguration();
}

@Override
public void run() throws Exception {
List<org.apache.paimon.flink.sink.partition.PartitionMarkDoneAction> actions =
getPartitionMarkDoneActions(fileStoreTable, coreOptions, options);

List<String> partitionPaths = getPartitionPaths(fileStoreTable, partitions);

markDone(partitionPaths, actions);

closeActions(actions);
}

public static List<String> getPartitionPaths(
FileStoreTable fileStoreTable, List<Map<String, String>> partitions) {
return partitions.stream()
.map(
partition ->
PartitionPathUtils.generatePartitionPath(
partition, fileStoreTable.store().partitionType()))
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action;

import org.apache.flink.api.java.tuple.Tuple3;

import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Factory to create {@link PartitionMarkDoneAction}. */
public class PartitionMarkDoneActionFactory implements ActionFactory {

public static final String IDENTIFIER = "mark_partition_done";

@Override
public String identifier() {
return IDENTIFIER;
}

@Override
public Optional<Action> create(MultipleParameterToolAdapter params) {
Tuple3<String, String, String> tablePath = getTablePath(params);

checkRequiredArgument(params, PARTITION);
List<Map<String, String>> partitions = getPartitions(params);

Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);

return Optional.of(
new PartitionMarkDoneAction(
tablePath.f0, tablePath.f1, tablePath.f2, partitions, catalogConfig));
}

@Override
public void printHelp() {
System.out.println(
"Action \"mark_partition_done\" mark done of specified partitions for a table.");
System.out.println();

System.out.println("Syntax:");
System.out.println(
" mark_partition_done --warehouse <warehouse_path> --database <database_name> "
+ "--table <table_name> --partition <partition_name> [--partition <partition_name> ...]");
System.out.println(
" mark_partition_done --path <table_path> --partition <partition_name> [--partition <partition_name> ...]");
System.out.println();

System.out.println("Partition name syntax:");
System.out.println(" key1=value1,key2=value2,...");
System.out.println();

System.out.println("Examples:");
System.out.println(
" mark_partition_done --warehouse hdfs:///path/to/warehouse --database test_db --table test_table --partition dt=20221126,hh=08");
System.out.println(
" mark_partition_done --path hdfs:///path/to/warehouse/test_db.db/test_table --partition dt=20221126,hh=08 --partition dt=20221127,hh=09");
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.procedure;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;

import org.apache.flink.table.procedure.ProcedureContext;

import java.io.IOException;
import java.util.List;

import static org.apache.paimon.flink.action.PartitionMarkDoneAction.getPartitionPaths;
import static org.apache.paimon.flink.sink.partition.PartitionMarkDone.closeActions;
import static org.apache.paimon.flink.sink.partition.PartitionMarkDone.getPartitionMarkDoneActions;
import static org.apache.paimon.flink.sink.partition.PartitionMarkDone.markDone;
import static org.apache.paimon.utils.ParameterUtils.getPartitions;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
* Partition mark done procedure. Usage:
*
* <pre><code>
* CALL sys.mark_partition_done('tableId', 'partition1', 'partition2', ...)
* </code></pre>
*/
public class PartitionMarkDoneProcedure extends ProcedureBase {

public static final String IDENTIFIER = "mark_partition_done";

public String[] call(
ProcedureContext procedureContext, String tableId, String... partitionStrings)
throws Catalog.TableNotExistException, IOException {
checkArgument(
partitionStrings.length > 0,
"mark_partition_done procedure must specify partitions.");

Identifier identifier = Identifier.fromString(tableId);
Table table = catalog.getTable(identifier);
checkArgument(
table instanceof FileStoreTable,
"Only FileStoreTable supports mark_partition_done procedure. The table type is '%s'.",
table.getClass().getName());

FileStoreTable fileStoreTable = (FileStoreTable) table;
CoreOptions coreOptions = fileStoreTable.coreOptions();
Options options = coreOptions.toConfiguration();

List<org.apache.paimon.flink.sink.partition.PartitionMarkDoneAction> actions =
getPartitionMarkDoneActions(fileStoreTable, coreOptions, options);

List<String> partitionPaths =
getPartitionPaths(fileStoreTable, getPartitions(partitionStrings));

markDone(partitionPaths, actions);

closeActions(actions);

return new String[] {"Success"};
}

@Override
public String identifier() {
return IDENTIFIER;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.utils.InternalRowPartitionComputer;
import org.apache.paimon.utils.PartitionPathUtils;

import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.table.utils.PartitionPathUtils;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -79,26 +79,31 @@ public static PartitionMarkDone create(
PartitionMarkDoneTrigger.create(coreOptions, isRestored, stateStore);

List<PartitionMarkDoneAction> actions =
Arrays.asList(options.get(PARTITION_MARK_DONE_ACTION).split(",")).stream()
.map(
action -> {
switch (action) {
case "success-file":
return new SuccessFileMarkDoneAction(
table.fileIO(), table.location());
case "done-partition":
return new AddDonePartitionAction(
checkMetastoreAndCreateMetastoreClient(
table, coreOptions, options));
default:
throw new UnsupportedOperationException(action);
}
})
.collect(Collectors.toList());
getPartitionMarkDoneActions(table, coreOptions, options);

return new PartitionMarkDone(partitionComputer, trigger, actions);
}

public static List<PartitionMarkDoneAction> getPartitionMarkDoneActions(
FileStoreTable fileStoreTable, CoreOptions coreOptions, Options options) {
return Arrays.asList(options.get(PARTITION_MARK_DONE_ACTION).split(",")).stream()
.map(
action -> {
switch (action) {
case "success-file":
return new SuccessFileMarkDoneAction(
fileStoreTable.fileIO(), fileStoreTable.location());
case "done-partition":
return new AddDonePartitionAction(
createMetastoreClient(
fileStoreTable, coreOptions, options));
default:
throw new UnsupportedOperationException(action);
}
})
.collect(Collectors.toList());
}

private static boolean disablePartitionMarkDone(
boolean isStreaming, FileStoreTable table, Options options) {
boolean partitionMarkDoneWhenEndInput = options.get(PARTITION_MARK_DONE_WHEN_END_INPUT);
Expand All @@ -119,7 +124,7 @@ private static boolean disablePartitionMarkDone(
return false;
}

private static MetastoreClient checkMetastoreAndCreateMetastoreClient(
public static MetastoreClient createMetastoreClient(
FileStoreTable table, CoreOptions coreOptions, Options options) {
MetastoreClient.Factory metastoreClientFactory =
table.catalogEnvironment().metastoreClientFactory();
Expand Down Expand Up @@ -163,7 +168,11 @@ public void notifyCommittable(List<ManifestCommittable> committables) {
.map(PartitionPathUtils::generatePartitionPath)
.forEach(trigger::notifyPartition);

for (String partition : trigger.donePartitions(endInput)) {
markDone(trigger.donePartitions(endInput), actions);
}

public static void markDone(List<String> partitions, List<PartitionMarkDoneAction> actions) {
for (String partition : partitions) {
try {
for (PartitionMarkDoneAction action : actions) {
action.markDone(partition);
Expand All @@ -174,14 +183,18 @@ public void notifyCommittable(List<ManifestCommittable> committables) {
}
}

public static void closeActions(List<PartitionMarkDoneAction> actions) throws IOException {
for (PartitionMarkDoneAction action : actions) {
action.close();
}
}

public void snapshotState() throws Exception {
trigger.snapshotState();
}

@Override
public void close() throws IOException {
for (PartitionMarkDoneAction action : actions) {
action.close();
}
closeActions(actions);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ org.apache.paimon.flink.action.MigrateDatabaseActionFactory
org.apache.paimon.flink.action.RemoveOrphanFilesActionFactory
org.apache.paimon.flink.action.QueryServiceActionFactory
org.apache.paimon.flink.action.ExpirePartitionsActionFactory
org.apache.paimon.flink.action.PartitionMarkDoneActionFactory

### procedure factories
org.apache.paimon.flink.procedure.CompactDatabaseProcedure
Expand Down Expand Up @@ -57,3 +58,4 @@ org.apache.paimon.flink.procedure.privilege.RevokePrivilegeFromUserProcedure
org.apache.paimon.flink.procedure.RepairProcedure
org.apache.paimon.flink.procedure.ReplaceBranchProcedure
org.apache.paimon.flink.procedure.MergeBranchProcedure
org.apache.paimon.flink.procedure.PartitionMarkDoneProcedure
Loading

0 comments on commit c1551a4

Please sign in to comment.