diff --git a/build.sbt b/build.sbt index 87c69a0..fa08837 100644 --- a/build.sbt +++ b/build.sbt @@ -33,19 +33,18 @@ val rtmLite = project .settings(sharedPublishSettings: _*) .enablePlugins(GitVersioning) -/* -// Deprecated -val rtm = project - .in(file("rtm")) - .settings(name := "scala-slack-rtm") - .settings(compileSettings) - .configs(IntegrationTest) - .settings(Defaults.itSettings: _*) - .settings(libraryDependencies ++= Seq(Dependencies.ScalaTest, Dependencies.ScalaTestIt)) +val rtmLiteBenchmark = project + .in(file("rtm_lite_benchmark")) + .settings(name := "scala-slack-rtm-lite-benchmark") +// .settings(compileSettings) +// .configs(IntegrationTest) +// .settings(Defaults.itSettings: _*) +// .settings(libraryDependencies ++= Seq(Dependencies.ScalaTest, Dependencies.ScalaTestIt)) .dependsOn(rtmLite) - .settings(sharedPublishSettings: _*) - .enablePlugins(GitVersioning) -*/ +// .settings(sharedPublishSettings: _*) +// .enablePlugins(GitVersioning) + .enablePlugins(JmhPlugin) + val web = project .in(file("web")) diff --git a/project/plugins.sbt b/project/plugins.sbt index f1ead33..f009be8 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -4,4 +4,5 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "0.9.3") addSbtPlugin("org.foundweekends" % "sbt-bintray" % "0.5.1") addSbtPlugin("com.dwijnand" % "sbt-travisci" % "1.1.1") -addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.0") \ No newline at end of file +addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.0") +addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.3.4") diff --git a/rtm_lite/src/main/scala/com/github/agaro1121/rtmlite/client/AkkaStreamsComponents.scala b/rtm_lite/src/main/scala/com/github/agaro1121/rtmlite/client/AkkaStreamsComponents.scala index 54bdbe7..58e94ee 100644 --- a/rtm_lite/src/main/scala/com/github/agaro1121/rtmlite/client/AkkaStreamsComponents.scala +++ b/rtm_lite/src/main/scala/com/github/agaro1121/rtmlite/client/AkkaStreamsComponents.scala @@ -3,7 +3,7 @@ package com.github.agaro1121.rtmlite.client import akka.NotUsed import akka.http.scaladsl.model.ws import akka.http.scaladsl.model.ws.TextMessage -import akka.stream.scaladsl.Flow +import akka.stream.scaladsl.{Flow, JsonFraming} import com.github.agaro1121.core.utils.JsonUtils import com.github.agaro1121.sharedevents.models import io.circe.syntax._ @@ -13,7 +13,7 @@ import com.github.agaro1121.sharedevents.marshalling.GeneralEventEncoders.Messag import com.github.agaro1121.sharedevents.marshalling.GeneralEventDecoders.MessageDecoder import cats.syntax.either._ //for calling `map` on [[Either]] in Scala v2.11.x -private[client] trait AkkaStreamsComponents { +trait AkkaStreamsComponents { val wsMessage2Json: Flow[ws.Message, Either[ParsingFailure, Json], NotUsed] = Flow[ws.Message] diff --git a/rtm_lite/src/test/scala/com/github/agaro1121/rtmlite/AkkaStreamsComponentsTest.scala b/rtm_lite/src/test/scala/com/github/agaro1121/rtmlite/AkkaStreamsComponentsTest.scala new file mode 100644 index 0000000..9b6f10f --- /dev/null +++ b/rtm_lite/src/test/scala/com/github/agaro1121/rtmlite/AkkaStreamsComponentsTest.scala @@ -0,0 +1,67 @@ +package com.github.agaro1121.rtmlite + +import akka.NotUsed +import akka.actor.ActorSystem +import akka.http.scaladsl.model.ws.TextMessage +import akka.http.scaladsl.model.ws +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{JsonFraming, Sink, Source} +import com.github.agaro1121.rtmlite.client.AkkaStreamsComponents +import com.github.agaro1121.sharedevents.models.Message +import org.scalatest.{Matchers, WordSpecLike} +import io.circe.syntax._ +import com.github.agaro1121.sharedevents.marshalling.GeneralEventEncoders._ +import org.scalatest.concurrent.{PatienceConfiguration, ScalaFutures} + +import scala.concurrent.{Await, Future} +import concurrent.duration._ +import scala.io.StdIn +import org.scalatest.time._ + +class AkkaStreamsComponentsTest extends WordSpecLike with Matchers with AkkaStreamsComponents + with ScalaFutures { + + implicit override val patienceConfig: PatienceConfig = + PatienceConfig(timeout = Span(3, Minutes), interval = Span(20, Millis)) + + + "Stuff" should { + "just work" in { + implicit val a = ActorSystem() + implicit val ma = ActorMaterializer() + import a.dispatcher + + def timedFuture[T](future: Future[T]) = { + val start = System.currentTimeMillis() + future.onComplete({ + case _ => println(s"Future took ${System.currentTimeMillis() - start} ms") + }) + future + } + + val m = Message(None, "ChannelId", "UserId", "someText", "ts", None, None) + val wsMessage: ws.Message = TextMessage(m.asJson.toString()) + + val source: Source[ws.Message, NotUsed] = + Source.fromIterator(() => Stream.continually(wsMessage).take(100000000).iterator) + + /*val result = Await.result(source + .via(wsMessage2SlackMessage) + .runWith(Sink.seq), + 5 seconds + )*/ + + val f = source.via(wsMessage2SlackMessage).to(Sink.ignore) + + whenReady( +// timedFuture(source.via(wsMessage2SlackMessage).runWith(Sink.ignore)) + timedFuture(source.via(wsMessage2Json).via(json2SlackMessage).runWith(Sink.ignore)) + ){ + println + } + + + } + } + +} diff --git a/rtm_lite_benchmark/src/main/scala/sample/HelloWorldTest.scala b/rtm_lite_benchmark/src/main/scala/sample/HelloWorldTest.scala new file mode 100644 index 0000000..4d53a61 --- /dev/null +++ b/rtm_lite_benchmark/src/main/scala/sample/HelloWorldTest.scala @@ -0,0 +1,151 @@ +package sample + +import java.util.concurrent.TimeUnit + +import akka.{Done, NotUsed} +import akka.actor.ActorSystem +import akka.http.scaladsl.model.ws +import akka.http.scaladsl.model.ws.TextMessage +import akka.http.scaladsl.model.ws.TextMessage.Strict +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{Flow, JsonFraming, Keep, RunnableGraph, Sink, Source} +import com.github.agaro1121.core.utils.JsonUtils +import com.github.agaro1121.rtmlite.client.AkkaStreamsComponents +import com.github.agaro1121.sharedevents.marshalling.{GeneralEventDecoders, GeneralEventEncoders} +import com.github.agaro1121.sharedevents.models +import com.github.agaro1121.sharedevents.models.Message +import com.typesafe.config.ConfigFactory +import io.circe.parser.parse +import io.circe.{Json, ParsingFailure} +import org.openjdk.jmh.annotations._ +import io.circe.syntax._ + +import scala.concurrent.duration._ +import scala.concurrent.{Await, Future} + +object HelloWorldTest { + val numOfMessages = 1000 +} + +@State(Scope.Benchmark) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@BenchmarkMode(Array(Mode.Throughput)) +class HelloWorldTest extends AkkaStreamsComponents with GeneralEventEncoders with GeneralEventDecoders { + import HelloWorldTest._ + + val configs = ConfigFactory.parseString( + """ + akka.actor.default-dispatcher { + executor = "fork-join-executor" + fork-join-executor { + parallelism-factor = 1 + } + } + """ + ) + + implicit val a = ActorSystem("testingSystem", configs) + implicit val ma = ActorMaterializer() + import a.dispatcher + + + val m = Message(None, "ChannelId", "UserId", "someText", "ts", None, None) + + val json1 = TextMessage("""{"some":"garbage","some":"garbage","some":"garbage","some":"garbage","some":"garbage","some":"garbage","some":"garbage","some":"garbage","some":"garbage","some":"garbage","some":"garbage","some":"garbage","some":"garbage","some":"garbage","some":"garbage"}""") + val wsMessage: ws.Message = TextMessage(m.asJson.toString()) + def pickMessage(n: Int): ws.Message = n match { + case n if n % 2 == 0 => wsMessage + case _ => json1 + } + + val source: Source[ws.Message, NotUsed] = + // Source.repeat(wsMessage).take(numOfMessages) + Source.fromIterator(() => Iterator.tabulate(numOfMessages)(n => pickMessage(n))) + + val wsMessage2Json2: Flow[ws.Message, Either[ParsingFailure, Json], NotUsed] = + Flow[ws.Message] + .collect { + case Strict(json) => + parse(json).map(JsonUtils.convertTypeFieldToCapitalCamel) + } + + val json2SlackMessage2: Flow[Either[ParsingFailure, Json], models.Message, NotUsed] = + Flow[Either[ParsingFailure, Json]] + .map { + _.flatMap(_.as[models.Message]) + }.collect { + case Right(msg) => msg + } + + val graph1: RunnableGraph[Future[Done]] = + source + .via(wsMessage2SlackMessage) + .toMat(Sink.ignore)(Keep.right) + + val graph2: RunnableGraph[Future[Done]] = + source + .via(wsMessage2Json).via(json2SlackMessage) + .toMat(Sink.ignore)(Keep.right) + + val graph3: RunnableGraph[Future[Done]] = + source + .via(wsMessage2Json2).via(json2SlackMessage) + .toMat(Sink.ignore)(Keep.right) + + val graph4: RunnableGraph[Future[Done]] = + source + .via(wsMessage2Json).via(json2SlackMessage2) + .toMat(Sink.ignore)(Keep.right) + + val graph5: RunnableGraph[Future[Done]] = + source + .via(wsMessage2Json2).via(json2SlackMessage2) + .toMat(Sink.ignore)(Keep.right) + + val graph6: RunnableGraph[Future[Done]] = + source + .via(wsMessage2Json2).async.via(json2SlackMessage2) + .toMat(Sink.ignore)(Keep.right) + + @TearDown + def shutdown(): Unit = { + Await.result(a.terminate(), 5.seconds) + } + + @Benchmark + @OperationsPerInvocation(1000) + def withAsync(): Unit = { + Await.result(graph1.run(), Duration.Inf) + } + + @Benchmark + @OperationsPerInvocation(1000) + def withoutAsync(): Unit = { + Await.result(graph2.run(), Duration.Inf) + } + + @Benchmark + @OperationsPerInvocation(1000) + def withWsMessage2Json2(): Unit = { + Await.result(graph3.run(), Duration.Inf) + } + + @Benchmark + @OperationsPerInvocation(1000) + def withJson2SlackMessage2(): Unit = { + Await.result(graph4.run(), Duration.Inf) + } + + @Benchmark + @OperationsPerInvocation(1000) + def withBoth2(): Unit = { + Await.result(graph5.run(), Duration.Inf) + } + + @Benchmark + @OperationsPerInvocation(1000) + def withBothAndAsync(): Unit = { + Await.result(graph6.run(), Duration.Inf) + } + +}