Skip to content
This repository has been archived by the owner on Nov 4, 2024. It is now read-only.

Commit

Permalink
Merge pull request #242 from galasa-dev/iss1915-kafka-transaction
Browse files Browse the repository at this point in the history
Iss1915 - Fix transaction state exception
  • Loading branch information
KirbyKatcher authored Jul 2, 2024
2 parents fe1bdbf + 89cde24 commit a41e75c
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ public class KafkaEventProducer implements IEventProducer {
public KafkaEventProducer(Properties properties, String topic) {

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
producer.initTransactions();
this.producer = producer;

this.topic = topic;
}

public void sendEvent(IEvent event){
producer.initTransactions();
producer.beginTransaction();
producer.send(new ProducerRecord<>(topic, event.toString()));
producer.commitTransaction();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
public class KafkaEventProducerFactory implements IEventProducerFactory {

private final String AUTH_TOKEN;
private String runName;

public KafkaEventProducerFactory(String authToken) {
public KafkaEventProducerFactory(String authToken, String runName) {
this.AUTH_TOKEN = authToken;
}

Expand All @@ -32,6 +33,8 @@ public Properties createProducerConfig(IConfigurationPropertyStoreService cps, S

try {
String bootstrapServers = cps.getProperty("bootstrap", "servers");
// Transactional IDs need to be unique for each producer
String transactionalId = runName + "-" + topic;

// Needed to get the Kafka classes at runtime
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
Expand All @@ -46,7 +49,7 @@ public Properties createProducerConfig(IConfigurationPropertyStoreService cps, S
properties.put("ssl.protocol", "TLSv1.2");
properties.put("ssl.enabled.protocols", "TLSv1.2");
properties.put("ssl.endpoint.identification.algorithm", "HTTPS");
properties.put("transactional.id", "transactional-id");
properties.put("transactional.id", transactionalId);

} catch (ConfigurationPropertyStoreException e) {
throw new KafkaException("Unable to retrieve Kafka properties from the CPS", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ public void initialise(@NotNull IFrameworkInitialisation frameworkInitialisation
// If the CPS is ETCD, then register this version of the EventsService
if (cps.getScheme().equals("etcd")) {
IFramework framework = frameworkInitialisation.getFramework();
String runName = framework.getTestRunName();

SystemEnvironment env = new SystemEnvironment();
String authToken = env.getenv(TOKEN_NAME);
KafkaEventProducerFactory producerFactory = new KafkaEventProducerFactory(authToken);
KafkaEventProducerFactory producerFactory = new KafkaEventProducerFactory(authToken, runName);
IConfigurationPropertyStoreService cpsService = framework.getConfigurationPropertyService(NAMESPACE);

frameworkInitialisation.registerEventsService(new KafkaEventsService(cpsService, producerFactory));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ public boolean isInitialised() {

@Override
public String getTestRunName() {
throw new UnsupportedOperationException("Unimplemented method 'getTestRunName'");
Random random = new Random();
int randomNumber = 100 + random.nextInt(900);
return "C" + randomNumber;
}

@Override
Expand Down

0 comments on commit a41e75c

Please sign in to comment.