Skip to content

Commit

Permalink
[avro] Writer throw more clear exception when NPE with aggregation me…
Browse files Browse the repository at this point in the history
…rge function (#4547)
  • Loading branch information
yuzelin authored Nov 19, 2024
1 parent 848cb59 commit d9d8d83
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
protected final Path path;
private final Function<T, InternalRow> converter;

private final FormatWriter writer;
private FormatWriter writer;
private PositionOutputStream out;

private long recordCount;
Expand Down Expand Up @@ -144,7 +144,14 @@ public boolean reachTargetSize(boolean suggestedCheck, long targetSize) throws I

@Override
public void abort() {
IOUtils.closeQuietly(out);
if (writer != null) {
IOUtils.closeQuietly(writer);
writer = null;
}
if (out != null) {
IOUtils.closeQuietly(out);
out = null;
}
fileIO.deleteQuietly(path);
}

Expand All @@ -167,9 +174,15 @@ public void close() throws IOException {
}

try {
writer.close();
out.flush();
out.close();
if (writer != null) {
writer.close();
writer = null;
}
if (out != null) {
out.flush();
out.close();
out = null;
}
} catch (IOException e) {
LOG.warn("Exception occurs when closing file {}. Cleaning up.", path, e);
abort();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
Expand All @@ -43,6 +44,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand Down Expand Up @@ -629,4 +631,29 @@ public void testScanFromChangelog(String changelogProducer) throws Exception {
assertThat(iterator.collect(1)).containsExactlyInAnyOrder(Row.of("10", "11", "12"));
iterator.close();
}

@Test
public void testAvroRetractNotNullField() {
List<Row> input =
Arrays.asList(
Row.ofKind(RowKind.INSERT, 1, "A"), Row.ofKind(RowKind.DELETE, 1, "A"));
String id = TestValuesTableFactory.registerData(input);
sEnv.executeSql(
String.format(
"CREATE TEMPORARY TABLE source (pk INT PRIMARY KEY NOT ENFORCED, a STRING) "
+ "WITH ('connector'='values', 'bounded'='true', 'data-id'='%s', "
+ "'changelog-mode' = 'I,D,UA,UB')",
id));

sql(
"CREATE TABLE avro_sink (pk INT PRIMARY KEY NOT ENFORCED, a STRING NOT NULL) "
+ " WITH ('file.format' = 'avro', 'merge-engine' = 'aggregation')");

assertThatThrownBy(
() -> sEnv.executeSql("INSERT INTO avro_sink select * from source").await())
.satisfies(
anyCauseMatches(
RuntimeException.class,
"Caught NullPointerException, the possible reason is you have set following options together"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ public void write(InternalRow datum, Encoder out) throws IOException {
// top Row is a UNION type
out.writeIndex(1);
}
this.writer.writeRow(datum, out);
try {
this.writer.writeRow(datum, out);
} catch (NullPointerException npe) {
throw new RuntimeException(
"Caught NullPointerException, the possible reason is you have set following options together:\n"
+ " 1. file.format = avro;\n"
+ " 2. merge-function = aggregation/partial-update;\n"
+ " 3. some fields are not null.",
npe);
}
}
}

0 comments on commit d9d8d83

Please sign in to comment.