diff --git a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java index d41040e05bb7..f303e8597870 100644 --- a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java +++ b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java @@ -49,7 +49,7 @@ public abstract class SingleFileWriter implements FileWriter { protected final Path path; private final Function converter; - private final FormatWriter writer; + private FormatWriter writer; private PositionOutputStream out; private long recordCount; @@ -144,7 +144,14 @@ public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws I @Override public void abort() { - IOUtils.closeQuietly(out); + if (writer != null) { + IOUtils.closeQuietly(writer); + writer = null; + } + if (out != null) { + IOUtils.closeQuietly(out); + out = null; + } fileIO.deleteQuietly(path); } @@ -167,9 +174,15 @@ public void close() throws IOException { } try { - writer.close(); - out.flush(); - out.close(); + if (writer != null) { + writer.close(); + writer = null; + } + if (out != null) { + out.flush(); + out.close(); + out = null; + } } catch (IOException e) { LOG.warn("Exception occurs when closing file {}. Cleaning up.", path, e); abort(); diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java index cf97f7b67d4d..2e15697511dd 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ContinuousFileStoreITCase.java @@ -28,6 +28,7 @@ import org.apache.flink.table.api.StatementSet; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.planner.factories.TestValuesTableFactory; import org.apache.flink.types.Row; import org.apache.flink.types.RowKind; import org.apache.flink.util.CloseableIterator; @@ -43,6 +44,7 @@ import java.util.Set; import java.util.stream.Collectors; +import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -629,4 +631,29 @@ public void testScanFromChangelog(String changelogProducer) throws Exception { assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("10", "11", "12")); iterator.close(); } + + @Test + public void testAvroRetractNotNullField() { + List input = + Arrays.asList( + Row.ofKind(RowKind.INSERT, 1, "A"), Row.ofKind(RowKind.DELETE, 1, "A")); + String id = TestValuesTableFactory.registerData(input); + sEnv.executeSql( + String.format( + "CREATE TEMPORARY TABLE source (pk INT PRIMARY KEY NOT ENFORCED, a STRING) " + + "WITH ('connector'='values', 'bounded'='true', 'data-id'='%s', " + + "'changelog-mode' = 'I,D,UA,UB')", + id)); + + sql( + "CREATE TABLE avro_sink (pk INT PRIMARY KEY NOT ENFORCED, a STRING NOT NULL) " + + " WITH ('file.format' = 'avro', 'merge-engine' = 'aggregation')"); + + assertThatThrownBy( + () -> sEnv.executeSql("INSERT INTO avro_sink select * from source").await()) + .satisfies( + anyCauseMatches( + RuntimeException.class, + "Caught NullPointerException, the possible reason is you have set following options together")); + } } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumWriter.java b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumWriter.java index c2bd81d0038f..d30245162572 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumWriter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/avro/AvroRowDatumWriter.java @@ -56,6 +56,15 @@ public void write(InternalRow datum, Encoder out) throws IOException { // top Row is a UNION type out.writeIndex(1); } - this.writer.writeRow(datum, out); + try { + this.writer.writeRow(datum, out); + } catch (NullPointerException npe) { + throw new RuntimeException( + "Caught NullPointerException, the possible reason is you have set following options together:\n" + + " 1. file.format = avro;\n" + + " 2. merge-function = aggregation/partial-update;\n" + + " 3. some fields are not null.", + npe); + } } }