Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Perf tests #19

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 11 additions & 12 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,18 @@ val rtmLite = project
.settings(sharedPublishSettings: _*)
.enablePlugins(GitVersioning)

/*
// Deprecated
val rtm = project
.in(file("rtm"))
.settings(name := "scala-slack-rtm")
.settings(compileSettings)
.configs(IntegrationTest)
.settings(Defaults.itSettings: _*)
.settings(libraryDependencies ++= Seq(Dependencies.ScalaTest, Dependencies.ScalaTestIt))
val rtmLiteBenchmark = project
.in(file("rtm_lite_benchmark"))
.settings(name := "scala-slack-rtm-lite-benchmark")
// .settings(compileSettings)
// .configs(IntegrationTest)
// .settings(Defaults.itSettings: _*)
// .settings(libraryDependencies ++= Seq(Dependencies.ScalaTest, Dependencies.ScalaTestIt))
.dependsOn(rtmLite)
.settings(sharedPublishSettings: _*)
.enablePlugins(GitVersioning)
*/
// .settings(sharedPublishSettings: _*)
// .enablePlugins(GitVersioning)
.enablePlugins(JmhPlugin)


val web = project
.in(file("web"))
Expand Down
3 changes: 2 additions & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@
addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "0.9.3")
addSbtPlugin("org.foundweekends" % "sbt-bintray" % "0.5.1")
addSbtPlugin("com.dwijnand" % "sbt-travisci" % "1.1.1")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.0")
addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.1.0")
addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.3.4")
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.github.agaro1121.rtmlite.client
import akka.NotUsed
import akka.http.scaladsl.model.ws
import akka.http.scaladsl.model.ws.TextMessage
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.{Flow, JsonFraming}
import com.github.agaro1121.core.utils.JsonUtils
import com.github.agaro1121.sharedevents.models
import io.circe.syntax._
Expand All @@ -13,7 +13,7 @@ import com.github.agaro1121.sharedevents.marshalling.GeneralEventEncoders.Messag
import com.github.agaro1121.sharedevents.marshalling.GeneralEventDecoders.MessageDecoder
import cats.syntax.either._ //for calling `map` on [[Either]] in Scala v2.11.x

private[client] trait AkkaStreamsComponents {
trait AkkaStreamsComponents {

val wsMessage2Json: Flow[ws.Message, Either[ParsingFailure, Json], NotUsed] =
Flow[ws.Message]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package com.github.agaro1121.rtmlite

import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.TextMessage
import akka.http.scaladsl.model.ws
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{JsonFraming, Sink, Source}
import com.github.agaro1121.rtmlite.client.AkkaStreamsComponents
import com.github.agaro1121.sharedevents.models.Message
import org.scalatest.{Matchers, WordSpecLike}
import io.circe.syntax._
import com.github.agaro1121.sharedevents.marshalling.GeneralEventEncoders._
import org.scalatest.concurrent.{PatienceConfiguration, ScalaFutures}

import scala.concurrent.{Await, Future}
import concurrent.duration._
import scala.io.StdIn
import org.scalatest.time._

class AkkaStreamsComponentsTest extends WordSpecLike with Matchers with AkkaStreamsComponents
with ScalaFutures {

implicit override val patienceConfig: PatienceConfig =
PatienceConfig(timeout = Span(3, Minutes), interval = Span(20, Millis))


"Stuff" should {
"just work" in {
implicit val a = ActorSystem()
implicit val ma = ActorMaterializer()
import a.dispatcher

def timedFuture[T](future: Future[T]) = {
val start = System.currentTimeMillis()
future.onComplete({
case _ => println(s"Future took ${System.currentTimeMillis() - start} ms")
})
future
}

val m = Message(None, "ChannelId", "UserId", "someText", "ts", None, None)
val wsMessage: ws.Message = TextMessage(m.asJson.toString())

val source: Source[ws.Message, NotUsed] =
Source.fromIterator(() => Stream.continually(wsMessage).take(100000000).iterator)

/*val result = Await.result(source
.via(wsMessage2SlackMessage)
.runWith(Sink.seq),
5 seconds
)*/

val f = source.via(wsMessage2SlackMessage).to(Sink.ignore)

whenReady(
// timedFuture(source.via(wsMessage2SlackMessage).runWith(Sink.ignore))
timedFuture(source.via(wsMessage2Json).via(json2SlackMessage).runWith(Sink.ignore))
){
println
}


}
}

}
151 changes: 151 additions & 0 deletions rtm_lite_benchmark/src/main/scala/sample/HelloWorldTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package sample

import java.util.concurrent.TimeUnit

import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws
import akka.http.scaladsl.model.ws.TextMessage
import akka.http.scaladsl.model.ws.TextMessage.Strict
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, JsonFraming, Keep, RunnableGraph, Sink, Source}
import com.github.agaro1121.core.utils.JsonUtils
import com.github.agaro1121.rtmlite.client.AkkaStreamsComponents
import com.github.agaro1121.sharedevents.marshalling.{GeneralEventDecoders, GeneralEventEncoders}
import com.github.agaro1121.sharedevents.models
import com.github.agaro1121.sharedevents.models.Message
import com.typesafe.config.ConfigFactory
import io.circe.parser.parse
import io.circe.{Json, ParsingFailure}
import org.openjdk.jmh.annotations._
import io.circe.syntax._

import scala.concurrent.duration._
import scala.concurrent.{Await, Future}

object HelloWorldTest {
val numOfMessages = 1000
}

@State(Scope.Benchmark)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@BenchmarkMode(Array(Mode.Throughput))
class HelloWorldTest extends AkkaStreamsComponents with GeneralEventEncoders with GeneralEventDecoders {
import HelloWorldTest._

val configs = ConfigFactory.parseString(
"""
akka.actor.default-dispatcher {
executor = "fork-join-executor"
fork-join-executor {
parallelism-factor = 1
}
}
"""
)

implicit val a = ActorSystem("testingSystem", configs)
implicit val ma = ActorMaterializer()
import a.dispatcher


val m = Message(None, "ChannelId", "UserId", "someText", "ts", None, None)

val json1 = TextMessage("""{"some":"garbage","some":"garbage","some":"garbage","some":"garbage","some":"garbage","some":"garbage","some":"garbage","some":"garbage","some":"garbage","some":"garbage","some":"garbage","some":"garbage","some":"garbage","some":"garbage","some":"garbage"}""")
val wsMessage: ws.Message = TextMessage(m.asJson.toString())
def pickMessage(n: Int): ws.Message = n match {
case n if n % 2 == 0 => wsMessage
case _ => json1
}

val source: Source[ws.Message, NotUsed] =
// Source.repeat(wsMessage).take(numOfMessages)
Source.fromIterator(() => Iterator.tabulate(numOfMessages)(n => pickMessage(n)))

val wsMessage2Json2: Flow[ws.Message, Either[ParsingFailure, Json], NotUsed] =
Flow[ws.Message]
.collect {
case Strict(json) =>
parse(json).map(JsonUtils.convertTypeFieldToCapitalCamel)
}

val json2SlackMessage2: Flow[Either[ParsingFailure, Json], models.Message, NotUsed] =
Flow[Either[ParsingFailure, Json]]
.map {
_.flatMap(_.as[models.Message])
}.collect {
case Right(msg) => msg
}

val graph1: RunnableGraph[Future[Done]] =
source
.via(wsMessage2SlackMessage)
.toMat(Sink.ignore)(Keep.right)

val graph2: RunnableGraph[Future[Done]] =
source
.via(wsMessage2Json).via(json2SlackMessage)
.toMat(Sink.ignore)(Keep.right)

val graph3: RunnableGraph[Future[Done]] =
source
.via(wsMessage2Json2).via(json2SlackMessage)
.toMat(Sink.ignore)(Keep.right)

val graph4: RunnableGraph[Future[Done]] =
source
.via(wsMessage2Json).via(json2SlackMessage2)
.toMat(Sink.ignore)(Keep.right)

val graph5: RunnableGraph[Future[Done]] =
source
.via(wsMessage2Json2).via(json2SlackMessage2)
.toMat(Sink.ignore)(Keep.right)

val graph6: RunnableGraph[Future[Done]] =
source
.via(wsMessage2Json2).async.via(json2SlackMessage2)
.toMat(Sink.ignore)(Keep.right)

@TearDown
def shutdown(): Unit = {
Await.result(a.terminate(), 5.seconds)
}

@Benchmark
@OperationsPerInvocation(1000)
def withAsync(): Unit = {
Await.result(graph1.run(), Duration.Inf)
}

@Benchmark
@OperationsPerInvocation(1000)
def withoutAsync(): Unit = {
Await.result(graph2.run(), Duration.Inf)
}

@Benchmark
@OperationsPerInvocation(1000)
def withWsMessage2Json2(): Unit = {
Await.result(graph3.run(), Duration.Inf)
}

@Benchmark
@OperationsPerInvocation(1000)
def withJson2SlackMessage2(): Unit = {
Await.result(graph4.run(), Duration.Inf)
}

@Benchmark
@OperationsPerInvocation(1000)
def withBoth2(): Unit = {
Await.result(graph5.run(), Duration.Inf)
}

@Benchmark
@OperationsPerInvocation(1000)
def withBothAndAsync(): Unit = {
Await.result(graph6.run(), Duration.Inf)
}

}