diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 02e56131..ddf290bd 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -10,8 +10,9 @@ jobs: strategy: matrix: scala: - - 2.13.8 - - 2.12.15 + - 2.13.12 + - 2.12.18 + - 3.3.1 steps: - uses: actions/checkout@v3 diff --git a/build.sbt b/build.sbt index 7c85f9d6..62581711 100644 --- a/build.sbt +++ b/build.sbt @@ -5,6 +5,13 @@ ThisBuild / versionScheme := Some("early-semver") ThisBuild / evictionErrorLevel := Level.Warn +def crossSettings[T](scalaVersion: String, if3: List[T], if2: List[T]) = + CrossVersion.partialVersion(scalaVersion) match { + case Some((3, _)) => if3 + case Some((2, 12 | 13)) => if2 + case _ => Nil + } + lazy val commonSettings = Seq( organization := "com.evolutiongaming", homepage := Some(new URL("https://github.com/evolution-gaming/skafka")), @@ -12,11 +19,20 @@ lazy val commonSettings = Seq( organizationName := "Evolution", organizationHomepage := Some(url("https://evolution.com")), scalaVersion := crossScalaVersions.value.head, - crossScalaVersions := Seq("2.13.8", "2.12.15"), + crossScalaVersions := Seq("2.13.12", "3.3.1", "2.12.18"), licenses := Seq(("MIT", url("https://opensource.org/licenses/MIT"))), releaseCrossBuild := true, Compile / doc / scalacOptions += "-no-link-warnings", - libraryDependencies += compilerPlugin(`kind-projector` cross CrossVersion.full), + scalacOptions ++= crossSettings( + scalaVersion.value, + if3 = List("-Ykind-projector", "-language:implicitConversions", "-explain", "-deprecation"), + if2 = List("-Xsource:3"), + ), + libraryDependencies ++= crossSettings( + scalaVersion.value, + if3 = Nil, + if2 = List(compilerPlugin(`kind-projector` cross CrossVersion.full)) + ), scalacOptsFailOnWarn := Some(false), publishTo := Some(Resolver.evolutionReleases), // KeyRanks.Invisible to suppress sbt warning about key not being used in root/tests where MiMa plugin is disabled @@ -24,7 +40,13 @@ lazy val commonSettings = Seq( val versions = List( "11.0.0", ) - versions.map(organization.value %% moduleName.value % _).toSet + + // check against all versions once Scala 3 lib version is published + crossSettings( + scalaVersion.value, + if3 = Nil, + if2 = versions.map(organization.value %% moduleName.value % _) + ).toSet }, mimaBinaryIssueFilters ++= Seq( ProblemFilters.exclude[ReversedMissingMethodProblem]("com.evolutiongaming.skafka.consumer.Consumer.subscribe"), diff --git a/modules/play-json/src/main/scala/com/evolutiongaming/skafka/producer/JsonProducer.scala b/modules/play-json/src/main/scala/com/evolutiongaming/skafka/producer/JsonProducer.scala index ae572e1d..17549704 100644 --- a/modules/play-json/src/main/scala/com/evolutiongaming/skafka/producer/JsonProducer.scala +++ b/modules/play-json/src/main/scala/com/evolutiongaming/skafka/producer/JsonProducer.scala @@ -2,7 +2,7 @@ package com.evolutiongaming.skafka.producer import cats.Applicative import com.evolutiongaming.catshelper.FromTry -import com.evolutiongaming.jsonitertool.PlayJsonJsoniter +import com.evolution.playjson.jsoniter.PlayJsonJsoniter import com.evolutiongaming.skafka.{ToBytes, Topic} import play.api.libs.json.{JsValue, Json} diff --git a/modules/play-json/src/test/scala/com/evolutiongaming/skafka/producer/JsonProducerSpec.scala b/modules/play-json/src/test/scala/com/evolutiongaming/skafka/producer/JsonProducerSpec.scala index 20a7bce3..0c20b82e 100644 --- a/modules/play-json/src/test/scala/com/evolutiongaming/skafka/producer/JsonProducerSpec.scala +++ b/modules/play-json/src/test/scala/com/evolutiongaming/skafka/producer/JsonProducerSpec.scala @@ -8,7 +8,6 @@ import org.scalatest.funsuite.AnyFunSuite import org.scalatest.matchers.should.Matchers class JsonProducerSpec extends AnyFunSuite with Matchers { - test("apply") { val metadata = RecordMetadata(TopicPartition("topic", Partition.min)) var actual = Option.empty[(Option[Bytes], Option[Bytes])] @@ -18,7 +17,7 @@ class JsonProducerSpec extends AnyFunSuite with Matchers { record: ProducerRecord[K, V])(implicit toBytesK: ToBytes[Id, K], toBytesV: ToBytes[Id, V] - ) = { + ): RecordMetadata = { val topic = record.topic val value = record.value.map(toBytesV(_, topic)) val key = record.key.map(toBytesK(_, topic)) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 9061f687..398dc0fd 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -2,15 +2,14 @@ import sbt._ object Dependencies { - val `executor-tools` = "com.evolutiongaming" %% "executor-tools" % "1.0.2" - val `config-tools` = "com.evolutiongaming" %% "config-tools" % "1.0.4" - val `future-helper` = "com.evolutiongaming" %% "future-helper" % "1.0.6" - val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "3.5.0" - val `testcontainers-kafka` = "com.dimafeng" %% "testcontainers-scala-kafka" % "0.40.17" - val `play-json-jsoniter` = "com.evolutiongaming" %% "play-json-jsoniter" % "0.10.0" + val `config-tools` = "com.evolutiongaming" %% "config-tools" % "1.0.5" + val `future-helper` = "com.evolutiongaming" %% "future-helper" % "1.0.7" + val `cats-helper` = "com.evolutiongaming" %% "cats-helper" % "3.9.0" + val `testcontainers-kafka` = "com.dimafeng" %% "testcontainers-scala-kafka" % "0.41.0" + val `play-json-jsoniter` = "com.evolution" %% "play-json-jsoniter" % "1.0.0" val `scala-java8-compat` = "org.scala-lang.modules" %% "scala-java8-compat" % "1.0.2" - val `collection-compat` = "org.scala-lang.modules" %% "scala-collection-compat" % "2.8.1" - val scalatest = "org.scalatest" %% "scalatest" % "3.2.13" + val `collection-compat` = "org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" + val scalatest = "org.scalatest" %% "scalatest" % "3.2.17" val `kind-projector` = "org.typelevel" % "kind-projector" % "0.13.2" val discipline = "org.typelevel" %% "discipline-scalatest" % "2.2.0" @@ -20,19 +19,19 @@ object Dependencies { } object Logback { - private val version = "1.2.11" + private val version = "1.4.11" val core = "ch.qos.logback" % "logback-core" % version val classic = "ch.qos.logback" % "logback-classic" % version } object Slf4j { - private val version = "1.7.36" + private val version = "2.0.9" val api = "org.slf4j" % "slf4j-api" % version val `log4j-over-slf4j` = "org.slf4j" % "log4j-over-slf4j" % version } object Cats { - private val version = "2.8.0" + private val version = "2.10.0" val core = "org.typelevel" %% "cats-core" % version val laws = "org.typelevel" %% "cats-laws" % version } @@ -44,7 +43,7 @@ object Dependencies { } object Smetrics { - private val version = "2.0.0" + private val version = "2.1.0" val smetrics = "com.evolutiongaming" %% "smetrics" % version val `smetrics-prometheus` = "com.evolutiongaming" %% "smetrics-prometheus" % version } diff --git a/project/build.properties b/project/build.properties index 22af2628..e8a1e246 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.7.1 +sbt.version=1.9.7 diff --git a/project/plugins.sbt b/project/plugins.sbt index 7472025f..c4f79e44 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -1,6 +1,6 @@ -addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.9.3") +addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.0.9") -addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.3.2") +addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.3.11") addSbtPlugin("com.github.sbt" % "sbt-release" % "1.1.0") @@ -8,4 +8,4 @@ addSbtPlugin("com.evolution" % "sbt-scalac-opts-plugin" % "0.0.9") addSbtPlugin("com.evolution" % "sbt-artifactory-plugin" % "0.0.2") -addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "1.0.1") +addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "1.1.3") diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/CommonConfig.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/CommonConfig.scala index a1a60413..c0535d43 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/CommonConfig.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/CommonConfig.scala @@ -55,7 +55,7 @@ object CommonConfig { val Default: CommonConfig = CommonConfig() - private implicit val SecurityProtocolFromConf = FromConf[SecurityProtocol] { (conf, path) => + private implicit val SecurityProtocolFromConf: FromConf[SecurityProtocol] = FromConf[SecurityProtocol] { (conf, path) => val str = conf.getString(path) val value = SecurityProtocol.Values.find { _.name equalsIgnoreCase str } value getOrElse { @@ -63,7 +63,7 @@ object CommonConfig { } } - private implicit val ClientDnsLookupFromConf = FromConf[ClientDnsLookup] { (conf, path) => + private implicit val ClientDnsLookupFromConf: FromConf[ClientDnsLookup] = FromConf[ClientDnsLookup] { (conf, path) => val str = conf.getString(path) val value = ClientDnsLookup.Values.find { _.name.equalsIgnoreCase(str) } value.getOrElse(throw new ConfigException.BadValue(conf.origin(), path, s"Cannot parse ClientDnsLookup from $str")) diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/Converters.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/Converters.scala index b549209a..756ad461 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/Converters.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/Converters.scala @@ -6,7 +6,7 @@ import java.util.{Optional, Collection => CollectionJ, Map => MapJ, Set => SetJ, import cats.Monad import cats.data.{NonEmptyList => Nel, NonEmptySet => Nes, NonEmptyMap => Nem} -import cats.implicits._ +import cats.syntax.all.* import com.evolutiongaming.catshelper.CatsHelper._ import com.evolutiongaming.catshelper.{ApplicativeThrowable, FromTry, MonadThrowable, ToTry} import org.apache.kafka.clients.consumer.{OffsetAndMetadata => OffsetAndMetadataJ} @@ -95,20 +95,16 @@ object Converters { } implicit class MapJOps[K, V](val self: MapJ[K, V]) extends AnyVal { - def asScalaMap[F[_]: Monad, A, B](ka: K => F[A], vb: V => F[B], keepNullValues: Boolean): F[Map[A, B]] = { self .asScala .toList - .collect { - case (k, v) if k != null && (keepNullValues || v != null) => - for { - a <- ka(k) - b <- vb(v) - } yield (a, b) + // at the moment we cannot use partial functions inside `AnyVal`, see: https://github.com/lampepfl/dotty/issues/18769 + .traverseFilter { case (k, v) => + if (k != null && (keepNullValues || v != null)) (ka(k), vb(v)).mapN((_, _).some) + else none[(A, B)].pure[F] } - .sequence - .map { _.toMap } + .map(_.toMap) } def asScalaMap[F[_]: Monad, A, B](ka: K => F[A], vb: V => F[B]): F[Map[A, B]] = { diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/FromBytes.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/FromBytes.scala index bb9e66b0..d85753d6 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/FromBytes.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/FromBytes.scala @@ -22,7 +22,7 @@ object FromBytes { implicit def functorFromBytes[F[_]: Functor]: Functor[FromBytes[F, *]] = new Functor[FromBytes[F, *]] { - def map[A, B](fa: FromBytes[F, A])(f: A => B) = new FromBytes[F, B] { + def map[A, B](fa: FromBytes[F, A])(f: A => B): FromBytes[F, B] = new FromBytes[F, B] { def apply(bytes: Bytes, topic: Topic) = fa(bytes, topic).map(f) } diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/OffsetAndMetadata.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/OffsetAndMetadata.scala index 9db743e9..8c09eb45 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/OffsetAndMetadata.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/OffsetAndMetadata.scala @@ -19,7 +19,7 @@ object OffsetAndMetadata { implicit val showOffset: Show[OffsetAndMetadata] = Show.fromToString implicit val orderOffset: Order[OffsetAndMetadata] = - Order.whenEqual(Order.by { a: OffsetAndMetadata => a.offset }, Order.by { a: OffsetAndMetadata => a.metadata }) + Order.whenEqual(Order.by(_.offset), Order.by(_.metadata)) implicit val orderingOffset: Ordering[OffsetAndMetadata] = orderOffset.toOrdering } diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/TopicPartition.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/TopicPartition.scala index 2f12bd98..fb3478f0 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/TopicPartition.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/TopicPartition.scala @@ -13,7 +13,7 @@ object TopicPartition { val empty: TopicPartition = TopicPartition("", Partition.min) implicit val orderTopicPartition: Order[TopicPartition] = - Order.whenEqual(Order.by { a: TopicPartition => a.topic }, Order.by { a: TopicPartition => a.partition }) + Order.whenEqual(Order.by(_.topic), Order.by(_.partition)) implicit val showTopicPartition: Show[TopicPartition] = Show.fromToString diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/Consumer.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/Consumer.scala index 648d366a..f3f909e3 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/Consumer.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/Consumer.scala @@ -6,7 +6,7 @@ import cats.effect._ import cats.effect.implicits._ import cats.effect.std.Semaphore import cats.implicits._ -import cats.{Applicative, Monad, MonadError, ~>} +import cats.{Applicative, Monad, Monoid, MonadError, ~>} import com.evolutiongaming.catshelper.CatsHelper._ import com.evolutiongaming.catshelper._ import com.evolutiongaming.skafka.Converters._ @@ -590,7 +590,7 @@ object Consumer { metrics: ConsumerMetrics[F] )(implicit F: MonadError[F, E], measureDuration: MeasureDuration[F]): Consumer[F, K, V] = { - implicit val monoidUnit = Applicative.monoid[F, Unit] + implicit val monoidUnit: Monoid[F[Unit]] = Applicative.monoid[F, Unit] val topics = for { topicPartitions <- self.assignment @@ -953,7 +953,7 @@ object Consumer { metrics: ConsumerMetrics[F] )(implicit F: MonadError[F, E], measureDuration: MeasureDuration[F], clock: Clock[F]): Consumer[F, K, V] = { - implicit val monoidUnit = Applicative.monoid[F, Unit] + implicit val monoidUnit: Monoid[F[Unit]] = Applicative.monoid[F, Unit] val topics = for { topicPartitions <- self.assignment diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerConfig.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerConfig.scala index 2840b297..0195ce13 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerConfig.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerConfig.scala @@ -75,7 +75,7 @@ object ConsumerConfig { val Default: ConsumerConfig = ConsumerConfig() - private implicit val AutoOffsetResetFromConf = FromConf[AutoOffsetReset] { (conf, path) => + private implicit val AutoOffsetResetFromConf: FromConf[AutoOffsetReset] = FromConf[AutoOffsetReset] { (conf, path) => val str = conf.getString(path) val value = AutoOffsetReset.Values.find { _.toString equalsIgnoreCase str } value getOrElse { @@ -83,7 +83,7 @@ object ConsumerConfig { } } - private implicit val IsolationLevelFromConf = FromConf[IsolationLevel] { (conf, path) => + private implicit val IsolationLevelFromConf: FromConf[IsolationLevel] = FromConf[IsolationLevel] { (conf, path) => val str = conf.getString(path) val value = IsolationLevel.Values.find { _.name equalsIgnoreCase str } value getOrElse { diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala index 41734e09..6d0f8340 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerMetrics.scala @@ -110,7 +110,7 @@ object ConsumerMetrics { quantiles = Quantiles.Default, labels = LabelNames("client", "topic") ) - } yield { clientId: ClientId => + } yield { (clientId: ClientId) => new ConsumerMetrics[F] { def call(name: String, topic: Topic, latency: FiniteDuration, success: Boolean) = { diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerRecord.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerRecord.scala index 58da4fcd..974d2858 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerRecord.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/ConsumerRecord.scala @@ -23,10 +23,10 @@ object ConsumerRecord { implicit def orderConsumerRecord[K: Order, V]: Order[ConsumerRecord[K, V]] = { Order.whenEqual( Order.whenEqual( - Order.by { a: ConsumerRecord[K, V] => a.topicPartition }, - Order.by { a: ConsumerRecord[K, V] => a.key } + Order.by(_.topicPartition), + Order.by(_.key) ), - Order.by { a: ConsumerRecord[K, V] => a.offset } + Order.by(_.offset) ) } diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/RebalanceCallback.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/RebalanceCallback.scala index 5a68655d..df09b21b 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/RebalanceCallback.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/consumer/RebalanceCallback.scala @@ -224,7 +224,6 @@ object RebalanceCallback extends RebalanceCallbackInstances with RebalanceCallba case HandleErrorWith(source, fe) => HandleErrorWith(() => source().mapK(fg), fe andThen (_.mapK(fg))) } } - } implicit class RebalanceCallbackNothingOps[A](val self: RebalanceCallback[Nothing, A]) extends AnyVal { diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerConfig.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerConfig.scala index 0f862e8e..696572d6 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerConfig.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerConfig.scala @@ -79,7 +79,7 @@ object ProducerConfig { val Default: ProducerConfig = ProducerConfig() - private implicit val CompressionTypeFromConf = FromConf[CompressionType] { (conf, path) => + private implicit val CompressionTypeFromConf: FromConf[CompressionType] = FromConf[CompressionType] { (conf, path) => val str = conf.getString(path) val value = CompressionType.Values.find { _.toString equalsIgnoreCase str } value getOrElse { @@ -87,7 +87,7 @@ object ProducerConfig { } } - private implicit val AcksFromConf = FromConf[Acks] { (conf, path) => + private implicit val AcksFromConf: FromConf[Acks] = FromConf[Acks] { (conf, path) => val str = conf.getString(path) val values = Acks.Values.filter(_.names.exists(str.equalsIgnoreCase)) diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala index 3b0eafd5..8accb7c0 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerMetrics.scala @@ -106,7 +106,7 @@ object ProducerMetrics { resultCounter <- resultCounter callLatency <- callLatency callCount <- callCount - } yield { clientId: ClientId => + } yield { (clientId: ClientId) => { def sendMeasure(result: String, topic: Topic, latency: FiniteDuration) = { diff --git a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerRecord.scala b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerRecord.scala index a58493f1..fd5d7007 100644 --- a/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerRecord.scala +++ b/skafka/src/main/scala/com/evolutiongaming/skafka/producer/ProducerRecord.scala @@ -17,9 +17,15 @@ final case class ProducerRecord[+K, +V]( object ProducerRecord { - def apply[K, V](topic: Topic, value: V, key: K): ProducerRecord[K, V] = { - ProducerRecord(topic = topic, value = Some(value), key = Some(key)) - } + def apply[K, V](topic: Topic, value: V, key: K): ProducerRecord[K, V] = + ProducerRecord( + topic = topic, + value = Some(value), + key = Some(key), + partition = None, + timestamp = None, + headers = Nil + ) implicit class ProducerRecordOps[K, V](val self: ProducerRecord[K, V]) extends AnyVal { diff --git a/skafka/src/test/scala/com/evolutiongaming/skafka/consumer/ConsumerSpec.scala b/skafka/src/test/scala/com/evolutiongaming/skafka/consumer/ConsumerSpec.scala index 8c66843e..6a866e42 100644 --- a/skafka/src/test/scala/com/evolutiongaming/skafka/consumer/ConsumerSpec.scala +++ b/skafka/src/test/scala/com/evolutiongaming/skafka/consumer/ConsumerSpec.scala @@ -18,7 +18,7 @@ import com.evolutiongaming.skafka.IOSuite._ import com.evolutiongaming.skafka._ import com.evolutiongaming.skafka.consumer.ConsumerConverters._ import org.apache.kafka.clients.consumer.{Consumer => ConsumerJ, ConsumerGroupMetadata => ConsumerGroupMetadataJ, ConsumerRebalanceListener => ConsumerRebalanceListenerJ, ConsumerRecords => ConsumerRecordsJ, OffsetAndMetadata => OffsetAndMetadataJ, OffsetCommitCallback => OffsetCommitCallbackJ} -import org.apache.kafka.common.{Node, TopicPartition => TopicPartitionJ} +import org.apache.kafka.common.{Node, TopicPartition => TopicPartitionJ, MetricName, Metric} import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec @@ -399,7 +399,7 @@ class ConsumerSpec extends AnyWordSpec with Matchers { else Map(new TopicPartitionJ(topic, partition.value) -> null).asJavaMap(identity, identity) - def metrics() = new java.util.HashMap() + def metrics(): MapJ[MetricName, _ <: Metric] = new java.util.HashMap() def partitionsFor(topic: Topic) = { List(partitionInfo.asJava).asJava diff --git a/skafka/src/test/scala/com/evolutiongaming/skafka/consumer/ExplodingConsumer.scala b/skafka/src/test/scala/com/evolutiongaming/skafka/consumer/ExplodingConsumer.scala index a589d896..eddaaa8f 100644 --- a/skafka/src/test/scala/com/evolutiongaming/skafka/consumer/ExplodingConsumer.scala +++ b/skafka/src/test/scala/com/evolutiongaming/skafka/consumer/ExplodingConsumer.scala @@ -101,7 +101,7 @@ class ExplodingConsumer extends ConsumerJ[String, String] { def metrics(): MapJ[MetricName, _ <: Metric] = notImplemented def pause(partitions: CollectionJ[TopicPartitionJ]): Unit = notImplemented def resume(partitions: CollectionJ[TopicPartitionJ]): Unit = notImplemented - def enforceRebalance() = notImplemented + def enforceRebalance(): Unit = notImplemented def close(): Unit = notImplemented def close(timeout: Long, unit: TimeUnit): Unit = notImplemented def close(timeout: DurationJ): Unit = notImplemented diff --git a/skafka/src/test/scala/com/evolutiongaming/skafka/consumer/RebalanceConsumerSpec.scala b/skafka/src/test/scala/com/evolutiongaming/skafka/consumer/RebalanceConsumerSpec.scala index 22e4a502..b60e9e88 100644 --- a/skafka/src/test/scala/com/evolutiongaming/skafka/consumer/RebalanceConsumerSpec.scala +++ b/skafka/src/test/scala/com/evolutiongaming/skafka/consumer/RebalanceConsumerSpec.scala @@ -4,14 +4,18 @@ import java.time.Duration import java.util.concurrent.TimeUnit import java.util.regex.Pattern import java.{lang, util} +import java.lang.{Long => LongJ} import com.evolutiongaming.skafka.consumer.RebalanceConsumerSpec._ import org.apache.kafka.clients.consumer.{ ConsumerRebalanceListener, + ConsumerGroupMetadata => ConsumerGroupMetadataJ, OffsetAndMetadata, OffsetCommitCallback, - Consumer => ConsumerJ + OffsetAndTimestamp => OffsetAndTimestampJ, + Consumer => ConsumerJ, + ConsumerRecords => ConsumerRecordsJ } -import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.{TopicPartition, PartitionInfo, MetricName, Metric} import org.scalatest.freespec.AnyFreeSpec import org.scalatest.matchers.must.Matchers @@ -25,66 +29,81 @@ class RebalanceConsumerSpec extends AnyFreeSpec with Matchers { // val rebalanceConsumer = RebalanceConsumer(new MockConsumer[String, String](OffsetResetStrategy.NONE)) val consumerJ = new ConsumerJ[String, String] { - def assignment() = supported // rebalanceConsumer.assignment() - def subscription() = supported // rebalanceConsumer.subscription() - def subscribe(topics: util.Collection[String]) = unsupported - def subscribe(topics: util.Collection[String], callback: ConsumerRebalanceListener) = unsupported - def assign(partitions: util.Collection[TopicPartition]) = unsupported - def subscribe(pattern: Pattern, callback: ConsumerRebalanceListener) = unsupported - def subscribe(pattern: Pattern) = unsupported - def unsubscribe() = unsupported - def poll(timeout: Long) = unsupported - def poll(timeout: Duration) = unsupported - def commitSync() = supported // rebalanceConsumer.commit() - def commitSync(timeout: Duration) = supported // rebalanceConsumer.commit(timeout) - def commitSync(offsets: util.Map[TopicPartition, OffsetAndMetadata]) = + def assignment(): util.Set[TopicPartition] = supported // rebalanceConsumer.assignment() + def subscription(): util.Set[String] = supported // rebalanceConsumer.subscription() + def subscribe(topics: util.Collection[String]): Unit = unsupported + def subscribe(topics: util.Collection[String], callback: ConsumerRebalanceListener): Unit = unsupported + def assign(partitions: util.Collection[TopicPartition]): Unit = unsupported + def subscribe(pattern: Pattern, callback: ConsumerRebalanceListener): Unit = unsupported + def subscribe(pattern: Pattern): Unit = unsupported + def unsubscribe(): Unit = unsupported + def poll(timeout: Long): ConsumerRecordsJ[String, String] = unsupported + def poll(timeout: Duration): ConsumerRecordsJ[String, String] = unsupported + def commitSync(): Unit = supported // rebalanceConsumer.commit() + def commitSync(timeout: Duration): Unit = supported // rebalanceConsumer.commit(timeout) + def commitSync(offsets: util.Map[TopicPartition, OffsetAndMetadata]): Unit = supported // rebalanceConsumer.commit(offsets) - def commitSync(offsets: util.Map[TopicPartition, OffsetAndMetadata], timeout: Duration) = + def commitSync(offsets: util.Map[TopicPartition, OffsetAndMetadata], timeout: Duration): Unit = supported // rebalanceConsumer.commit(offsets, timeout) - def commitAsync() = unsupported - def commitAsync(callback: OffsetCommitCallback) = unsupported - def commitAsync(offsets: util.Map[TopicPartition, OffsetAndMetadata], callback: OffsetCommitCallback) = + def commitAsync(): Unit = unsupported + def commitAsync(callback: OffsetCommitCallback): Unit = unsupported + def commitAsync(offsets: util.Map[TopicPartition, OffsetAndMetadata], callback: OffsetCommitCallback): Unit = unsupported - def seek(partition: TopicPartition, offset: Long) = supported // rebalanceConsumer.seek(partition, offset) - def seek(partition: TopicPartition, offsetAndMetadata: OffsetAndMetadata) = + def seek(partition: TopicPartition, offset: Long): Unit = supported // rebalanceConsumer.seek(partition, offset) + def seek(partition: TopicPartition, offsetAndMetadata: OffsetAndMetadata): Unit = supported // rebalanceConsumer.seek(partition, offsetAndMetadata) - def seekToBeginning(partitions: util.Collection[TopicPartition]) = + def seekToBeginning(partitions: util.Collection[TopicPartition]): Unit = supported // rebalanceConsumer.seekToBeginning(partitions) - def seekToEnd(partitions: util.Collection[TopicPartition]) = supported // rebalanceConsumer.seekToEnd(partitions) - def position(partition: TopicPartition) = supported // rebalanceConsumer.position(partition) - def position(partition: TopicPartition, timeout: Duration) = + def seekToEnd(partitions: util.Collection[TopicPartition]): Unit = + supported // rebalanceConsumer.seekToEnd(partitions) + def position(partition: TopicPartition): Long = supported // rebalanceConsumer.position(partition) + def position(partition: TopicPartition, timeout: Duration): Long = supported // rebalanceConsumer.position(partition, timeout) - def committed(partition: TopicPartition) = unsupported - def committed(partition: TopicPartition, timeout: Duration) = unsupported - def committed(partitions: util.Set[TopicPartition]) = supported // rebalanceConsumer.committed(partitions) - def committed(partitions: util.Set[TopicPartition], timeout: Duration) = + def committed(partition: TopicPartition): OffsetAndMetadata = unsupported + def committed(partition: TopicPartition, timeout: Duration): OffsetAndMetadata = unsupported + def committed(partitions: util.Set[TopicPartition]): util.Map[TopicPartition, OffsetAndMetadata] = + supported // rebalanceConsumer.committed(partitions) + def committed( + partitions: util.Set[TopicPartition], + timeout: Duration + ): util.Map[TopicPartition, OffsetAndMetadata] = supported // rebalanceConsumer.committed(partitions, timeout) - def metrics() = unsupported - def partitionsFor(topic: String) = supported // rebalanceConsumer.partitionsFor(topic) - def partitionsFor(topic: String, timeout: Duration) = supported // rebalanceConsumer.partitionsFor(topic, timeout) - def listTopics() = supported // rebalanceConsumer.topics() - def listTopics(timeout: Duration) = supported // rebalanceConsumer.topics(timeout) - def paused() = supported // rebalanceConsumer.paused() - def pause(partitions: util.Collection[TopicPartition]) = unsupported - def resume(partitions: util.Collection[TopicPartition]) = unsupported - def offsetsForTimes(timestampsToSearch: util.Map[TopicPartition, lang.Long]) = + def metrics(): util.Map[MetricName, _ <: Metric] = unsupported + def partitionsFor(topic: String): util.List[PartitionInfo] = supported // rebalanceConsumer.partitionsFor(topic) + def partitionsFor(topic: String, timeout: Duration): util.List[PartitionInfo] = + supported // rebalanceConsumer.partitionsFor(topic, timeout) + def listTopics(): util.Map[String, util.List[PartitionInfo]] = supported // rebalanceConsumer.topics() + def listTopics(timeout: Duration): util.Map[String, util.List[PartitionInfo]] = + supported // rebalanceConsumer.topics(timeout) + def paused(): util.Set[TopicPartition] = supported // rebalanceConsumer.paused() + def pause(partitions: util.Collection[TopicPartition]): Unit = unsupported + def resume(partitions: util.Collection[TopicPartition]): Unit = unsupported + def offsetsForTimes( + timestampsToSearch: util.Map[TopicPartition, lang.Long] + ): util.Map[TopicPartition, OffsetAndTimestampJ] = supported // rebalanceConsumer.offsetsForTimes(timestampsToSearch) - def offsetsForTimes(timestampsToSearch: util.Map[TopicPartition, lang.Long], timeout: Duration) = + def offsetsForTimes( + timestampsToSearch: util.Map[TopicPartition, lang.Long], + timeout: Duration + ): util.Map[TopicPartition, OffsetAndTimestampJ] = supported // rebalanceConsumer.offsetsForTimes(timestampsToSearch, timeout) - def beginningOffsets(partitions: util.Collection[TopicPartition]) = + def beginningOffsets(partitions: util.Collection[TopicPartition]): util.Map[TopicPartition, LongJ] = supported // rebalanceConsumer.beginningOffsets(partitions) - def beginningOffsets(partitions: util.Collection[TopicPartition], timeout: Duration) = + def beginningOffsets( + partitions: util.Collection[TopicPartition], + timeout: Duration + ): util.Map[TopicPartition, LongJ] = supported // rebalanceConsumer.beginningOffsets(partitions, timeout) - def endOffsets(partitions: util.Collection[TopicPartition]) = + def endOffsets(partitions: util.Collection[TopicPartition]): util.Map[TopicPartition, LongJ] = supported // rebalanceConsumer.endOffsets(partitions) - def endOffsets(partitions: util.Collection[TopicPartition], timeout: Duration) = + def endOffsets(partitions: util.Collection[TopicPartition], timeout: Duration): util.Map[TopicPartition, LongJ] = supported // rebalanceConsumer.endOffsets(partitions, timeout) - def groupMetadata() = supported // rebalanceConsumer.groupMetadata() - def enforceRebalance() = unsupported - def close() = unsupported - def close(timeout: Long, unit: TimeUnit) = unsupported - def close(timeout: Duration) = unsupported - def wakeup() = unsupported + def groupMetadata(): ConsumerGroupMetadataJ = supported // rebalanceConsumer.groupMetadata() + def enforceRebalance(): Unit = unsupported + def close(): Unit = unsupported + def close(timeout: Long, unit: TimeUnit): Unit = unsupported + def close(timeout: Duration): Unit = unsupported + def wakeup(): Unit = unsupported def currentLag(topicPartition: TopicPartition): OptionalLong = unsupported def enforceRebalance(reason: String): Unit = unsupported } diff --git a/skafka/src/test/scala/com/evolutiongaming/skafka/consumer/RebalanceListener1Spec.scala b/skafka/src/test/scala/com/evolutiongaming/skafka/consumer/RebalanceListener1Spec.scala index 68018e82..128275fa 100644 --- a/skafka/src/test/scala/com/evolutiongaming/skafka/consumer/RebalanceListener1Spec.scala +++ b/skafka/src/test/scala/com/evolutiongaming/skafka/consumer/RebalanceListener1Spec.scala @@ -38,7 +38,9 @@ class RebalanceListener1Spec extends AnyFreeSpec with Matchers { listener1.onPartitionsRevoked(partitions.s).run(consumer) mustBe Try(()) - listener1.onPartitionsLost(partitions.s).effectAs[IO].run(consumer) mustBe Try(()) + val rebalanceCallback: RebalanceCallback[IO,Unit] = listener1.onPartitionsLost(partitions.s) + + rebalanceCallback.run(consumer) mustBe Try(()) } } @@ -98,7 +100,7 @@ object RebalanceListener1Spec { } yield a // do not need to save the offsets since these partitions are probably owned by other consumers already - def onPartitionsLost(partitions: Nes[TopicPartition]) = RebalanceCallback.empty + def onPartitionsLost(partitions: Nes[TopicPartition]): RebalanceCallback[F,Unit] = RebalanceCallback.empty } def readOffsetsFromExternalStore[F[_]: Applicative]( diff --git a/skafka/src/test/scala/com/evolutiongaming/skafka/consumer/SerialListenersTest.scala b/skafka/src/test/scala/com/evolutiongaming/skafka/consumer/SerialListenersTest.scala index a9ca1b0c..a4412889 100644 --- a/skafka/src/test/scala/com/evolutiongaming/skafka/consumer/SerialListenersTest.scala +++ b/skafka/src/test/scala/com/evolutiongaming/skafka/consumer/SerialListenersTest.scala @@ -309,7 +309,7 @@ object SerialListenersTest { Map.empty[TopicPartitionJ, OffsetAndMetadataJ].asJava } - def metrics() = Map.empty[MetricName, Metric].asJava + def metrics(): MapJ[MetricName, Metric] = Map.empty.asJava def partitionsFor(topic: String) = List.empty[PartitionInfo].asJava diff --git a/skafka/src/test/scala/com/evolutiongaming/skafka/producer/ProducerSendSpec.scala b/skafka/src/test/scala/com/evolutiongaming/skafka/producer/ProducerSendSpec.scala index 002b7332..19dc3063 100644 --- a/skafka/src/test/scala/com/evolutiongaming/skafka/producer/ProducerSendSpec.scala +++ b/skafka/src/test/scala/com/evolutiongaming/skafka/producer/ProducerSendSpec.scala @@ -1,6 +1,6 @@ package com.evolutiongaming.skafka.producer -import java.util.concurrent.CompletableFuture +import java.util.concurrent.{CompletableFuture, Future => FutureJ} import java.util.{Map => MapJ} import cats.effect.{Async, Concurrent, Deferred, IO, Sync} import cats.implicits._ @@ -61,17 +61,17 @@ class ProducerSendSpec extends AsyncFunSuite with Matchers { def partitionsFor(topic: String) = Nil.asJava - def metrics() = Map.empty[MetricName, Metric].asJava + def metrics(): MapJ[MetricName, Metric] = Map.empty.asJava def close() = {} def close(timeout: java.time.Duration) = {} - def send(record: ProducerRecordJ[Bytes, Bytes]) = { + def send(record: ProducerRecordJ[Bytes, Bytes]): FutureJ[RecordMetadataJ] = { CompletableFuture.completedFuture(metadata.asJava) } - def send(record: ProducerRecordJ[Bytes, Bytes], callback: Callback) = { + def send(record: ProducerRecordJ[Bytes, Bytes], callback: Callback): FutureJ[RecordMetadataJ] = { val a = for { f <- block } yield { diff --git a/skafka/src/test/scala/com/evolutiongaming/skafka/producer/ProducerSpec.scala b/skafka/src/test/scala/com/evolutiongaming/skafka/producer/ProducerSpec.scala index 34fea81a..79bddd12 100644 --- a/skafka/src/test/scala/com/evolutiongaming/skafka/producer/ProducerSpec.scala +++ b/skafka/src/test/scala/com/evolutiongaming/skafka/producer/ProducerSpec.scala @@ -1,7 +1,7 @@ package com.evolutiongaming.skafka.producer import java.util -import java.util.concurrent.CompletableFuture +import java.util.concurrent.{CompletableFuture, Future => FutureJ} import cats.arrow.FunctionK import cats.data.{NonEmptyMap => Nem} @@ -17,7 +17,7 @@ import org.apache.kafka.clients.consumer.{ ConsumerGroupMetadata => ConsumerGroupMetadataJ, OffsetAndMetadata => OffsetAndMetadataJ } -import org.apache.kafka.clients.producer.{Callback, Producer => ProducerJ, ProducerRecord => ProducerRecordJ} +import org.apache.kafka.clients.producer.{Callback, Producer => ProducerJ, ProducerRecord => ProducerRecordJ, RecordMetadata => RecordMetadataJ} import org.apache.kafka.common.{Metric, MetricName, TopicPartition => TopicPartitionJ} import scala.jdk.CollectionConverters._ @@ -85,7 +85,7 @@ class ProducerSpec extends AnyWordSpec with Matchers { } "partitions" in new Scope { - verify(producer.partitions(topic)) { result: List[PartitionInfo] => + verify(producer.partitions(topic)) { (result: List[PartitionInfo]) => result shouldEqual Nil partitionsFor shouldEqual topic } @@ -101,7 +101,7 @@ class ProducerSpec extends AnyWordSpec with Matchers { "Producer.empty" should { - implicit val empty = Producer.empty[IO] + implicit val empty: Producer[IO] = Producer.empty[IO] "initTransactions" in { verify(Producer[IO].initTransactions) { _ => } @@ -178,15 +178,15 @@ class ProducerSpec extends AnyWordSpec with Matchers { Nil.asJava } - def metrics() = Map.empty[MetricName, Metric].asJava + def metrics(): util.Map[MetricName, Metric] = Map.empty.asJava def close() = {} def close(timeout: java.time.Duration) = {} - def send(record: ProducerRecordJ[Bytes, Bytes]) = completableFuture + def send(record: ProducerRecordJ[Bytes, Bytes]): FutureJ[RecordMetadataJ] = completableFuture - def send(record: ProducerRecordJ[Bytes, Bytes], callback: Callback) = { + def send(record: ProducerRecordJ[Bytes, Bytes], callback: Callback): FutureJ[RecordMetadataJ] = { callback.onCompletion(metadata.asJava, null) completableFuture } @@ -195,7 +195,7 @@ class ProducerSpec extends AnyWordSpec with Matchers { } val producer: Producer[IO] = { - implicit val measureDuration = MeasureDuration.empty[IO] + implicit val measureDuration: MeasureDuration[IO] = MeasureDuration.empty[IO] Producer .fromProducerJ2[IO](jProducer.pure[IO]) .allocated diff --git a/tests/src/test/scala/com/evolutiongaming/skafka/ProducerConsumerSpec.scala b/tests/src/test/scala/com/evolutiongaming/skafka/ProducerConsumerSpec.scala index 3c781c8c..d359d093 100644 --- a/tests/src/test/scala/com/evolutiongaming/skafka/ProducerConsumerSpec.scala +++ b/tests/src/test/scala/com/evolutiongaming/skafka/ProducerConsumerSpec.scala @@ -203,9 +203,9 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match } yield () } - def onPartitionsRevoked(partitions: Nes[TopicPartition]) = RebalanceCallback.empty + def onPartitionsRevoked(partitions: Nes[TopicPartition]): RebalanceCallback[IO, Unit] = RebalanceCallback.empty - def onPartitionsLost(partitions: Nes[TopicPartition]) = RebalanceCallback.empty + def onPartitionsLost(partitions: Nes[TopicPartition]): RebalanceCallback[IO, Unit] = RebalanceCallback.empty } } @@ -311,7 +311,7 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match ) } yield a - def onPartitionsLost(partitions: Nes[TopicPartition]) = RebalanceCallback.empty + def onPartitionsLost(partitions: Nes[TopicPartition]): RebalanceCallback[IO, Unit] = RebalanceCallback.empty } } @@ -386,11 +386,11 @@ class ProducerConsumerSpec extends AnyFunSuite with BeforeAndAfterAll with Match new RebalanceListener1WithConsumer[IO] { def onPartitionsAssigned(partitions: Nes[TopicPartition]) = - partitions.foldMapM(consumer.seek(_, Offset.unsafe(1))) + partitions.foldMapM(this.consumer.seek(_, Offset.unsafe(1))) - def onPartitionsRevoked(partitions: Nes[TopicPartition]) = RebalanceCallback.empty + def onPartitionsRevoked(partitions: Nes[TopicPartition]): RebalanceCallback[IO, Unit] = RebalanceCallback.empty - def onPartitionsLost(partitions: Nes[TopicPartition]) = RebalanceCallback.empty + def onPartitionsLost(partitions: Nes[TopicPartition]): RebalanceCallback[IO, Unit] = RebalanceCallback.empty } } diff --git a/version.sbt b/version.sbt index 408a5ef6..8ac3cf41 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -ThisBuild / version := "16.0.4-SNAPSHOT" +ThisBuild / version := "16.0.5-SNAPSHOT"