Skip to content

Commit

Permalink
Fix NPE
Browse files Browse the repository at this point in the history
  • Loading branch information
rayokota committed Aug 4, 2024
1 parent ffba0b3 commit 040d1c0
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 7 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

<groupId>io.kcache</groupId>
<artifactId>kwack</artifactId>
<version>0.10.0</version>
<version>0.11.0</version>
<packaging>jar</packaging>

<name>kwack</name>
Expand Down
16 changes: 10 additions & 6 deletions src/main/java/io/kcache/kwack/KwackEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -791,15 +791,19 @@ public void handleUpdate(Headers headers,
List<Object> params = new ArrayList<>();
String sql = null;
try {
Serde keySerde = keySerdes.getOrDefault(topic, Serde.KEY_DEFAULT);
if (keySerde.usesSchemaRegistry()) {
keySchemaId = schemaIdFor(key.get());
if (key != null) {
Serde keySerde = keySerdes.getOrDefault(topic, Serde.KEY_DEFAULT);
if (keySerde.usesSchemaRegistry()) {
keySchemaId = schemaIdFor(key.get());
}
}
keyObj = deserializeKey(topic, key != null ? key.get() : null);

Serde valueSerde = valueSerdes.getOrDefault(topic, Serde.VALUE_DEFAULT);
if (valueSerde.usesSchemaRegistry()) {
valueSchemaId = schemaIdFor(value.get());
if (value != null) {
Serde valueSerde = valueSerdes.getOrDefault(topic, Serde.VALUE_DEFAULT);
if (valueSerde.usesSchemaRegistry()) {
valueSchemaId = schemaIdFor(value.get());
}
}
Object originalKey = keyObj._1 != null ? keyObj._1.getOriginalMessage() : null;
valueObj = deserializeValue(
Expand Down
65 changes: 65 additions & 0 deletions src/test/java/io/kcache/kwack/AvroKeyTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,71 @@ public void testComplexKey() throws IOException {
assertEquals(Timestamp.from(Instant.ofEpochSecond(1234567890L)), m.get("timestamp"));
}

@Test
public void testNullKey() throws IOException {
IndexedRecord value = createSimpleRecord();
Properties producerProps = createProducerProps(MOCK_URL);
KafkaProducer producer = createProducer(producerProps);
produce(producer, getTopic(), new Object[] { null }, new Object[] { value });
producer.close();

engine.init();
Observable<Map<String, Object>> obs = engine.start();
List<Map<String, Object>> lm = Lists.newArrayList(obs.blockingIterable().iterator());
Map<String, Object> m = lm.get(0);
assertEquals("hi", m.get("f1"));
assertEquals(123, m.get("f2"));
}

@Test
public void testNullValue() throws IOException {
IndexedRecord key = createComplexRecord();
Properties producerProps = createProducerProps(MOCK_URL);
KafkaProducer producer = createProducer(producerProps);
produce(producer, getTopic(), new Object[] { key }, new Object[] { null });
producer.close();

engine.init();
Observable<Map<String, Object>> obs = engine.start();
List<Map<String, Object>> lm = Lists.newArrayList(obs.blockingIterable().iterator());
Map<String, Object> row = lm.get(0);
Map<String, Object> m = (Map<String, Object>) row.get("rowkey");
assertNull(m.get("null"));
assertEquals(true, m.get("boolean"));
assertEquals(1, m.get("int"));
assertEquals(2L, m.get("long"));
assertEquals(3.0f, m.get("float"));
assertEquals(4.0d, m.get("double"));
assertEquals(Base64.getEncoder().encodeToString(new byte[]{0, 1, 2}), m.get("bytes"));
assertEquals("testUser", m.get("string"));
assertEquals("ONE", m.get("enum"));
assertEquals(ImmutableList.of("hi", "there"), m.get("array"));
assertEquals(ImmutableMap.of("bye", "there"), m.get("map"));
assertEquals("zap", m.get("nullable_string"));
assertEquals(123, m.get("union"));
assertEquals(Base64.getEncoder().encodeToString(new byte[]{0, 0, 0, 0}), m.get("fixed"));
assertEquals(new BigDecimal("123.45"), m.get("decimal"));
assertEquals(UUID.fromString("d21998e8-8737-432e-a83c-13768dabd821"), m.get("uuid"));
assertEquals(LocalDate.of(2024, 1, 1), m.get("date"));
assertEquals(LocalTime.of(8, 30, 30), m.get("time"));
assertEquals(Timestamp.from(Instant.ofEpochSecond(1234567890L)), m.get("timestamp"));
}

@Test
public void testNullKeyAndValue() throws IOException {
Properties producerProps = createProducerProps(MOCK_URL);
KafkaProducer producer = createProducer(producerProps);
produce(producer, getTopic(), new Object[] { null }, new Object[] { null });
producer.close();

engine.init();
Observable<Map<String, Object>> obs = engine.start();
List<Map<String, Object>> lm = Lists.newArrayList(obs.blockingIterable().iterator());
Map<String, Object> row = lm.get(0);
Map<String, Object> m = (Map<String, Object>) row.get("rowkey");
assertNull(m);
}

@Override
protected String getTopic() {
return "test-avro";
Expand Down

0 comments on commit 040d1c0

Please sign in to comment.