Skip to content

Commit

Permalink
fix persistent store
Browse files Browse the repository at this point in the history
  • Loading branch information
birdayz committed Jun 25, 2021
1 parent 6587f4f commit 4f91544
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 4 deletions.
8 changes: 4 additions & 4 deletions src/main/java/de/nerden/kafka/streams/MoreTransformers.java
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,17 @@ public static <K, V> TransformerSupplier<K, V, KeyValue<K, List<V>>> Batch(
final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal =
new MaterializedInternal<>(materialized);

if (materializedInternal.storeName() != null) {
if (materializedInternal.storeSupplier() != null) {
return new BatchTransformerSupplier<>(
materializedInternal.storeName(),
(KeyValueBytesStoreSupplier) materializedInternal.storeSupplier(),
materializedInternal.keySerde(),
materializedInternal.valueSerde(),
materializedInternal.loggingEnabled(),
maxBatchDurationMillis,
maxBatchSizePerKey);
} else if (materializedInternal.storeSupplier() != null) {
} else if (materializedInternal.storeName() != null) {
return new BatchTransformerSupplier<>(
(KeyValueBytesStoreSupplier) materializedInternal.storeSupplier(),
materializedInternal.storeName(),
materializedInternal.keySerde(),
materializedInternal.valueSerde(),
materializedInternal.loggingEnabled(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public BatchTransformerSupplier(
this.changeLoggingEnabled = changeLoggingEnabled;
this.maxBatchDurationMillis = maxBatchDurationMillis;
this.maxBatchSizePerKey = maxBatchSizePerKey;
this.storeName = this.storeSupplier.name();
}

public BatchTransformerSupplier(
Expand Down

0 comments on commit 4f91544

Please sign in to comment.