From 962fead2f4cc5f1aa0c477c85db2e30f6ad7e3b7 Mon Sep 17 00:00:00 2001 From: wgcn <1026688210@qq.com> Date: Fri, 27 Oct 2023 19:16:35 +0800 Subject: [PATCH] show partition modify time --- .../paimon/table/system/FileMonitorTable.java | 53 ++++++++++++++ .../paimon/table/FileStoreTableTestBase.java | 74 ++++++++----------- 2 files changed, 84 insertions(+), 43 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java index a43c5da5af39b..6f461962d0a4a 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java @@ -30,6 +30,7 @@ import org.apache.paimon.io.DataFileMetaSerializer; import org.apache.paimon.predicate.Predicate; import org.apache.paimon.reader.RecordReader; +import org.apache.paimon.schema.TableSchema; import org.apache.paimon.table.DataTable; import org.apache.paimon.table.FileStoreTable; import org.apache.paimon.table.ReadonlyTable; @@ -45,6 +46,7 @@ import org.apache.paimon.types.IntType; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.IteratorRecordReader; +import org.apache.paimon.utils.RowDataToObjectArrayConverter; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -53,6 +55,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import static org.apache.paimon.CoreOptions.SCAN_BOUNDED_WATERMARK; import static org.apache.paimon.CoreOptions.STREAM_SCAN_MODE; @@ -221,6 +224,56 @@ public static FileChange toFileChange(InternalRow row) throws IOException { fileSerializer.deserializeList(row.getBinary(4))); } + /** + * Computes the latest data file creation time for each monitored partition. + * + * <p>Names in {@code paritions} that are not partition keys of {@code schema} are skipped, and a + * file change without data files contributes no entry. + * + * @param schema table schema supplying the partition keys and the logical partition type + * @param results file changes whose data files are inspected + * @param paritions names of the partition keys whose values form the keys of the returned map + * @return largest {@code creationTimeEpochMillis} seen per combination of those values + */ + public static Map<GenericRow, Long> paritionWithMaxModifyTime( + TableSchema schema, List<FileChange> results, List<String> paritions) { + RowDataToObjectArrayConverter rowDataToObjectArrayConverter = + new
RowDataToObjectArrayConverter(schema.logicalPartitionType()); + List<String> partitionKeys = schema.partitionKeys(); + + List<Integer> paritionIndexToMonitor = + paritions.stream() + .map(partitionKeys::indexOf) + .filter(i -> i != -1) + .collect(Collectors.toList()); + + Map<GenericRow, Long> paritionWithMaxModifyTime = new HashMap<>(); + for (FileMonitorTable.FileChange result : results) { + result.dataFiles().stream() + .map(DataFileMeta::creationTimeEpochMillis) + .max(Long::compare) + .ifPresent( + maxModifyTime -> { + Object[] convert = + rowDataToObjectArrayConverter.convert(result.partition()); + GenericRow genericRow = + new GenericRow(paritionIndexToMonitor.size()); + + for (int i = 0; i < paritionIndexToMonitor.size(); i++) { + genericRow.setField(i, convert[paritionIndexToMonitor.get(i)]); + } + + paritionWithMaxModifyTime.compute( + genericRow, + (k, old) -> + old == null || maxModifyTime > old + ? maxModifyTime + : old); + }); + } + return paritionWithMaxModifyTime; + } + /** Pojo to record of file change. */ public static class FileChange { diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index a2368644438f0..be5abf83c3e9e 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -49,7 +49,6 @@ import org.apache.paimon.schema.SchemaManager; import org.apache.paimon.schema.SchemaUtils; import org.apache.paimon.schema.TableSchema; -import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; import org.apache.paimon.table.sink.CommitMessage; import org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.table.sink.InnerTableCommit; @@ -70,11 +69,13 @@ import org.apache.paimon.types.DataTypes; import org.apache.paimon.types.RowKind; import org.apache.paimon.types.RowType; -import org.apache.paimon.utils.RowDataToObjectArrayConverter; import
org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; import org.apache.paimon.utils.TraceableFileIO; +import org.apache.paimon.shade.guava30.com.google.common.collect.Lists; + +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -95,7 +96,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.Set; import java.util.UUID; import java.util.concurrent.CountDownLatch; @@ -116,6 +116,7 @@ import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX; import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN; import static org.apache.paimon.CoreOptions.WRITE_ONLY; +import static org.apache.paimon.table.system.FileMonitorTable.paritionWithMaxModifyTime; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.junit.jupiter.params.provider.Arguments.arguments; @@ -1308,19 +1309,29 @@ public void testMultiParition() throws Exception { RowType rowType = RowType.of( new DataType[] { - DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), + DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), DataTypes.INT(), }, new String[] {"pt1", "pt2", "pt3", "v"}); FileStoreTable fileStoreTable = createFileStoreTableMultiPartition( (c) -> {}, rowType, Lists.newArrayList("pt1", "pt2", "pt3")); - + HashMap expected = new HashMap<>(); try (StreamTableWrite write = fileStoreTable.newWrite(commitUser); - InnerTableCommit commit = fileStoreTable.newCommit(commitUser)) { + InnerTableCommit commit = fileStoreTable.newCommit(commitUser)) { + write.write(GenericRow.of(1, 1, 1, 2)); commit.commit(0, write.prepareCommit(true, 0)); + write.write(GenericRow.of(1, 1, 1, 3)); - commit.commit(1, write.prepareCommit(true, 1)); + List commitMessages1 = write.prepareCommit(true, 1); + 
commit.commit(1, commitMessages1); + + write.write(GenericRow.of(1, 2, 1, 3)); + List commitMessages2 = write.prepareCommit(true, 2); + commit.commit(2, commitMessages2); + + expected.put(GenericRow.of(1, 1), getMaxCreateTime(commitMessages1)); + expected.put(GenericRow.of(1, 2), getMaxCreateTime(commitMessages2)); } FileMonitorTable fileMonitorTable = new FileMonitorTable(fileStoreTable); @@ -1341,47 +1352,24 @@ public void testMultiParition() throws Exception { } }); - TableSchema schema = fileStoreTable.schema(); - RowDataToObjectArrayConverter rowDataToObjectArrayConverter = - new RowDataToObjectArrayConverter(schema.logicalPartitionType()); List paritionToMonitor = Lists.newArrayList("pt1", "pt2"); - List partitionKeys = schema.partitionKeys(); - List paritionIndexToMonitor = Lists.newArrayList(); - for (String s : paritionToMonitor) { - if (partitionKeys.contains(s)) { - int index = partitionKeys.indexOf(s); - paritionIndexToMonitor.add(index); - } - } - - Map paritionWithMaxModifyTime = new HashMap<>(); - for (FileMonitorTable.FileChange result : results) { - Optional maxModifyTimeOpt = - result.dataFiles().stream() - .map(DataFileMeta::creationTimeEpochMillis) - .max(Long::compare); - - if (maxModifyTimeOpt.isPresent()) { - Long maxModifyTime = maxModifyTimeOpt.get(); - Object[] convert = rowDataToObjectArrayConverter.convert(result.partition()); - GenericRow genericRow = new GenericRow(paritionIndexToMonitor.size()); - - for (int i = 0; i < paritionIndexToMonitor.size(); i++) { - genericRow.setField(i, convert[i]); - } + Map actual = + paritionWithMaxModifyTime(fileStoreTable.schema(), results, paritionToMonitor); + Assertions.assertThat(actual).isEqualTo(expected); + } - if (paritionWithMaxModifyTime.containsKey(genericRow)) { - Long old = paritionWithMaxModifyTime.get(genericRow); - if (maxModifyTime > old) { - paritionWithMaxModifyTime.put(genericRow, maxModifyTime); - } - }else { - paritionWithMaxModifyTime.put(genericRow,maxModifyTime); + private
static long getMaxCreateTime(List<CommitMessage> commitMessages) { + long max = 0; + for (CommitMessage message : commitMessages) { + CommitMessageImpl commitMessage = (CommitMessageImpl) message; + List<DataFileMeta> dataFileMetas = commitMessage.newFilesIncrement().newFiles(); + if (!dataFileMetas.isEmpty()) { + for (DataFileMeta dataFileMeta : dataFileMetas) { + max = Math.max(dataFileMeta.creationTimeEpochMillis(), max); } } } - - System.out.println(paritionWithMaxModifyTime); + return max; } }