From dd0f8a6dc33b42c19932da704bb2d43caa40cf6b Mon Sep 17 00:00:00 2001 From: jendakol Date: Mon, 3 Sep 2018 13:03:55 +0200 Subject: [PATCH] Finally tagless reworked (#20) * Http4s for F: Async * Akka for F: Effect * FunctorK for GrpcJsonBridge --- README.md | 4 + .../grpc/jsonbridge/akkahttp/AkkaHttp.scala | 26 +++--- .../jsonbridge/akkahttp/AkkaHttpTest.scala | 23 +++-- build.sbt | 4 +- .../grpc/jsonbridge/GrpcJsonBridgeBase.scala | 21 +++-- .../com/avast/grpc/jsonbridge/Macros.scala | 14 ++- .../avast/grpc/jsonbridge/jsonbridge.scala | 38 ++++++--- .../grpc/jsonbridge/GrpcJsonBridgeTest.scala | 16 +++- http4s/README.md | 3 +- .../avast/grpc/jsonbrige/http4s/Http4s.scala | 71 ++++++++-------- .../grpc/jsonbrige/http4s/Http4sTest.scala | 85 +++++++++++-------- 11 files changed, 186 insertions(+), 119 deletions(-) diff --git a/README.md b/README.md index 64bc301c..c3fc2fb1 100644 --- a/README.md +++ b/README.md @@ -10,6 +10,10 @@ For requests/responses mapping a [standard GPB <-> JSON mapping](https://develop It uses Scala macros for creating mapping between runtime-provided service and method names to pregenerated Java gRPC classes. In case you don't want to use _plain Java API_ you can easily use it together with [Cactus](https://github.com/avast/cactus). +The API is _finally tagless_ (read more e.g. [here](https://www.beyondthelines.net/programming/introduction-to-tagless-final/)) +meaning it can use whatever [`F[_]: cats.effect.Effect`](https://typelevel.org/cats-effect/typeclasses/effect.html) +(e.g. `cats.effect.IO`, `monix.eval.Task`). + There are several modules: 1. core - for basic implementation-agnostic usage 1. [http4s](http4s) - integration with [http4s](https://http4s.org/) webserver diff --git a/akka-http/src/main/scala/com/avast/grpc/jsonbridge/akkahttp/AkkaHttp.scala b/akka-http/src/main/scala/com/avast/grpc/jsonbridge/akkahttp/AkkaHttp.scala index eedde95b..8806ebf8 100644 --- a/akka-http/src/main/scala/com/avast/grpc/jsonbridge/akkahttp/AkkaHttp.scala +++ b/akka-http/src/main/scala/com/avast/grpc/jsonbridge/akkahttp/AkkaHttp.scala @@ -5,12 +5,15 @@ import akka.http.scaladsl.model.headers.`Content-Type` import akka.http.scaladsl.server.Directives._ import akka.http.scaladsl.server.{PathMatcher, Route} import cats.data.NonEmptyList +import cats.effect.Effect +import com.avast.grpc.jsonbridge.GrpcJsonBridge import com.avast.grpc.jsonbridge.GrpcJsonBridge.GrpcHeader -import com.avast.grpc.jsonbridge.{GrpcJsonBridge, ToTask} import io.grpc.BindableService import io.grpc.Status.Code +import monix.eval.Task import monix.execution.Scheduler +import scala.concurrent.ExecutionContext import scala.language.higherKinds import scala.util.control.NonFatal import scala.util.{Failure, Success} @@ -21,9 +24,11 @@ object AkkaHttp { ContentType.WithMissingCharset(MediaType.applicationWithOpenCharset("json")) } - def apply[F[_]: ToTask](configuration: Configuration)(bridges: GrpcJsonBridge[F, _ <: BindableService]*)( - implicit sch: Scheduler): Route = { - val services = bridges.map(s => (s.serviceName, s): (String, GrpcJsonBridge[F, _])).toMap + def apply[F[_]: Effect](configuration: Configuration)(bridges: GrpcJsonBridge[F, _ <: BindableService]*)( + implicit ec: ExecutionContext): Route = { + implicit val sch: Scheduler = Scheduler(ec) + + val bridgesMap = bridges.map(s => (s.serviceName, s): (String, GrpcJsonBridge[F, _])).toMap val pathPattern = configuration.pathPrefix .map { @@ -42,12 +47,13 @@ object AkkaHttp { extractRequest { req => req.header[`Content-Type`] match { case Some(`JsonContentType`) => - services.get(serviceName) match { + bridgesMap.get(serviceName) match { case Some(service) => entity(as[String]) { json => - val methodCall = implicitly[ToTask[F]].apply { - service.invokeGrpcMethod(methodName, json, mapHeaders(req.headers)) - }.runAsync + val methodCall = + Task.fromEffect { + service.invokeGrpcMethod(methodName, json, mapHeaders(req.headers)) + }.runAsync onComplete(methodCall) { case Success(Right(r)) => @@ -69,7 +75,7 @@ object AkkaHttp { } } ~ get { path(Segment) { serviceName => - services.get(serviceName) match { + bridgesMap.get(serviceName) match { case Some(service) => complete(service.methodsNames.mkString("\n")) @@ -78,7 +84,7 @@ object AkkaHttp { } } ~ get { path(PathEnd) { - complete(services.values.flatMap(s => s.methodsNames).mkString("\n")) + complete(bridgesMap.values.flatMap(s => s.methodsNames).mkString("\n")) } } } diff --git a/akka-http/src/test/scala/com/avast/grpc/jsonbridge/akkahttp/AkkaHttpTest.scala b/akka-http/src/test/scala/com/avast/grpc/jsonbridge/akkahttp/AkkaHttpTest.scala index 3786722a..775a5f6d 100644 --- a/akka-http/src/test/scala/com/avast/grpc/jsonbridge/akkahttp/AkkaHttpTest.scala +++ b/akka-http/src/test/scala/com/avast/grpc/jsonbridge/akkahttp/AkkaHttpTest.scala @@ -2,16 +2,18 @@ package com.avast.grpc.jsonbridge.akkahttp import akka.http.scaladsl.model.HttpHeader.ParsingResult.Ok import akka.http.scaladsl.model.headers.`Content-Type` -import akka.http.scaladsl.model.{ContentType, HttpHeader, MediaType, StatusCodes} +import akka.http.scaladsl.model._ import akka.http.scaladsl.testkit.ScalatestRouteTest import cats.data.NonEmptyList +import cats.effect.Effect import com.avast.grpc.jsonbridge._ -import com.avast.grpc.jsonbridge.test.{TestApi, TestApiService} import com.avast.grpc.jsonbridge.test.TestApi.{GetRequest, GetResponse} import com.avast.grpc.jsonbridge.test.TestApiServiceGrpc.{TestApiServiceFutureStub, TestApiServiceImplBase} +import com.avast.grpc.jsonbridge.test.{TestApi, TestApiService} import io.grpc._ import io.grpc.stub.StreamObserver import monix.eval.Task +import monix.execution.Scheduler import org.scalatest.FunSuite import scala.collection.JavaConverters._ @@ -21,6 +23,9 @@ import scala.util.Random class AkkaHttpTest extends FunSuite with ScalatestRouteTest { + // this is workaround which solves presence of ExecutionContextExecutor in RouteTest from AKKA + private implicit val taskEff: Effect[Task] = Task.catsEffect(Scheduler.global) + case class MyRequest(names: Seq[String]) case class MyResponse(results: Map[String, Int]) @@ -40,7 +45,7 @@ class AkkaHttpTest extends FunSuite with ScalatestRouteTest { } }.createGrpcJsonBridge[Task, TestApiServiceFutureStub]() - val route = AkkaHttp[Task](Configuration.Default)(bridge)(implicitly[ToTask[Task]], monix.execution.Scheduler.Implicits.global) + val route = AkkaHttp[Task](Configuration.Default)(bridge) Post(s"/${classOf[TestApiService].getName.replace("$", ".")}/Get", """ { "names": ["abc","def"] } """) .withHeaders(AkkaHttp.JsonContentType) ~> route ~> check { @@ -63,7 +68,7 @@ class AkkaHttpTest extends FunSuite with ScalatestRouteTest { val configuration = Configuration.Default.copy(pathPrefix = Some(NonEmptyList.of("abc", "def"))) - val route = AkkaHttp(configuration)(bridge)(implicitly[ToTask[Task]], monix.execution.Scheduler.Implicits.global) + val route = AkkaHttp(configuration)(bridge) Post(s"/abc/def/${classOf[TestApiService].getName.replace("$", ".")}/Get", """ { "names": ["abc","def"] } """) .withHeaders(AkkaHttp.JsonContentType) ~> route ~> check { @@ -82,7 +87,7 @@ class AkkaHttpTest extends FunSuite with ScalatestRouteTest { } }.createGrpcJsonBridge[Task, TestApiServiceFutureStub]() - val route = AkkaHttp(Configuration.Default)(bridge)(implicitly[ToTask[Task]], monix.execution.Scheduler.Implicits.global) + val route = AkkaHttp(Configuration.Default)(bridge) // empty body Post(s"/${classOf[TestApiService].getName.replace("$", ".")}/Get", "") @@ -103,7 +108,7 @@ class AkkaHttpTest extends FunSuite with ScalatestRouteTest { } }.createGrpcJsonBridge[Task, TestApiServiceFutureStub]() - val route = AkkaHttp(Configuration.Default)(bridge)(implicitly[ToTask[Task]], monix.execution.Scheduler.Implicits.global) + val route = AkkaHttp(Configuration.Default)(bridge) Post(s"/${classOf[TestApiService].getName.replace("$", ".")}/Get", """ { "names": ["abc","def"] } """) .withHeaders(AkkaHttp.JsonContentType) ~> route ~> check { @@ -114,7 +119,7 @@ class AkkaHttpTest extends FunSuite with ScalatestRouteTest { test("provides service description") { val bridge = new TestApiServiceImplBase {}.createGrpcJsonBridge[Task, TestApiServiceFutureStub]() - val route = AkkaHttp(Configuration.Default)(bridge)(implicitly[ToTask[Task]], monix.execution.Scheduler.Implicits.global) + val route = AkkaHttp(Configuration.Default)(bridge) Get(s"/${classOf[TestApiService].getName.replace("$", ".")}") ~> route ~> check { assertResult(StatusCodes.OK)(status) @@ -127,7 +132,7 @@ class AkkaHttpTest extends FunSuite with ScalatestRouteTest { test("provides services description") { val bridge = new TestApiServiceImplBase {}.createGrpcJsonBridge[Task, TestApiServiceFutureStub]() - val route = AkkaHttp(Configuration.Default)(bridge)(implicitly[ToTask[Task]], monix.execution.Scheduler.Implicits.global) + val route = AkkaHttp(Configuration.Default)(bridge) Get("/") ~> route ~> check { assertResult(StatusCodes.OK)(status) @@ -161,7 +166,7 @@ class AkkaHttpTest extends FunSuite with ScalatestRouteTest { } ) - val route = AkkaHttp(Configuration.Default)(bridge)(implicitly[ToTask[Task]], monix.execution.Scheduler.Implicits.global) + val route = AkkaHttp(Configuration.Default)(bridge) val Ok(customHeaderToBeSent, _) = HttpHeader.parse("The-Header", headerValue) diff --git a/build.sbt b/build.sbt index c896e712..639ef05d 100644 --- a/build.sbt +++ b/build.sbt @@ -26,6 +26,7 @@ lazy val javaSettings = Seq( lazy val macroSettings = Seq( addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.1" cross CrossVersion.full), + addCompilerPlugin("org.spire-math" % "kind-projector" % "0.9.7" cross CrossVersion.binary), libraryDependencies ++= Seq( "org.scala-lang" % "scala-reflect" % scalaVersion.value, "org.scala-lang" % "scala-compiler" % scalaVersion.value @@ -112,7 +113,8 @@ lazy val core = (project in file("core")).settings( "io.grpc" % "grpc-protobuf" % Versions.grpcVersion, "io.grpc" % "grpc-stub" % Versions.grpcVersion, "org.typelevel" %% "cats-core" % "1.2.0", - "io.monix" % "monix_2.12" % "3.0.0-RC1", + "io.monix" %% "monix" % "3.0.0-RC1", + "com.kailuowang" %% "mainecoon-core" % "0.6.4", "com.typesafe.scala-logging" %% "scala-logging" % "3.9.0", "org.slf4j" % "jul-to-slf4j" % "1.7.25", "org.slf4j" % "jcl-over-slf4j" % "1.7.25", diff --git a/core/src/main/scala/com/avast/grpc/jsonbridge/GrpcJsonBridgeBase.scala b/core/src/main/scala/com/avast/grpc/jsonbridge/GrpcJsonBridgeBase.scala index 4b3b3a1a..7d7c9055 100644 --- a/core/src/main/scala/com/avast/grpc/jsonbridge/GrpcJsonBridgeBase.scala +++ b/core/src/main/scala/com/avast/grpc/jsonbridge/GrpcJsonBridgeBase.scala @@ -1,19 +1,23 @@ package com.avast.grpc.jsonbridge +import cats.effect.{Async, Effect} +import cats.syntax.all._ import com.avast.grpc.jsonbridge.GrpcJsonBridge.GrpcHeader import com.google.protobuf.Message import com.google.protobuf.util.JsonFormat import com.typesafe.scalalogging.StrictLogging +import io.grpc._ import io.grpc.stub.MetadataUtils -import io.grpc.{Metadata, Status, StatusException, StatusRuntimeException} import monix.eval.Task +import monix.execution.Scheduler import scala.concurrent.{ExecutionContext, Future} +import scala.language.higherKinds import scala.util.control.NonFatal /** This is trait for internal usage. You should not use it directly. */ -trait GrpcJsonBridgeBase[Stub <: io.grpc.stub.AbstractStub[Stub]] extends StrictLogging { +abstract class GrpcJsonBridgeBase[F[_], Stub <: io.grpc.stub.AbstractStub[Stub]](implicit protected val F: Async[F]) extends StrictLogging { protected def newFutureStub: Stub protected val parser: JsonFormat.Parser = JsonFormat.parser() @@ -21,7 +25,7 @@ trait GrpcJsonBridgeBase[Stub <: io.grpc.stub.AbstractStub[Stub]] extends Strict // https://groups.google.com/forum/#!topic/grpc-io/1-KMubq1tuc protected def withNewClientStub[A](headers: Seq[GrpcHeader])(f: Stub => Future[A])( - implicit ec: ExecutionContext): Task[Either[Status, A]] = { + implicit ec: ExecutionContext): F[Either[Status, A]] = { val metadata = new Metadata() headers.foreach(h => metadata.put(Metadata.Key.of(h.name, Metadata.ASCII_STRING_MARSHALLER), h.value)) @@ -31,8 +35,9 @@ trait GrpcJsonBridgeBase[Stub <: io.grpc.stub.AbstractStub[Stub]] extends Strict try { Task .deferFuture(f(clientFutureStub)) - .map(Right(_)) - .onErrorRecover { + .to[F](F, Scheduler(ec)) + .map(Right(_): Either[Status, A]) + .recover { case e: StatusException if e.getStatus.getCode == Status.Code.UNKNOWN => Left(Status.INTERNAL) case e: StatusRuntimeException if e.getStatus.getCode == Status.Code.UNKNOWN => Left(Status.INTERNAL) case e: StatusException => Left(e.getStatus) @@ -42,11 +47,11 @@ trait GrpcJsonBridgeBase[Stub <: io.grpc.stub.AbstractStub[Stub]] extends Strict Left(Status.INTERNAL.withCause(e)) } } catch { - case e: StatusException if e.getStatus.getCode == Status.Code.UNKNOWN => Task.now(Left(Status.INTERNAL)) - case e: StatusRuntimeException if e.getStatus.getCode == Status.Code.UNKNOWN => Task.now(Left(Status.INTERNAL)) + case e: StatusException if e.getStatus.getCode == Status.Code.UNKNOWN => F.pure(Left(Status.INTERNAL)) + case e: StatusRuntimeException if e.getStatus.getCode == Status.Code.UNKNOWN => F.pure(Left(Status.INTERNAL)) case NonFatal(e) => logger.debug("Error while executing the request", e) - Task.now(Left(Status.INTERNAL.withCause(e))) + F.pure(Left(Status.INTERNAL.withCause(e))) } // just abandon the stub... diff --git a/core/src/main/scala/com/avast/grpc/jsonbridge/Macros.scala b/core/src/main/scala/com/avast/grpc/jsonbridge/Macros.scala index 8160d896..a2776c37 100644 --- a/core/src/main/scala/com/avast/grpc/jsonbridge/Macros.scala +++ b/core/src/main/scala/com/avast/grpc/jsonbridge/Macros.scala @@ -15,7 +15,7 @@ class Macros(val c: blackbox.Context) { import c.universe._ def generateGrpcJsonBridge[F[_], GrpcServiceStub <: BindableService, GrpcClientStub <: AbstractStub[GrpcClientStub]: WeakTypeTag]( - interceptors: c.Tree*)(ec: c.Tree, ex: c.Tree, ct: c.Tree, ct2: c.Tree): c.Expr[GrpcJsonBridge[F, GrpcServiceStub]] = { + interceptors: c.Tree*)(ec: c.Tree, ex: c.Tree, ct: c.Tree, ct2: c.Tree, asf: c.Tree): c.Expr[GrpcJsonBridge[F, GrpcServiceStub]] = { val clientType = weakTypeOf[GrpcClientStub] val serviceTypeRaw = extractSymbolFromClassTag(ct) @@ -51,7 +51,7 @@ class Macros(val c: blackbox.Context) { val t = q""" - new _root_.com.avast.grpc.jsonbridge.GrpcJsonBridge[$fType, $serviceTypeRaw] with _root_.com.avast.grpc.jsonbridge.GrpcJsonBridgeBase[$clientType] { + new _root_.com.avast.grpc.jsonbridge.GrpcJsonBridgeBase[$fType, $clientType]()($asf) with _root_.com.avast.grpc.jsonbridge.GrpcJsonBridge[$fType, $serviceTypeRaw] { import _root_.com.avast.grpc.jsonbridge._ import _root_.cats.instances.future._ import _root_.cats.data._ @@ -68,17 +68,15 @@ class Macros(val c: blackbox.Context) { override def invokeGrpcMethod(name: String, json: => String, headers: => _root_.scala.Seq[com.avast.grpc.jsonbridge.GrpcJsonBridge.GrpcHeader]): $fType[_root_.scala.Either[_root_.io.grpc.Status, String]] = { - val task = try { + try { name match { case ..$methodCases // unsupported method - case _ => monix.eval.Task.now(_root_.scala.Left(_root_.io.grpc.Status.NOT_FOUND)) + case _ => F.pure(_root_.scala.Left(_root_.io.grpc.Status.NOT_FOUND)) } } catch { - case _root_.scala.util.control.NonFatal(e) => _root_.monix.eval.Task.now(_root_.scala.Left(_root_.io.grpc.Status.INTERNAL)) + case _root_.scala.util.control.NonFatal(e) => F.pure(_root_.scala.Left(_root_.io.grpc.Status.INTERNAL)) } - - implicitly[_root_.cats.arrow.FunctionK[Task, $fType]].apply(task) } override val methodsNames: _root_.scala.Seq[String] = ${methodsNames(serviceType)} @@ -110,7 +108,7 @@ class Macros(val c: blackbox.Context) { cq""" ${firstUpper(name.toString)} => (for { - request <- _root_.cats.data.EitherT.fromEither[_root_.monix.eval.Task](fromJson(${request.companion}.getDefaultInstance, json)) + request <- _root_.cats.data.EitherT.fromEither[$fType](fromJson(${request.companion}.getDefaultInstance, json)) result <- _root_.cats.data.EitherT { withNewClientStub(headers) { _.$name(request).asScala(executor).map(toJson(_)) } } diff --git a/core/src/main/scala/com/avast/grpc/jsonbridge/jsonbridge.scala b/core/src/main/scala/com/avast/grpc/jsonbridge/jsonbridge.scala index 2dc55b46..bf2e13dd 100644 --- a/core/src/main/scala/com/avast/grpc/jsonbridge/jsonbridge.scala +++ b/core/src/main/scala/com/avast/grpc/jsonbridge/jsonbridge.scala @@ -2,29 +2,27 @@ package com.avast.grpc import java.util.concurrent.Executor -import cats.arrow.FunctionK -import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture} +import cats.effect.Async +import cats.~> +import com.google.common.util.concurrent._ +import io.grpc._ import io.grpc.stub.AbstractStub -import io.grpc.{BindableService, ServerInterceptor} -import monix.eval.Task +import mainecoon.FunctorK -import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.concurrent._ import scala.language.experimental.macros import scala.language.higherKinds import scala.reflect.ClassTag package object jsonbridge { - type ToTask[A[_]] = FunctionK[A, Task] - - implicit val fkTaskIdentity: FunctionK[Task, Task] = FunctionK.id - implicit class DeriveBridge[GrpcServiceStub <: BindableService](val serviceStub: GrpcServiceStub) extends AnyVal { def createGrpcJsonBridge[F[_], GrpcClientStub <: AbstractStub[GrpcClientStub]](interceptors: ServerInterceptor*)( implicit ec: ExecutionContext, ex: Executor, ct: ClassTag[GrpcServiceStub], - ct2: ClassTag[F[_]]): GrpcJsonBridge[F, GrpcServiceStub] = + ct2: ClassTag[F[_]], + asf: Async[F]): GrpcJsonBridge[F, GrpcServiceStub] = macro Macros.generateGrpcJsonBridge[F, GrpcServiceStub, GrpcClientStub] } @@ -41,4 +39,24 @@ package object jsonbridge { } } + implicit class GrpcJsonBridgeOps[F[_], Service <: BindableService](val bridge: GrpcJsonBridge[F, Service]) extends AnyVal { + def mapK[G[_]](implicit fToG: ~>[F, G]): GrpcJsonBridge[G, Service] = bridgeFunctorK[Service].mapK(bridge)(fToG) + } + + implicit def bridgeFunctorK[Service <: BindableService]: FunctorK[GrpcJsonBridge[?[_], Service]] = + new FunctorK[GrpcJsonBridge[?[_], Service]] { + override def mapK[F[_], G[_]](bridge: GrpcJsonBridge[F, Service])(fToG: ~>[F, G]): GrpcJsonBridge[G, Service] = + new GrpcJsonBridge[G, Service] { + override def invokeGrpcMethod(name: String, + json: => String, + headers: => Seq[GrpcJsonBridge.GrpcHeader]): G[Either[Status, String]] = fToG { + bridge.invokeGrpcMethod(name, json, headers) + } + override def methodsNames: Seq[String] = bridge.methodsNames + + override def serviceName: String = bridge.serviceName + override def close(): Unit = bridge.close() + } + } + } diff --git a/core/src/test/scala/com/avast/grpc/jsonbridge/GrpcJsonBridgeTest.scala b/core/src/test/scala/com/avast/grpc/jsonbridge/GrpcJsonBridgeTest.scala index 4772906c..49ab8b5f 100644 --- a/core/src/test/scala/com/avast/grpc/jsonbridge/GrpcJsonBridgeTest.scala +++ b/core/src/test/scala/com/avast/grpc/jsonbridge/GrpcJsonBridgeTest.scala @@ -1,5 +1,6 @@ package com.avast.grpc.jsonbridge +import cats.~> import com.avast.cactus.grpc.server.GrpcService import com.avast.grpc.jsonbridge.internalPackage.MyServiceImpl import com.avast.grpc.jsonbridge.test.TestApi @@ -11,9 +12,10 @@ import monix.eval.Task import monix.execution.Scheduler.Implicits.global import org.scalatest.FunSuite import org.scalatest.concurrent.ScalaFutures -import org.scalatest.time.{Milliseconds, Seconds, Span} +import org.scalatest.time._ import scala.collection.JavaConverters._ +import scala.concurrent.Future package internalPackage { // this is here to test that package from PROTO fgile is used as "serviceName" @@ -132,4 +134,16 @@ class GrpcJsonBridgeTest extends FunSuite with ScalaFutures { assertResult("""{"results":{"name":42}}""")(response) } + test("functorK") { + assertCompiles( + """ + |val bridge: GrpcJsonBridge[Task, MyServiceImpl] = new MyServiceImpl { + | override def get(request: GetRequest, responseObserver: StreamObserver[GetResponse]): Unit = {} + |}.createGrpcJsonBridge[Task, TestApiServiceFutureStub]() + | + |implicit val taskToFuture: Task ~> Future = new ~>[Task, Future] { override def apply[A](fa: Task[A]): Future[A] = fa.runAsync } + | + |val bridgeFuture: GrpcJsonBridge[Future, MyServiceImpl] = bridge.mapK[Future] + """.stripMargin) + } } diff --git a/http4s/README.md b/http4s/README.md index 4b5437d3..d7b6e88a 100644 --- a/http4s/README.md +++ b/http4s/README.md @@ -18,7 +18,6 @@ libraryDependencies += "com.avast.grpc" %% "grpc-json-bridge-http4s" % "x.x.x" ## Usage ```scala -import cats.effect.IO import com.avast.grpc.jsonbridge.GrpcJsonBridge import com.avast.grpc.jsonbridge.test.TestApiServiceGrpc.TestApiServiceImplBase import com.avast.grpc.jsonbrige.http4s.{Configuration, Http4s} @@ -28,7 +27,7 @@ implicit val scheduler: monix.execution.Scheduler = ??? val bridge: GrpcJsonBridge[Task, TestApiServiceImplBase] = ??? // see core module docs for info about creating the bridge -val service: HttpService[IO] = Http4s(Configuration.Default)(bridge) +val service: HttpService[Task] = Http4s(Configuration.Default)(bridge) ``` See [official docs](https://http4s.org/v0.18/dsl/) for learn about following steps. diff --git a/http4s/src/main/scala/com/avast/grpc/jsonbrige/http4s/Http4s.scala b/http4s/src/main/scala/com/avast/grpc/jsonbrige/http4s/Http4s.scala index 69410a77..ec87b265 100644 --- a/http4s/src/main/scala/com/avast/grpc/jsonbrige/http4s/Http4s.scala +++ b/http4s/src/main/scala/com/avast/grpc/jsonbrige/http4s/Http4s.scala @@ -1,15 +1,14 @@ package com.avast.grpc.jsonbrige.http4s import cats.data.NonEmptyList -import cats.effect.IO +import cats.effect._ +import cats.syntax.all._ +import com.avast.grpc.jsonbridge.GrpcJsonBridge import com.avast.grpc.jsonbridge.GrpcJsonBridge.GrpcHeader -import com.avast.grpc.jsonbridge.{GrpcJsonBridge, ToTask} import com.typesafe.scalalogging.StrictLogging import io.grpc.Status.Code import io.grpc.{BindableService, Status => GrpcStatus} -import monix.execution.Scheduler -import org.http4s.dsl.impl.Root -import org.http4s.dsl.io._ +import org.http4s.dsl.Http4sDsl import org.http4s.headers.{`Content-Type`, `WWW-Authenticate`} import org.http4s.server.middleware.{CORS, CORSConfig} import org.http4s.{Challenge, Header, Headers, HttpService, MediaType, Response} @@ -18,9 +17,11 @@ import scala.language.higherKinds object Http4s extends StrictLogging { - def apply[F[_]: ToTask](configuration: Configuration)(bridges: GrpcJsonBridge[F, _ <: BindableService]*)( - implicit sch: Scheduler): HttpService[IO] = { - val services = bridges.map(s => (s.serviceName, s): (String, GrpcJsonBridge[F, _])).toMap + def apply[F[_]: Sync](configuration: Configuration)(bridges: GrpcJsonBridge[F, _ <: BindableService]*): HttpService[F] = { + implicit val h: Http4sDsl[F] = Http4sDsl[F] + import h._ + + val bridgesMap = bridges.map(s => (s.serviceName, s): (String, GrpcJsonBridge[F, _])).toMap val pathPrefix = configuration.pathPrefix .map(_.foldLeft[Path](Root)(_ / _)) @@ -28,9 +29,9 @@ object Http4s extends StrictLogging { logger.info(s"Creating HTTP4S service proxying gRPC services: ${bridges.map(_.serviceName).mkString("[", ", ", "]")}") - val http4sService = HttpService[IO] { - case _ @ GET -> `pathPrefix` / serviceName if serviceName.nonEmpty => - services.get(serviceName) match { + val http4sService = HttpService[F] { + case _ @GET -> `pathPrefix` / serviceName if serviceName.nonEmpty => + bridgesMap.get(serviceName) match { case Some(service) => Ok { service.methodsNames.mkString("\n") @@ -39,8 +40,8 @@ object Http4s extends StrictLogging { case None => NotFound(s"Service '$serviceName' not found") } - case _ @ GET -> `pathPrefix` => - Ok { services.values.flatMap(s => s.methodsNames).mkString("\n") } + case _ @GET -> `pathPrefix` => + Ok { bridgesMap.values.flatMap(s => s.methodsNames).mkString("\n") } case request @ POST -> `pathPrefix` / serviceName / methodName => val headers = request.headers @@ -48,15 +49,11 @@ object Http4s extends StrictLogging { case Some(Header(_, contentTypeValue)) => `Content-Type`.parse(contentTypeValue) match { case Right(`Content-Type`(MediaType.`application/json`, _)) => - services.get(serviceName) match { + bridgesMap.get(serviceName) match { case Some(service) => request .as[String] - .map(service.invokeGrpcMethod(methodName, _, mapHeaders(headers))) - .flatMap { f => - val future = implicitly[ToTask[F]].apply(f).runAsync - IO.fromFuture(IO(future)) - } + .flatMap(service.invokeGrpcMethod(methodName, _, mapHeaders(headers))) .flatMap { case Right(resp) => Ok(resp, `Content-Type`(MediaType.`application/json`)) case Left(st) => mapStatus(st, configuration) @@ -84,22 +81,26 @@ object Http4s extends StrictLogging { }.toSeq } - private def mapStatus(s: GrpcStatus, configuration: Configuration): IO[Response[IO]] = s.getCode match { - case Code.NOT_FOUND => NotFound() - case Code.INTERNAL => InternalServerError() - case Code.INVALID_ARGUMENT => BadRequest() - case Code.FAILED_PRECONDITION => BadRequest() - case Code.CANCELLED => RequestTimeout() - case Code.UNAVAILABLE => ServiceUnavailable() - case Code.DEADLINE_EXCEEDED => RequestTimeout() - case Code.UNAUTHENTICATED => Unauthorized(configuration.wwwAuthenticate) - case Code.PERMISSION_DENIED => Forbidden() - case Code.UNIMPLEMENTED => NotImplemented() - case Code.RESOURCE_EXHAUSTED => TooManyRequests() - case Code.ABORTED => InternalServerError() - case Code.DATA_LOSS => InternalServerError() - - case _ => InternalServerError() + private def mapStatus[F[_]: Sync](s: GrpcStatus, configuration: Configuration)(implicit h: Http4sDsl[F]): F[Response[F]] = { + import h._ + + s.getCode match { + case Code.NOT_FOUND => NotFound() + case Code.INTERNAL => InternalServerError() + case Code.INVALID_ARGUMENT => BadRequest() + case Code.FAILED_PRECONDITION => BadRequest() + case Code.CANCELLED => RequestTimeout() + case Code.UNAVAILABLE => ServiceUnavailable() + case Code.DEADLINE_EXCEEDED => RequestTimeout() + case Code.UNAUTHENTICATED => Unauthorized(configuration.wwwAuthenticate) + case Code.PERMISSION_DENIED => Forbidden() + case Code.UNIMPLEMENTED => NotImplemented() + case Code.RESOURCE_EXHAUSTED => TooManyRequests() + case Code.ABORTED => InternalServerError() + case Code.DATA_LOSS => InternalServerError() + + case _ => InternalServerError() + } } } diff --git a/http4s/src/test/scala/com/avast/grpc/jsonbrige/http4s/Http4sTest.scala b/http4s/src/test/scala/com/avast/grpc/jsonbrige/http4s/Http4sTest.scala index 0bc4a94e..86e802f3 100644 --- a/http4s/src/test/scala/com/avast/grpc/jsonbrige/http4s/Http4sTest.scala +++ b/http4s/src/test/scala/com/avast/grpc/jsonbrige/http4s/Http4sTest.scala @@ -3,7 +3,6 @@ package com.avast.grpc.jsonbrige.http4s import java.util.concurrent.{Executor, Executors} import cats.data.NonEmptyList -import cats.effect.IO import com.avast.grpc.jsonbridge._ import com.avast.grpc.jsonbridge.test.TestApi.{GetRequest, GetResponse} import com.avast.grpc.jsonbridge.test.TestApiServiceGrpc.{TestApiServiceFutureStub, TestApiServiceImplBase} @@ -47,19 +46,21 @@ class Http4sTest extends FunSuite with ScalaFutures { val Some(response) = service .apply( - Request[IO]( + Request[Task]( method = Method.POST, uri = Uri.fromString(s"${classOf[TestApiService].getName.replace("$", ".")}/Get").getOrElse(fail()) ).withBody(""" { "names": ["abc","def"] } """) - .unsafeRunSync() + .runAsync + .futureValue .withContentType(`Content-Type`(MediaType.`application/json`, Charset.`UTF-8`)) ) .value - .unsafeRunSync() + .runAsync + .futureValue assertResult(org.http4s.Status.Ok)(response.status) - assertResult("""{"results":{"name":42}}""")(response.as[String].unsafeRunSync()) + assertResult("""{"results":{"name":42}}""")(response.as[String].runAsync.futureValue) assertResult( Headers( @@ -84,18 +85,20 @@ class Http4sTest extends FunSuite with ScalaFutures { val Some(response) = service .apply( - Request[IO](method = Method.POST, - uri = Uri.fromString(s"/abc/def/${classOf[TestApiService].getName.replace("$", ".")}/Get").getOrElse(fail())) + Request[Task](method = Method.POST, + uri = Uri.fromString(s"/abc/def/${classOf[TestApiService].getName.replace("$", ".")}/Get").getOrElse(fail())) .withBody(""" { "names": ["abc","def"] } """) - .unsafeRunSync() + .runAsync + .futureValue .withContentType(`Content-Type`(MediaType.`application/json`)) ) .value - .unsafeRunSync() + .runAsync + .futureValue assertResult(org.http4s.Status.Ok)(response.status) - assertResult("""{"results":{"name":42}}""")(response.as[String].unsafeRunSync()) + assertResult("""{"results":{"name":42}}""")(response.as[String].runAsync.futureValue) assertResult( Headers( @@ -119,14 +122,16 @@ class Http4sTest extends FunSuite with ScalaFutures { { // empty body val Some(response) = service .apply( - Request[IO](method = Method.POST, - uri = Uri.fromString(s"${classOf[TestApiService].getName.replace("$", ".")}/Get").getOrElse(fail())) + Request[Task](method = Method.POST, + uri = Uri.fromString(s"${classOf[TestApiService].getName.replace("$", ".")}/Get").getOrElse(fail())) .withBody("") - .unsafeRunSync() + .runAsync + .futureValue .withContentType(`Content-Type`(MediaType.`application/json`)) ) .value - .unsafeRunSync() + .runAsync + .futureValue assertResult(org.http4s.Status.BadRequest)(response.status) } @@ -134,13 +139,15 @@ class Http4sTest extends FunSuite with ScalaFutures { { val Some(response) = service .apply( - Request[IO](method = Method.POST, - uri = Uri.fromString(s"${classOf[TestApiService].getName.replace("$", ".")}/Get").getOrElse(fail())) + Request[Task](method = Method.POST, + uri = Uri.fromString(s"${classOf[TestApiService].getName.replace("$", ".")}/Get").getOrElse(fail())) .withBody(""" { "names": ["abc","def"] } """) - .unsafeRunSync() + .runAsync + .futureValue ) .value - .unsafeRunSync() + .runAsync + .futureValue assertResult(org.http4s.Status.BadRequest)(response.status) } @@ -157,14 +164,15 @@ class Http4sTest extends FunSuite with ScalaFutures { val Some(response) = service .apply( - Request[IO](method = Method.POST, - uri = Uri.fromString(s"${classOf[TestApiService].getName.replace("$", ".")}/Get").getOrElse(fail())) + Request[Task](method = Method.POST, + uri = Uri.fromString(s"${classOf[TestApiService].getName.replace("$", ".")}/Get").getOrElse(fail())) .withBody(""" { "names": ["abc","def"] } """) - .unsafeRunSync() + .runAsync + .futureValue .withContentType(`Content-Type`(MediaType.`application/json`)) ) .value - .unsafeToFuture() + .runAsync .futureValue assertResult(org.http4s.Status.Forbidden)(response.status) @@ -177,15 +185,16 @@ class Http4sTest extends FunSuite with ScalaFutures { val Some(response) = service .apply( - Request[IO](method = Method.GET, uri = Uri.fromString(s"${classOf[TestApiService].getName.replace("$", ".")}").getOrElse(fail())) + Request[Task](method = Method.GET, uri = Uri.fromString(s"${classOf[TestApiService].getName.replace("$", ".")}").getOrElse(fail())) ) .value - .unsafeRunSync() + .runAsync + .futureValue assertResult(org.http4s.Status.Ok)(response.status) assertResult("com.avast.grpc.jsonbridge.test.TestApiService/Get\ncom.avast.grpc.jsonbridge.test.TestApiService/Get2")( - response.as[String].unsafeRunSync()) + response.as[String].runAsync.futureValue) } test("provides services info") { @@ -195,15 +204,16 @@ class Http4sTest extends FunSuite with ScalaFutures { val Some(response) = service .apply( - Request[IO](method = Method.GET, uri = Uri.fromString("/").getOrElse(fail())) + Request[Task](method = Method.GET, uri = Uri.fromString("/").getOrElse(fail())) ) .value - .unsafeRunSync() + .runAsync + .futureValue assertResult(org.http4s.Status.Ok)(response.status) assertResult("com.avast.grpc.jsonbridge.test.TestApiService/Get\ncom.avast.grpc.jsonbridge.test.TestApiService/Get2")( - response.as[String].unsafeRunSync()) + response.as[String].runAsync.futureValue) } test("passes user headers") { @@ -214,6 +224,7 @@ class Http4sTest extends FunSuite with ScalaFutures { val bridge = new TestApiServiceImplBase { override def get(request: GetRequest, responseObserver: StreamObserver[TestApi.GetResponse]): Unit = { + // NOTE: there is exception with this failed assert in the log; however it's fina - it's really missing during the first call assertResult(headerValue)(ctxKey.get()) assertResult(Seq("abc", "def"))(request.getNamesList.asScala) @@ -236,16 +247,18 @@ class Http4sTest extends FunSuite with ScalaFutures { { val Some(response) = service .apply( - Request[IO]( + Request[Task]( method = Method.POST, uri = Uri.fromString(s"${classOf[TestApiService].getName.replace("$", ".")}/Get").getOrElse(fail()), headers = Headers(Header("The-Header", headerValue)) ).withBody(""" { "names": ["abc","def"] } """) - .unsafeRunSync() + .runAsync + .futureValue .withContentType(`Content-Type`(MediaType.`application/json`)) ) .value - .unsafeRunSync() + .runAsync + .futureValue assertResult(org.http4s.Status.Ok)(response.status) } @@ -253,14 +266,16 @@ class Http4sTest extends FunSuite with ScalaFutures { { val Some(response) = service .apply( - Request[IO](method = Method.POST, - uri = Uri.fromString(s"${classOf[TestApiService].getName.replace("$", ".")}/Get").getOrElse(fail())) + Request[Task](method = Method.POST, + uri = Uri.fromString(s"${classOf[TestApiService].getName.replace("$", ".")}/Get").getOrElse(fail())) .withBody(""" { "names": ["abc","def"] } """) - .unsafeRunSync() + .runAsync + .futureValue .withContentType(`Content-Type`(MediaType.`application/json`)) ) .value - .unsafeRunSync() + .runAsync + .futureValue assertResult(org.http4s.Status.InternalServerError)(response.status) // because of failed assertResult }