Skip to content

Commit

Permalink
Added new pravega configuration attribute customPayload as boolean va…
Browse files Browse the repository at this point in the history
…lue to enabled or disable customizing payload for pravega

Signed-off-by: dada-dell-emc <[email protected]>
  • Loading branch information
dada-dell-emc committed Jul 26, 2023
1 parent 198e7b6 commit fff84c5
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ public static byte[] customizePayload(boolean isCustomPayload, byte[] payload) {
Map<String, Object> jsonPayload = mapper.readValue(payload, Map.class);
jsonPayload.put("Timestamp",System.currentTimeMillis());
payload = mapper.writeValueAsBytes(jsonPayload);
//log.info("customPayload: {}", new String(payload));
if(log.isDebugEnabled()) {
log.debug("customPayload: {}", new String(payload));
}
} catch (IOException e) {
log.warn("exception occur while customising payload: {}", new String(payload), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ public class PravegaBenchmarkProducer implements BenchmarkProducer {
private final EventStreamWriter<ByteBuffer> writer;
private final boolean includeTimestampInEvent;
private ByteBuffer timestampAndPayload;
private final boolean isCustomPayload;
private boolean isCustomPayload;
public PravegaBenchmarkProducer(String streamName, EventStreamClientFactory clientFactory,
boolean includeTimestampInEvent,
boolean enableConnectionPooling,
boolean isCustomPayload) {
boolean enableConnectionPooling) {
log.info("PravegaBenchmarkProducer: BEGIN: streamName={}", streamName);
writer = clientFactory.createEventWriter(
streamName,
Expand All @@ -49,7 +48,13 @@ public PravegaBenchmarkProducer(String streamName, EventStreamClientFactory clie
.enableConnectionPooling(enableConnectionPooling)
.build());
this.includeTimestampInEvent = includeTimestampInEvent;
this.isCustomPayload = isCustomPayload;
}
public PravegaBenchmarkProducer(String streamName, EventStreamClientFactory clientFactory,
boolean includeTimestampInEvent,
boolean enableConnectionPooling,
boolean isCustomPayload) {
this(streamName, clientFactory, includeTimestampInEvent, enableConnectionPooling);
this.isCustomPayload = isCustomPayload;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ public class PravegaBenchmarkTransactionProducer implements BenchmarkProducer {
private final int eventsPerTransaction;
private int eventCount = 0;
private ByteBuffer timestampAndPayload;
private final boolean isCustomPayload;
private boolean isCustomPayload;
public PravegaBenchmarkTransactionProducer(String streamName, EventStreamClientFactory clientFactory,
boolean includeTimestampInEvent, boolean enableConnectionPooling, int eventsPerTransaction, boolean isCustomPayload) {
boolean includeTimestampInEvent, boolean enableConnectionPooling, int eventsPerTransaction) {
log.info("PravegaBenchmarkProducer: BEGIN: streamName={}", streamName);

final String writerId = UUID.randomUUID().toString();
Expand All @@ -58,6 +58,10 @@ public PravegaBenchmarkTransactionProducer(String streamName, EventStreamClientF
EventWriterConfig.builder().enableConnectionPooling(enableConnectionPooling).build());
this.eventsPerTransaction = eventsPerTransaction;
this.includeTimestampInEvent = includeTimestampInEvent;
}
public PravegaBenchmarkTransactionProducer(String streamName, EventStreamClientFactory clientFactory,
boolean includeTimestampInEvent, boolean enableConnectionPooling, int eventsPerTransaction, boolean isCustomPayload) {
this(streamName, clientFactory, includeTimestampInEvent, enableConnectionPooling, eventsPerTransaction);
this.isCustomPayload = isCustomPayload;
}

Expand Down
2 changes: 1 addition & 1 deletion workloads/custom-payload-idrac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ consumerPerSubscription: 1
producersPerTopic: 1
producerRate: 10000
consumerBacklogSizeGB: 0
testDurationMinutes: 1
testDurationMinutes: 15
2 changes: 1 addition & 1 deletion workloads/custom-payload-k8s.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ consumerPerSubscription: 1
producersPerTopic: 1
producerRate: 10000
consumerBacklogSizeGB: 0
testDurationMinutes: 1
testDurationMinutes: 15

0 comments on commit fff84c5

Please sign in to comment.