Skip to content

Commit

Permalink
add partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
Zouxxyy committed Nov 7, 2024
1 parent 6fb8dee commit 6e9c40a
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@
import org.apache.paimon.shade.guava30.com.google.common.cache.CacheBuilder;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** A {@link CommitCallback} to add newly created partitions to metastore. */
public class AddPartitionCommitCallback implements CommitCallback {
Expand All @@ -52,19 +55,21 @@ public AddPartitionCommitCallback(MetastoreClient client) {

@Override
public void call(List<ManifestEntry> committedEntries, Snapshot snapshot) {
committedEntries.stream()
.filter(e -> FileKind.ADD.equals(e.kind()))
.map(ManifestEntry::partition)
.distinct()
.forEach(this::addPartition);
Set<BinaryRow> partitions =
committedEntries.stream()
.filter(e -> FileKind.ADD.equals(e.kind()))
.map(ManifestEntry::partition)
.collect(Collectors.toSet());
addPartitions(partitions);
}

@Override
public void retry(ManifestCommittable committable) {
committable.fileCommittables().stream()
.map(CommitMessage::partition)
.distinct()
.forEach(this::addPartition);
Set<BinaryRow> partitions =
committable.fileCommittables().stream()
.map(CommitMessage::partition)
.collect(Collectors.toSet());
addPartitions(partitions);
}

private void addPartition(BinaryRow partition) {
Expand All @@ -81,6 +86,19 @@ private void addPartition(BinaryRow partition) {
}
}

private void addPartitions(Set<BinaryRow> partitions) {
try {
List<BinaryRow> filteredPartitions = new ArrayList<>();
for (BinaryRow partition : partitions) {
if (!cache.get(partition, () -> false)) filteredPartitions.add(partition);
}
client.addPartitions(filteredPartitions);
filteredPartitions.forEach(partition -> cache.put(partition, true));
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public void close() throws Exception {
client.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.io.Serializable;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
Expand All @@ -32,8 +33,21 @@ public interface MetastoreClient extends AutoCloseable {

void addPartition(BinaryRow partition) throws Exception;

default void addPartitions(List<BinaryRow> partitions) throws Exception {
for (BinaryRow partition : partitions) {
addPartition(partition);
}
}

void addPartition(LinkedHashMap<String, String> partitionSpec) throws Exception;

default void addPartitionsSpec(List<LinkedHashMap<String, String>> partitionSpecsList)
throws Exception {
for (LinkedHashMap<String, String> partitionSpecs : partitionSpecsList) {
addPartition(partitionSpecs);
}
}

void deletePartition(LinkedHashMap<String, String> partitionSpec) throws Exception;

void markDone(LinkedHashMap<String, String> partitionSpec) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** {@link MetastoreClient} for Hive tables. */
public class HiveMetastoreClient implements MetastoreClient {
Expand Down Expand Up @@ -81,6 +82,14 @@ public void addPartition(BinaryRow partition) throws Exception {
addPartition(partitionComputer.generatePartValues(partition));
}

@Override
public void addPartitions(List<BinaryRow> partitions) throws Exception {
addPartitionsSpec(
partitions.stream()
.map(partitionComputer::generatePartValues)
.collect(Collectors.toList()));
}

@Override
public void addPartition(LinkedHashMap<String, String> partitionSpec) throws Exception {
List<String> partitionValues = new ArrayList<>(partitionSpec.values());
Expand Down Expand Up @@ -113,6 +122,33 @@ public void addPartition(LinkedHashMap<String, String> partitionSpec) throws Exc
}
}

@Override
public void addPartitionsSpec(List<LinkedHashMap<String, String>> partitionSpecsList)
throws Exception {
String databaseName = identifier.getDatabaseName();
String tableName = identifier.getTableName();
ArrayList<Partition> hivePartitions = new ArrayList<>();
int currentTime = (int) (System.currentTimeMillis() / 1000);
for (LinkedHashMap<String, String> partitionSpec : partitionSpecsList) {
List<String> partitionValues = new ArrayList<>(partitionSpec.values());
StorageDescriptor newSd = new StorageDescriptor(sd);
newSd.setLocation(
sd.getLocation()
+ "/"
+ PartitionPathUtils.generatePartitionPath(partitionSpec));
Partition hivePartition = new Partition();
hivePartition.setDbName(databaseName);
hivePartition.setTableName(tableName);
hivePartition.setValues(partitionValues);
hivePartition.setSd(newSd);
hivePartition.setCreateTime(currentTime);
hivePartition.setLastAccessTime(currentTime);
hivePartitions.add(hivePartition);
}

clients.execute(client -> client.add_partitions(hivePartitions, true, false));
}

@Override
public void alterPartition(
LinkedHashMap<String, String> partitionSpec,
Expand Down

0 comments on commit 6e9c40a

Please sign in to comment.