From 1d76a5f56bee293293bad1fa6cd2837d998df4f9 Mon Sep 17 00:00:00 2001 From: wgcn <1026688210@qq.com> Date: Tue, 31 Oct 2023 18:23:00 +0800 Subject: [PATCH] add validation --- .../paimon/table/system/FileMonitorTable.java | 21 +++++++++++++++---- 1 file changed, 17 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java index 3ec8ba3ca32d..cc9080210318 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/system/FileMonitorTable.java @@ -239,18 +239,31 @@ public static ParitionChange toPartitionChange(InternalRow row) throws IOExcepti public static Map paritionWithLatestModifyTime( TableSchema schema, List results, - List paritions) { + List paritionsToMonitor) { + Map paritionWithLatestModifyTime = new HashMap<>(); + + List partitionKeys = schema.partitionKeys(); + if (partitionKeys.isEmpty()) { + return paritionWithLatestModifyTime; + } + + paritionsToMonitor.stream() + .filter(part -> !partitionKeys.contains(part)) + .findAny() + .ifPresent( + part -> { + throw new RuntimeException(String.format("%s cannot be found", part)); + }); + RowDataToObjectArrayConverter rowDataToObjectArrayConverter = new RowDataToObjectArrayConverter(schema.logicalPartitionType()); - List partitionKeys = schema.partitionKeys(); List paritionIndexToMonitor = - paritions.stream() + paritionsToMonitor.stream() .map(partitionKeys::indexOf) .filter(i -> i != -1) .collect(Collectors.toList()); - Map paritionWithLatestModifyTime = new HashMap<>(); for (FileMonitorTable.ParitionChange paritionChange : results) { long latestModifyTime = paritionChange.latestModifyTime();