Skip to content

Commit

Permalink
add validation
Browse files Browse the repository at this point in the history
  • Loading branch information
wg1026688210 committed Oct 31, 2023
1 parent 2ffe382 commit 1d76a5f
Showing 1 changed file with 17 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -239,18 +239,31 @@ public static ParitionChange toPartitionChange(InternalRow row) throws IOExcepti
public static Map<GenericRow, Long> paritionWithLatestModifyTime(
TableSchema schema,
List<FileMonitorTable.ParitionChange> results,
List<String> paritions) {
List<String> paritionsToMonitor) {
Map<GenericRow, Long> paritionWithLatestModifyTime = new HashMap<>();

List<String> partitionKeys = schema.partitionKeys();
if (partitionKeys.isEmpty()) {
return paritionWithLatestModifyTime;
}

paritionsToMonitor.stream()
.filter(part -> !partitionKeys.contains(part))
.findAny()
.ifPresent(
part -> {
throw new RuntimeException(String.format("%s cannot be found", part));
});

RowDataToObjectArrayConverter rowDataToObjectArrayConverter =
new RowDataToObjectArrayConverter(schema.logicalPartitionType());
List<String> partitionKeys = schema.partitionKeys();

List<Integer> paritionIndexToMonitor =
paritions.stream()
paritionsToMonitor.stream()
.map(partitionKeys::indexOf)
.filter(i -> i != -1)
.collect(Collectors.toList());

Map<GenericRow, Long> paritionWithLatestModifyTime = new HashMap<>();
for (FileMonitorTable.ParitionChange paritionChange : results) {
long latestModifyTime = paritionChange.latestModifyTime();

Expand Down

0 comments on commit 1d76a5f

Please sign in to comment.