Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 81: Customize pravega payload attributes before submitting to make each payload unique #80

Merged
merged 6 commits into from
Aug 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions driver-pravega/pravega.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ writer:
includeTimestampInEvent: True
enableStreamAutoScaling: False
eventsPerSecond: 10000
customPayload: true
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.openmessaging.benchmark.driver.pravega;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;

/**
* This class is used to customize json payload by updating attributes of json.
*/
public class CustomPayloadUtils {
dada-dell-emc marked this conversation as resolved.
Show resolved Hide resolved
private static final Logger log = LoggerFactory.getLogger(CustomPayloadUtils.class);
private static final ObjectMapper mapper = new ObjectMapper();

/**
* If isCustomPayload is true, customize the byte array payload by updating json attribute values
* like Timestamp to make payload unique for almost each request.
* If isCustomPayload is false no change in payload.
*
* @param isCustomPayload custom payload is enabled or not
* @param payload static default payload to send to messaging system.
* @return customized payload in byte array
*/
public static byte[] customizePayload(boolean isCustomPayload, byte[] payload) {
dada-dell-emc marked this conversation as resolved.
Show resolved Hide resolved
if(isCustomPayload && ArrayUtils.isNotEmpty(payload)) {
try {
Map<String, Object> jsonPayload = mapper.readValue(payload, Map.class);
jsonPayload.put("Timestamp",System.currentTimeMillis());
dada-dell-emc marked this conversation as resolved.
Show resolved Hide resolved
payload = mapper.writeValueAsBytes(jsonPayload);
dada-dell-emc marked this conversation as resolved.
Show resolved Hide resolved
if(log.isDebugEnabled()) {
dada-dell-emc marked this conversation as resolved.
Show resolved Hide resolved
log.debug("customPayload: {}", new String(payload));
}
} catch (IOException e) {
log.warn("exception occur while customising payload: {}", new String(payload), e);
}
}
return payload;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
* This class implements pravega benchmark driver to create topics, producers and consumers required in benchmark.
*/
public class PravegaBenchmarkDriver implements BenchmarkDriver {
private static final Logger log = LoggerFactory.getLogger(PravegaBenchmarkDriver.class);

Expand All @@ -57,6 +60,11 @@ public class PravegaBenchmarkDriver implements BenchmarkDriver {
private EventStreamClientFactory clientFactory;
private final List<String> createdTopics = new ArrayList<>();

/**
* @param configurationFile pravega.yml configuration file
* @param statsLogger stats logger to collect stats from benchmark driver
* @throws IOException
*/
@Override
public void initialize(File configurationFile, StatsLogger statsLogger) throws IOException {
config = readConfig(configurationFile);
Expand All @@ -83,11 +91,19 @@ private String cleanName(String name) {
return name.replaceAll("[^A-Za-z0-9-]", "");
}

/**
* @return topic name prefix string
*/
@Override
public String getTopicNamePrefix() {
return "openmessaging-benchmark";
}

/**
* @param topic name of the topic you want to create
* @param partitions number of partitions in a topic
* @return
*/
@Override
public CompletableFuture<Void> createTopic(String topic, int partitions) {
topic = cleanName(topic);
Expand All @@ -112,20 +128,30 @@ public CompletableFuture<Void> createTopic(String topic, int partitions) {
return CompletableFuture.completedFuture(null);
}

/**
* @param topic name of the topic
* @return
*/
@Override
public CompletableFuture<BenchmarkProducer> createProducer(String topic) {
topic = cleanName(topic);
BenchmarkProducer producer = null;
if (config.enableTransaction) {
producer = new PravegaBenchmarkTransactionProducer(topic, clientFactory, config.includeTimestampInEvent,
config.writer.enableConnectionPooling, config.eventsPerTransaction);
config.writer.enableConnectionPooling, config.eventsPerTransaction, config.customPayload);
} else {
producer = new PravegaBenchmarkProducer(topic, clientFactory, config.includeTimestampInEvent,
config.writer.enableConnectionPooling);
config.writer.enableConnectionPooling, config.customPayload);
}
return CompletableFuture.completedFuture(producer);
}

/**
* @param topic name of the topic
* @param subscriptionName name of the subscription
* @param consumerCallback consumer call back object
* @return
*/
@Override
public CompletableFuture<BenchmarkConsumer> createConsumer(String topic, String subscriptionName,
ConsumerCallback consumerCallback) {
Expand All @@ -148,6 +174,10 @@ private void deleteTopics() {
}
}

/**
* Closes all the context objects of pravega
* @throws Exception
*/
@Override
public void close() throws Exception {
log.info("close: clientConfig={}", clientConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,24 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;


/**
* This class is pravega implementation of benchmark producer to produce messages for benchmark framework.
*/
public class PravegaBenchmarkProducer implements BenchmarkProducer {
private static final Logger log = LoggerFactory.getLogger(PravegaBenchmarkProducer.class);

private final EventStreamWriter<ByteBuffer> writer;
private final boolean includeTimestampInEvent;
private ByteBuffer timestampAndPayload;
private boolean isCustomPayload;

/**
* @param streamName name of the pravega stream
* @param clientFactory client factory to create pravega writers
* @param includeTimestampInEvent flag to include timestamp in the event
* @param enableConnectionPooling flag to enable/disable connection pool
*/
public PravegaBenchmarkProducer(String streamName, EventStreamClientFactory clientFactory,
boolean includeTimestampInEvent,
boolean enableConnectionPooling) {
Expand All @@ -50,8 +61,30 @@ public PravegaBenchmarkProducer(String streamName, EventStreamClientFactory clie
this.includeTimestampInEvent = includeTimestampInEvent;
}

/**
* @param streamName name of the pravega stream
* @param clientFactory client factory to create pravega writers
* @param includeTimestampInEvent flag to include timestamp in the event
* @param enableConnectionPooling flag to enable/disable connection pool
* @param isCustomPayload flag to enable/disable custom payload
*/
public PravegaBenchmarkProducer(String streamName, EventStreamClientFactory clientFactory,
boolean includeTimestampInEvent,
boolean enableConnectionPooling,
boolean isCustomPayload) {
this(streamName, clientFactory, includeTimestampInEvent, enableConnectionPooling);
this.isCustomPayload = isCustomPayload;
}

/**
* Sends async messages to pravega
* @param key the key associated with this message
* @param payload the message payload
* @return CompletableFuture object to get the state of sent message async
*/
@Override
public CompletableFuture<Void> sendAsync(Optional<String> key, byte[] payload) {
payload = CustomPayloadUtils.customizePayload(isCustomPayload, payload);
if (includeTimestampInEvent) {
if (timestampAndPayload == null || timestampAndPayload.limit() != Long.BYTES + payload.length) {
timestampAndPayload = ByteBuffer.allocate(Long.BYTES + payload.length);
Expand All @@ -68,6 +101,10 @@ private CompletableFuture<Void> writeEvent(Optional<String> key, ByteBuffer payl
return (key.isPresent()) ? writer.writeEvent(key.get(), payload) : writer.writeEvent(payload);
}

/**
* Closes pravega event writer
* @throws Exception
*/
@Override
public void close() throws Exception {
writer.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
import java.util.concurrent.CompletableFuture;
import java.util.UUID;

/**
* This class is pravega transaction implementation of benchmark producer to produce messages for benchmark framework.
*/
public class PravegaBenchmarkTransactionProducer implements BenchmarkProducer {
private static final Logger log = LoggerFactory.getLogger(PravegaBenchmarkProducer.class);

Expand All @@ -47,7 +50,15 @@ public class PravegaBenchmarkTransactionProducer implements BenchmarkProducer {
private final int eventsPerTransaction;
private int eventCount = 0;
private ByteBuffer timestampAndPayload;
private boolean isCustomPayload;

/**
* @param streamName name of the pravega stream
* @param clientFactory client factory to create pravega writers
* @param includeTimestampInEvent flag to include timestamp in the event
* @param enableConnectionPooling flag to enable/disable connection pool
* @param eventsPerTransaction number of events per transaction
*/
public PravegaBenchmarkTransactionProducer(String streamName, EventStreamClientFactory clientFactory,
boolean includeTimestampInEvent, boolean enableConnectionPooling, int eventsPerTransaction) {
log.info("PravegaBenchmarkProducer: BEGIN: streamName={}", streamName);
Expand All @@ -60,9 +71,30 @@ public PravegaBenchmarkTransactionProducer(String streamName, EventStreamClientF
this.includeTimestampInEvent = includeTimestampInEvent;
}

/**
* @param streamName name of the pravega stream
* @param clientFactory client factory to create pravega writers
* @param includeTimestampInEvent flag to include timestamp in the event
* @param enableConnectionPooling flag to enable/disable connection pool
* @param eventsPerTransaction number of events per transaction
* @param isCustomPayload flag to enable/disable custom payload
*/
public PravegaBenchmarkTransactionProducer(String streamName, EventStreamClientFactory clientFactory,
dada-dell-emc marked this conversation as resolved.
Show resolved Hide resolved
boolean includeTimestampInEvent, boolean enableConnectionPooling, int eventsPerTransaction, boolean isCustomPayload) {
this(streamName, clientFactory, includeTimestampInEvent, enableConnectionPooling, eventsPerTransaction);
this.isCustomPayload = isCustomPayload;
}

/**
* Sends async messages to pravega
* @param key the key associated with this message
* @param payload the message payload
* @return CompletableFuture object to get the state of sent message async
*/
@Override
public CompletableFuture<Void> sendAsync(Optional<String> key, byte[] payload) {
try {
payload = CustomPayloadUtils.customizePayload(isCustomPayload, payload);
if (transaction == null) {
transaction = transactionWriter.beginTxn();
}
Expand Down Expand Up @@ -97,6 +129,10 @@ private void writeEvent(Optional<String> key, ByteBuffer payload) throws TxnFail
}
}

/**
* Closes pravega event writer and aborts the transaction
* @throws Exception
*/
@Override
public void close() throws Exception {
synchronized (this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,5 @@ public class PravegaConfig {
// By default, streams created for benchmarking will be deleted at the end of the test.
// Set to false to keep the streams.
public boolean deleteStreams = true;
public boolean customPayload = false;
}
1 change: 1 addition & 0 deletions payload/custom-payload-idrac.data
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"metrics":[{"fields":{"cmos-batterystate":4,"cpu1-temp":190,"cpu2-temp":330,"exhaust-temp":510,"fan1-speed":11640,"fan2-speed":9360,"fan3-speed":11880,"fan4-speed":9240,"fan5-speed":11880,"fan6-speed":9240,"idrac-url":"https://11.111.11.111:123","inlet-temp":470,"power-state":4,"system-globalstatus":3,"system-model":"PowerEdge R640","system-servicetag":"3XXXX13","system-uptime":2501967,"system-watts":336},"name":"snmp","tags":{"agent_host":"11.111.11.111","host":"XXXXXXXX-xxxxxxx-service","system-name":"xXXXX-3XXXX13"},"timestamp":1686258320},{"fields":{"cmos-batterystate":4,"cpu1-temp":180,"cpu2-temp":320,"exhaust-temp":490,"fan1-speed":11400,"fan2-speed":9000,"fan3-speed":11520,"fan4-speed":8880,"fan5-speed":11400,"fan6-speed":9000,"idrac-url":"https://11.111.11.111:123","inlet-temp":510,"power-state":4,"system-globalstatus":3,"system-model":"PowerEdge R640","system-servicetag":"3XXXX13","system-uptime":2499053,"system-watts":336},"name":"snmp","tags":{"agent_host":"11.111.11.111","host":"XXXXXXXX-xxxxxxx-service","system-name":"xXXXX-3XXXX13"},"timestamp":1686258320},{"fields":{"cmos-batterystate":4,"cpu1-temp":200,"cpu2-temp":350,"exhaust-temp":500,"fan1-speed":11280,"fan2-speed":8760,"fan3-speed":11160,"fan4-speed":8760,"fan5-speed":11160,"fan6-speed":8760,"idrac-url":"https://11.111.11.111:123","inlet-temp":440,"power-state":4,"system-globalstatus":3,"system-model":"PowerEdge R640","system-servicetag":"3XXXX13","system-uptime":2348643,"system-watts":336},"name":"snmp","tags":{"agent_host":"11.111.11.111","host":"XXXXXXXX-xxxxxxx-service","system-name":"xXXXX-3XXXX13"},"timestamp":1686258320},{"fields":{"cmos-batterystate":4,"cpu1-temp":180,"cpu2-temp":380,"exhaust-temp":680,"fan1-speed":6480,"fan2-speed":5160,"fan3-speed":6600,"fan4-speed":5160,"fan5-speed":6600,"fan6-speed":5160,"idrac-url":"https://11.111.11.111:123","inlet-temp":580,"power-state":4,"system-globalstatus":3,"system-model":"PowerEdge R640","system-servicetag":"3XXXX13","system-uptime":17300885,"system-watts":312},"name":"snmp","tags":{"agent_host":"11.111.11.111","host":"XXXXXXXX-xxxxxxx-service","system-name":"xXXXX-3XXXX13"},"timestamp":1686258320},{"fields":{"cmos-batterystate":4,"cpu1-temp":190,"cpu2-temp":330,"exhaust-temp":490,"fan1-speed":11280,"fan2-speed":8760,"fan3-speed":11280,"fan4-speed":8640,"fan5-speed":11160,"fan6-speed":8640,"idrac-url":"https://11.111.11.111:123","inlet-temp":440,"power-state":4,"system-globalstatus":3,"system-model":"PowerEdge R640","system-servicetag":"3XXXX13","system-uptime":2499984,"system-watts":312},"name":"snmp","tags":{"agent_host":"11.111.11.111","host":"XXXXXXXX-xxxxxxx-service","system-name":"xXXXX-3XXXX13"},"timestamp":1686258320}],"Timestamp":1686258320156}
1 change: 1 addition & 0 deletions payload/custom-payload-k8s.data

Large diffs are not rendered by default.

32 changes: 32 additions & 0 deletions workloads/custom-payload-idrac.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

name: custom payload idrac

topics: 1
partitionsPerTopic: 1
keyDistributor: "NO_KEY"
messageSize: 2727
payloadFile: "payload/custom-payload-idrac.data"
subscriptionsPerTopic: 1
consumerPerSubscription: 1
producersPerTopic: 1
producerRate: 10000
consumerBacklogSizeGB: 0
testDurationMinutes: 15
32 changes: 32 additions & 0 deletions workloads/custom-payload-k8s.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

name: custom payload k8s

topics: 1
partitionsPerTopic: 1
keyDistributor: "NO_KEY"
messageSize: 712865
payloadFile: "payload/custom-payload-k8s.data"
subscriptionsPerTopic: 1
consumerPerSubscription: 1
producersPerTopic: 1
producerRate: 10000
consumerBacklogSizeGB: 0
testDurationMinutes: 15