From 1bb00c01a8e3d5693ebfe05ce03b66c283ac21f4 Mon Sep 17 00:00:00 2001 From: Jingsong Date: Thu, 15 Aug 2024 12:00:57 +0800 Subject: [PATCH] [core] Add retry to Consumer.fromPath --- .../org/apache/paimon/consumer/Consumer.java | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java b/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java index ffa63ece3594..bad0e2509b77 100644 --- a/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java +++ b/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java @@ -26,6 +26,7 @@ import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonGetter; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties; import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.databind.exc.MismatchedInputException; import java.io.IOException; import java.io.UncheckedIOException; @@ -58,10 +59,24 @@ public static Consumer fromJson(String json) { } public static Optional fromPath(FileIO fileIO, Path path) { - try { - return fileIO.readOverwrittenFileUtf8(path).map(Consumer::fromJson); - } catch (IOException e) { - throw new UncheckedIOException(e); + int retryNumber = 0; + MismatchedInputException exception = null; + while (retryNumber++ < 5) { + try { + return fileIO.readOverwrittenFileUtf8(path).map(Consumer::fromJson); + } catch (MismatchedInputException e) { + // retry + exception = e; + try { + Thread.sleep(100); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException(ie); + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } } + throw new UncheckedIOException(exception); } }