Skip to content

Commit

Permalink
show paritition modify time
Browse files Browse the repository at this point in the history
  • Loading branch information
wg1026688210 committed Oct 27, 2023
1 parent c87c4f4 commit 962fead
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.paimon.io.DataFileMetaSerializer;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.DataTable;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.ReadonlyTable;
Expand All @@ -45,6 +46,7 @@
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.IteratorRecordReader;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;

Expand All @@ -53,6 +55,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.SCAN_BOUNDED_WATERMARK;
import static org.apache.paimon.CoreOptions.STREAM_SCAN_MODE;
Expand Down Expand Up @@ -221,6 +224,45 @@ public static FileChange toFileChange(InternalRow row) throws IOException {
fileSerializer.deserializeList(row.getBinary(4)));
}

public static Map<GenericRow, Long> paritionWithMaxModifyTime(
TableSchema schema, List<FileMonitorTable.FileChange> results, List<String> paritions) {
RowDataToObjectArrayConverter rowDataToObjectArrayConverter =
new RowDataToObjectArrayConverter(schema.logicalPartitionType());
List<String> partitionKeys = schema.partitionKeys();

List<Integer> paritionIndexToMonitor =
paritions.stream()
.map(partitionKeys::indexOf)
.filter(i -> i != -1)
.collect(Collectors.toList());

Map<GenericRow, Long> paritionWithMaxModifyTime = new HashMap<>();
for (FileMonitorTable.FileChange result : results) {
result.dataFiles().stream()
.map(DataFileMeta::creationTimeEpochMillis)
.max(Long::compare)
.ifPresent(
maxModifyTime -> {
Object[] convert =
rowDataToObjectArrayConverter.convert(result.partition());
GenericRow genericRow =
new GenericRow(paritionIndexToMonitor.size());

for (int i = 0; i < paritionIndexToMonitor.size(); i++) {
genericRow.setField(i, convert[i]);
}

paritionWithMaxModifyTime.compute(
genericRow,
(k, old) ->
old == null || maxModifyTime > old
? maxModifyTime
: old);
});
}
return paritionWithMaxModifyTime;
}

/** Pojo to record of file change. */
public static class FileChange {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.table.sink.InnerTableCommit;
Expand All @@ -70,11 +69,13 @@
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.RowDataToObjectArrayConverter;
import org.apache.paimon.utils.SnapshotManager;
import org.apache.paimon.utils.TagManager;
import org.apache.paimon.utils.TraceableFileIO;

import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;

import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -95,7 +96,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
Expand All @@ -116,6 +116,7 @@
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
import static org.apache.paimon.CoreOptions.WRITE_ONLY;
import static org.apache.paimon.table.system.FileMonitorTable.paritionWithMaxModifyTime;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.junit.jupiter.params.provider.Arguments.arguments;
Expand Down Expand Up @@ -1308,19 +1309,29 @@ public void testMultiParition() throws Exception {
RowType rowType =
RowType.of(
new DataType[] {
DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(),
DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(),
},
new String[] {"pt1", "pt2", "pt3", "v"});
FileStoreTable fileStoreTable =
createFileStoreTableMultiPartition(
(c) -> {}, rowType, Lists.newArrayList("pt1", "pt2", "pt3"));

HashMap<GenericRow, Long> expected = new HashMap<>();
try (StreamTableWrite write = fileStoreTable.newWrite(commitUser);
InnerTableCommit commit = fileStoreTable.newCommit(commitUser)) {
InnerTableCommit commit = fileStoreTable.newCommit(commitUser)) {

write.write(GenericRow.of(1, 1, 1, 2));
commit.commit(0, write.prepareCommit(true, 0));

write.write(GenericRow.of(1, 1, 1, 3));
commit.commit(1, write.prepareCommit(true, 1));
List<CommitMessage> commitMessages1 = write.prepareCommit(true, 1);
commit.commit(1, commitMessages1);

write.write(GenericRow.of(1, 2, 1, 3));
List<CommitMessage> commitMessages2 = write.prepareCommit(true, 2);
commit.commit(2, commitMessages2);

expected.put(GenericRow.of(1, 1), getMaxCreateTime(commitMessages1));
expected.put(GenericRow.of(1, 2), getMaxCreateTime(commitMessages2));
}

FileMonitorTable fileMonitorTable = new FileMonitorTable(fileStoreTable);
Expand All @@ -1341,47 +1352,24 @@ public void testMultiParition() throws Exception {
}
});

TableSchema schema = fileStoreTable.schema();
RowDataToObjectArrayConverter rowDataToObjectArrayConverter =
new RowDataToObjectArrayConverter(schema.logicalPartitionType());
List<String> paritionToMonitor = Lists.newArrayList("pt1", "pt2");
List<String> partitionKeys = schema.partitionKeys();

List<Integer> paritionIndexToMonitor = Lists.newArrayList();
for (String s : paritionToMonitor) {
if (partitionKeys.contains(s)) {
int index = partitionKeys.indexOf(s);
paritionIndexToMonitor.add(index);
}
}

Map<GenericRow, Long> paritionWithMaxModifyTime = new HashMap<>();
for (FileMonitorTable.FileChange result : results) {
Optional<Long> maxModifyTimeOpt =
result.dataFiles().stream()
.map(DataFileMeta::creationTimeEpochMillis)
.max(Long::compare);

if (maxModifyTimeOpt.isPresent()) {
Long maxModifyTime = maxModifyTimeOpt.get();
Object[] convert = rowDataToObjectArrayConverter.convert(result.partition());
GenericRow genericRow = new GenericRow(paritionIndexToMonitor.size());

for (int i = 0; i < paritionIndexToMonitor.size(); i++) {
genericRow.setField(i, convert[i]);
}
Map<GenericRow, Long> actual =
paritionWithMaxModifyTime(fileStoreTable.schema(), results, paritionToMonitor);
Assertions.assertThat(actual).isEqualTo(expected);
}

if (paritionWithMaxModifyTime.containsKey(genericRow)) {
Long old = paritionWithMaxModifyTime.get(genericRow);
if (maxModifyTime > old) {
paritionWithMaxModifyTime.put(genericRow, maxModifyTime);
}
}else {
paritionWithMaxModifyTime.put(genericRow,maxModifyTime);
private static long getMaxCreateTime(List<CommitMessage> commitMessages) {
long max = 0;
for (CommitMessage message : commitMessages) {
CommitMessageImpl commitMessage = (CommitMessageImpl) message;
List<DataFileMeta> dataFileMetas = commitMessage.newFilesIncrement().newFiles();
if (!dataFileMetas.isEmpty()) {
for (DataFileMeta dataFileMeta : dataFileMetas) {
max = Math.max(dataFileMeta.creationTimeEpochMillis(), max);
}
}
}

System.out.println(paritionWithMaxModifyTime);
return max;
}
}

0 comments on commit 962fead

Please sign in to comment.