Skip to content

Commit

Permalink
Remove key-value for null value
Browse files Browse the repository at this point in the history
  • Loading branch information
BewareMyPower committed Jul 24, 2024
1 parent ffcc578 commit 4fdab3a
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,11 @@ public TransactionBufferSnapshot readLatest(String topic) throws Exception {
while (wait(reader.hasMoreEventsAsync(), "has more events")) {
final var msg = wait(reader.readNextAsync(), "read message");
if (msg.getKey() != null) {
snapshots.put(msg.getKey(), msg.getValue());
if (msg.getValue() != null) {
snapshots.put(msg.getKey(), msg.getValue());
} else {
snapshots.remove(msg.getKey());
}
}
}
return snapshots.get(topic);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
Expand Down Expand Up @@ -684,25 +683,6 @@ private void checkCloseTopic(PulsarClient pulsarClient,
txn.commit().get();
}


@Test
public void testTransactionBufferNoSnapshotCloseReader() throws Exception{
String topic = NAMESPACE1 + "/test";
@Cleanup
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).producerName("testTxnTimeOut_producer")
.topic(topic).sendTimeout(0, TimeUnit.SECONDS).enableBatching(false).create();

admin.topics().unload(topic);

// unload success, all readers have been closed except for the compaction sub
producer.send("test");
TopicStats stats = admin.topics().getStats(NAMESPACE1 + "/" + TRANSACTION_BUFFER_SNAPSHOT);

// except for the compaction sub
assertEquals(stats.getSubscriptions().size(), 1);
assertTrue(stats.getSubscriptions().keySet().contains("__compaction"));
}

@Test
public void testTransactionBufferIndexSystemTopic() throws Exception {
SystemTopicTxnBufferSnapshotService<TransactionBufferSnapshotIndexes> transactionBufferSnapshotIndexService =
Expand Down

0 comments on commit 4fdab3a

Please sign in to comment.