diff --git a/src/main/scala/com/github/mnogu/gatling/kafka/action/KafkaRequestActionBuilder.scala b/src/main/scala/com/github/mnogu/gatling/kafka/action/KafkaRequestActionBuilder.scala index 6bc983e..542d961 100644 --- a/src/main/scala/com/github/mnogu/gatling/kafka/action/KafkaRequestActionBuilder.scala +++ b/src/main/scala/com/github/mnogu/gatling/kafka/action/KafkaRequestActionBuilder.scala @@ -6,18 +6,22 @@ import io.gatling.core.action.Action import io.gatling.core.action.builder.ActionBuilder import io.gatling.core.structure.ScenarioContext import org.apache.kafka.clients.producer.KafkaProducer +import org.apache.kafka.common.serialization.Serializer import scala.collection.JavaConverters._ -class KafkaRequestActionBuilder[K,V](kafkaAttributes: KafkaAttributes[K,V]) extends ActionBuilder { +class KafkaRequestActionBuilder[K, V](kafkaAttributes: KafkaAttributes[K, V]) extends ActionBuilder { override def build( ctx: ScenarioContext, next: Action ): Action = { import ctx.{protocolComponentsRegistry, coreComponents, throttled} val kafkaComponents: KafkaComponents = protocolComponentsRegistry.components(KafkaProtocol.KafkaProtocolKey) - - val producer = new KafkaProducer[K,V]( kafkaComponents.kafkaProtocol.properties.asJava ) + val producer = new KafkaProducer[K, V]( + kafkaComponents.kafkaProtocol.properties.asJava, + kafkaComponents.kafkaProtocol.keySerializerOpt.asInstanceOf[Option[Serializer[K]]].orNull, + kafkaComponents.kafkaProtocol.valueSerializerOpt.asInstanceOf[Option[Serializer[V]]].orNull + ) coreComponents.actorSystem.registerOnTermination(producer.close()) diff --git a/src/main/scala/com/github/mnogu/gatling/kafka/protocol/KafkaProtocol.scala b/src/main/scala/com/github/mnogu/gatling/kafka/protocol/KafkaProtocol.scala index c026517..5dbc1cd 100644 --- a/src/main/scala/com/github/mnogu/gatling/kafka/protocol/KafkaProtocol.scala +++ b/src/main/scala/com/github/mnogu/gatling/kafka/protocol/KafkaProtocol.scala @@ -3,12 +3,15 @@ package com.github.mnogu.gatling.kafka.protocol import io.gatling.core.CoreComponents import io.gatling.core.config.GatlingConfiguration import io.gatling.core.protocol.{Protocol, ProtocolKey} +import org.apache.kafka.common.serialization.Serializer object KafkaProtocol { def apply(configuration: GatlingConfiguration): KafkaProtocol = KafkaProtocol ( topic = "", - properties = Map() + properties = Map(), + None, + None ) val KafkaProtocolKey = new ProtocolKey[KafkaProtocol, KafkaComponents] { @@ -35,8 +38,12 @@ object KafkaProtocol { case class KafkaProtocol( topic: String, - properties: Map[String, Object]) extends Protocol { + properties: Map[String, Object], + keySerializerOpt: Option[Serializer[_]], + valueSerializerOpt: Option[Serializer[_]]) extends Protocol { def topic(topic: String): KafkaProtocol = copy(topic = topic) def properties(properties: Map[String, Object]): KafkaProtocol = copy(properties = properties) + def keySerializer(keySerializer: Serializer[_]): KafkaProtocol = copy(keySerializerOpt = Some(keySerializer)) + def valueSerializer(valueSerializer: Serializer[_]): KafkaProtocol = copy(valueSerializerOpt = Some(valueSerializer)) }