Skip to content

Commit

Permalink
Finally tagless reworked (#20)
Browse files Browse the repository at this point in the history
* Http4s for F: Async
* Akka for F: Effect
* FunctorK for GrpcJsonBridge
  • Loading branch information
jendakol authored Sep 3, 2018
1 parent 4a39204 commit dd0f8a6
Show file tree
Hide file tree
Showing 11 changed files with 186 additions and 119 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@ For requests/responses mapping a [standard GPB <-> JSON mapping](https://develop
It uses Scala macros for creating mapping between runtime-provided service and method names to pregenerated Java gRPC classes. In case you
don't want to use _plain Java API_ you can easily use it together with [Cactus](https://github.com/avast/cactus).

The API is _finally tagless_ (read more e.g. [here](https://www.beyondthelines.net/programming/introduction-to-tagless-final/))
meaning it can use whatever [`F[_]: cats.effect.Effect`](https://typelevel.org/cats-effect/typeclasses/effect.html)
(e.g. `cats.effect.IO`, `monix.eval.Task`).

There are several modules:
1. core - for basic implementation-agnostic usage
1. [http4s](http4s) - integration with [http4s](https://http4s.org/) webserver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ import akka.http.scaladsl.model.headers.`Content-Type`
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.{PathMatcher, Route}
import cats.data.NonEmptyList
import cats.effect.Effect
import com.avast.grpc.jsonbridge.GrpcJsonBridge
import com.avast.grpc.jsonbridge.GrpcJsonBridge.GrpcHeader
import com.avast.grpc.jsonbridge.{GrpcJsonBridge, ToTask}
import io.grpc.BindableService
import io.grpc.Status.Code
import monix.eval.Task
import monix.execution.Scheduler

import scala.concurrent.ExecutionContext
import scala.language.higherKinds
import scala.util.control.NonFatal
import scala.util.{Failure, Success}
Expand All @@ -21,9 +24,11 @@ object AkkaHttp {
ContentType.WithMissingCharset(MediaType.applicationWithOpenCharset("json"))
}

def apply[F[_]: ToTask](configuration: Configuration)(bridges: GrpcJsonBridge[F, _ <: BindableService]*)(
implicit sch: Scheduler): Route = {
val services = bridges.map(s => (s.serviceName, s): (String, GrpcJsonBridge[F, _])).toMap
def apply[F[_]: Effect](configuration: Configuration)(bridges: GrpcJsonBridge[F, _ <: BindableService]*)(
implicit ec: ExecutionContext): Route = {
implicit val sch: Scheduler = Scheduler(ec)

val bridgesMap = bridges.map(s => (s.serviceName, s): (String, GrpcJsonBridge[F, _])).toMap

val pathPattern = configuration.pathPrefix
.map {
Expand All @@ -42,12 +47,13 @@ object AkkaHttp {
extractRequest { req =>
req.header[`Content-Type`] match {
case Some(`JsonContentType`) =>
services.get(serviceName) match {
bridgesMap.get(serviceName) match {
case Some(service) =>
entity(as[String]) { json =>
val methodCall = implicitly[ToTask[F]].apply {
service.invokeGrpcMethod(methodName, json, mapHeaders(req.headers))
}.runAsync
val methodCall =
Task.fromEffect {
service.invokeGrpcMethod(methodName, json, mapHeaders(req.headers))
}.runAsync

onComplete(methodCall) {
case Success(Right(r)) =>
Expand All @@ -69,7 +75,7 @@ object AkkaHttp {
}
} ~ get {
path(Segment) { serviceName =>
services.get(serviceName) match {
bridgesMap.get(serviceName) match {
case Some(service) =>
complete(service.methodsNames.mkString("\n"))

Expand All @@ -78,7 +84,7 @@ object AkkaHttp {
}
} ~ get {
path(PathEnd) {
complete(services.values.flatMap(s => s.methodsNames).mkString("\n"))
complete(bridgesMap.values.flatMap(s => s.methodsNames).mkString("\n"))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,18 @@ package com.avast.grpc.jsonbridge.akkahttp

import akka.http.scaladsl.model.HttpHeader.ParsingResult.Ok
import akka.http.scaladsl.model.headers.`Content-Type`
import akka.http.scaladsl.model.{ContentType, HttpHeader, MediaType, StatusCodes}
import akka.http.scaladsl.model._
import akka.http.scaladsl.testkit.ScalatestRouteTest
import cats.data.NonEmptyList
import cats.effect.Effect
import com.avast.grpc.jsonbridge._
import com.avast.grpc.jsonbridge.test.{TestApi, TestApiService}
import com.avast.grpc.jsonbridge.test.TestApi.{GetRequest, GetResponse}
import com.avast.grpc.jsonbridge.test.TestApiServiceGrpc.{TestApiServiceFutureStub, TestApiServiceImplBase}
import com.avast.grpc.jsonbridge.test.{TestApi, TestApiService}
import io.grpc._
import io.grpc.stub.StreamObserver
import monix.eval.Task
import monix.execution.Scheduler
import org.scalatest.FunSuite

import scala.collection.JavaConverters._
Expand All @@ -21,6 +23,9 @@ import scala.util.Random

class AkkaHttpTest extends FunSuite with ScalatestRouteTest {

// this is workaround which solves presence of ExecutionContextExecutor in RouteTest from AKKA
private implicit val taskEff: Effect[Task] = Task.catsEffect(Scheduler.global)

case class MyRequest(names: Seq[String])

case class MyResponse(results: Map[String, Int])
Expand All @@ -40,7 +45,7 @@ class AkkaHttpTest extends FunSuite with ScalatestRouteTest {
}
}.createGrpcJsonBridge[Task, TestApiServiceFutureStub]()

val route = AkkaHttp[Task](Configuration.Default)(bridge)(implicitly[ToTask[Task]], monix.execution.Scheduler.Implicits.global)
val route = AkkaHttp[Task](Configuration.Default)(bridge)

Post(s"/${classOf[TestApiService].getName.replace("$", ".")}/Get", """ { "names": ["abc","def"] } """)
.withHeaders(AkkaHttp.JsonContentType) ~> route ~> check {
Expand All @@ -63,7 +68,7 @@ class AkkaHttpTest extends FunSuite with ScalatestRouteTest {

val configuration = Configuration.Default.copy(pathPrefix = Some(NonEmptyList.of("abc", "def")))

val route = AkkaHttp(configuration)(bridge)(implicitly[ToTask[Task]], monix.execution.Scheduler.Implicits.global)
val route = AkkaHttp(configuration)(bridge)

Post(s"/abc/def/${classOf[TestApiService].getName.replace("$", ".")}/Get", """ { "names": ["abc","def"] } """)
.withHeaders(AkkaHttp.JsonContentType) ~> route ~> check {
Expand All @@ -82,7 +87,7 @@ class AkkaHttpTest extends FunSuite with ScalatestRouteTest {
}
}.createGrpcJsonBridge[Task, TestApiServiceFutureStub]()

val route = AkkaHttp(Configuration.Default)(bridge)(implicitly[ToTask[Task]], monix.execution.Scheduler.Implicits.global)
val route = AkkaHttp(Configuration.Default)(bridge)

// empty body
Post(s"/${classOf[TestApiService].getName.replace("$", ".")}/Get", "")
Expand All @@ -103,7 +108,7 @@ class AkkaHttpTest extends FunSuite with ScalatestRouteTest {
}
}.createGrpcJsonBridge[Task, TestApiServiceFutureStub]()

val route = AkkaHttp(Configuration.Default)(bridge)(implicitly[ToTask[Task]], monix.execution.Scheduler.Implicits.global)
val route = AkkaHttp(Configuration.Default)(bridge)

Post(s"/${classOf[TestApiService].getName.replace("$", ".")}/Get", """ { "names": ["abc","def"] } """)
.withHeaders(AkkaHttp.JsonContentType) ~> route ~> check {
Expand All @@ -114,7 +119,7 @@ class AkkaHttpTest extends FunSuite with ScalatestRouteTest {
test("provides service description") {
val bridge = new TestApiServiceImplBase {}.createGrpcJsonBridge[Task, TestApiServiceFutureStub]()

val route = AkkaHttp(Configuration.Default)(bridge)(implicitly[ToTask[Task]], monix.execution.Scheduler.Implicits.global)
val route = AkkaHttp(Configuration.Default)(bridge)

Get(s"/${classOf[TestApiService].getName.replace("$", ".")}") ~> route ~> check {
assertResult(StatusCodes.OK)(status)
Expand All @@ -127,7 +132,7 @@ class AkkaHttpTest extends FunSuite with ScalatestRouteTest {
test("provides services description") {
val bridge = new TestApiServiceImplBase {}.createGrpcJsonBridge[Task, TestApiServiceFutureStub]()

val route = AkkaHttp(Configuration.Default)(bridge)(implicitly[ToTask[Task]], monix.execution.Scheduler.Implicits.global)
val route = AkkaHttp(Configuration.Default)(bridge)

Get("/") ~> route ~> check {
assertResult(StatusCodes.OK)(status)
Expand Down Expand Up @@ -161,7 +166,7 @@ class AkkaHttpTest extends FunSuite with ScalatestRouteTest {
}
)

val route = AkkaHttp(Configuration.Default)(bridge)(implicitly[ToTask[Task]], monix.execution.Scheduler.Implicits.global)
val route = AkkaHttp(Configuration.Default)(bridge)

val Ok(customHeaderToBeSent, _) = HttpHeader.parse("The-Header", headerValue)

Expand Down
4 changes: 3 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ lazy val javaSettings = Seq(

lazy val macroSettings = Seq(
addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.1" cross CrossVersion.full),
addCompilerPlugin("org.spire-math" % "kind-projector" % "0.9.7" cross CrossVersion.binary),
libraryDependencies ++= Seq(
"org.scala-lang" % "scala-reflect" % scalaVersion.value,
"org.scala-lang" % "scala-compiler" % scalaVersion.value
Expand Down Expand Up @@ -112,7 +113,8 @@ lazy val core = (project in file("core")).settings(
"io.grpc" % "grpc-protobuf" % Versions.grpcVersion,
"io.grpc" % "grpc-stub" % Versions.grpcVersion,
"org.typelevel" %% "cats-core" % "1.2.0",
"io.monix" % "monix_2.12" % "3.0.0-RC1",
"io.monix" %% "monix" % "3.0.0-RC1",
"com.kailuowang" %% "mainecoon-core" % "0.6.4",
"com.typesafe.scala-logging" %% "scala-logging" % "3.9.0",
"org.slf4j" % "jul-to-slf4j" % "1.7.25",
"org.slf4j" % "jcl-over-slf4j" % "1.7.25",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,31 @@
package com.avast.grpc.jsonbridge

import cats.effect.{Async, Effect}
import cats.syntax.all._
import com.avast.grpc.jsonbridge.GrpcJsonBridge.GrpcHeader
import com.google.protobuf.Message
import com.google.protobuf.util.JsonFormat
import com.typesafe.scalalogging.StrictLogging
import io.grpc._
import io.grpc.stub.MetadataUtils
import io.grpc.{Metadata, Status, StatusException, StatusRuntimeException}
import monix.eval.Task
import monix.execution.Scheduler

import scala.concurrent.{ExecutionContext, Future}
import scala.language.higherKinds
import scala.util.control.NonFatal

/** This is trait for internal usage. You should not use it directly.
*/
trait GrpcJsonBridgeBase[Stub <: io.grpc.stub.AbstractStub[Stub]] extends StrictLogging {
abstract class GrpcJsonBridgeBase[F[_], Stub <: io.grpc.stub.AbstractStub[Stub]](implicit protected val F: Async[F]) extends StrictLogging {

protected def newFutureStub: Stub
protected val parser: JsonFormat.Parser = JsonFormat.parser()
protected val printer: JsonFormat.Printer = JsonFormat.printer().includingDefaultValueFields().omittingInsignificantWhitespace()

// https://groups.google.com/forum/#!topic/grpc-io/1-KMubq1tuc
protected def withNewClientStub[A](headers: Seq[GrpcHeader])(f: Stub => Future[A])(
implicit ec: ExecutionContext): Task[Either[Status, A]] = {
implicit ec: ExecutionContext): F[Either[Status, A]] = {
val metadata = new Metadata()
headers.foreach(h => metadata.put(Metadata.Key.of(h.name, Metadata.ASCII_STRING_MARSHALLER), h.value))

Expand All @@ -31,8 +35,9 @@ trait GrpcJsonBridgeBase[Stub <: io.grpc.stub.AbstractStub[Stub]] extends Strict
try {
Task
.deferFuture(f(clientFutureStub))
.map(Right(_))
.onErrorRecover {
.to[F](F, Scheduler(ec))
.map(Right(_): Either[Status, A])
.recover {
case e: StatusException if e.getStatus.getCode == Status.Code.UNKNOWN => Left(Status.INTERNAL)
case e: StatusRuntimeException if e.getStatus.getCode == Status.Code.UNKNOWN => Left(Status.INTERNAL)
case e: StatusException => Left(e.getStatus)
Expand All @@ -42,11 +47,11 @@ trait GrpcJsonBridgeBase[Stub <: io.grpc.stub.AbstractStub[Stub]] extends Strict
Left(Status.INTERNAL.withCause(e))
}
} catch {
case e: StatusException if e.getStatus.getCode == Status.Code.UNKNOWN => Task.now(Left(Status.INTERNAL))
case e: StatusRuntimeException if e.getStatus.getCode == Status.Code.UNKNOWN => Task.now(Left(Status.INTERNAL))
case e: StatusException if e.getStatus.getCode == Status.Code.UNKNOWN => F.pure(Left(Status.INTERNAL))
case e: StatusRuntimeException if e.getStatus.getCode == Status.Code.UNKNOWN => F.pure(Left(Status.INTERNAL))
case NonFatal(e) =>
logger.debug("Error while executing the request", e)
Task.now(Left(Status.INTERNAL.withCause(e)))
F.pure(Left(Status.INTERNAL.withCause(e)))
}

// just abandon the stub...
Expand Down
14 changes: 6 additions & 8 deletions core/src/main/scala/com/avast/grpc/jsonbridge/Macros.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class Macros(val c: blackbox.Context) {
import c.universe._

def generateGrpcJsonBridge[F[_], GrpcServiceStub <: BindableService, GrpcClientStub <: AbstractStub[GrpcClientStub]: WeakTypeTag](
interceptors: c.Tree*)(ec: c.Tree, ex: c.Tree, ct: c.Tree, ct2: c.Tree): c.Expr[GrpcJsonBridge[F, GrpcServiceStub]] = {
interceptors: c.Tree*)(ec: c.Tree, ex: c.Tree, ct: c.Tree, ct2: c.Tree, asf: c.Tree): c.Expr[GrpcJsonBridge[F, GrpcServiceStub]] = {

val clientType = weakTypeOf[GrpcClientStub]
val serviceTypeRaw = extractSymbolFromClassTag(ct)
Expand Down Expand Up @@ -51,7 +51,7 @@ class Macros(val c: blackbox.Context) {

val t =
q"""
new _root_.com.avast.grpc.jsonbridge.GrpcJsonBridge[$fType, $serviceTypeRaw] with _root_.com.avast.grpc.jsonbridge.GrpcJsonBridgeBase[$clientType] {
new _root_.com.avast.grpc.jsonbridge.GrpcJsonBridgeBase[$fType, $clientType]()($asf) with _root_.com.avast.grpc.jsonbridge.GrpcJsonBridge[$fType, $serviceTypeRaw] {
import _root_.com.avast.grpc.jsonbridge._
import _root_.cats.instances.future._
import _root_.cats.data._
Expand All @@ -68,17 +68,15 @@ class Macros(val c: blackbox.Context) {
override def invokeGrpcMethod(name: String,
json: => String,
headers: => _root_.scala.Seq[com.avast.grpc.jsonbridge.GrpcJsonBridge.GrpcHeader]): $fType[_root_.scala.Either[_root_.io.grpc.Status, String]] = {
val task = try {
try {
name match {
case ..$methodCases
// unsupported method
case _ => monix.eval.Task.now(_root_.scala.Left(_root_.io.grpc.Status.NOT_FOUND))
case _ => F.pure(_root_.scala.Left(_root_.io.grpc.Status.NOT_FOUND))
}
} catch {
case _root_.scala.util.control.NonFatal(e) => _root_.monix.eval.Task.now(_root_.scala.Left(_root_.io.grpc.Status.INTERNAL))
case _root_.scala.util.control.NonFatal(e) => F.pure(_root_.scala.Left(_root_.io.grpc.Status.INTERNAL))
}

implicitly[_root_.cats.arrow.FunctionK[Task, $fType]].apply(task)
}

override val methodsNames: _root_.scala.Seq[String] = ${methodsNames(serviceType)}
Expand Down Expand Up @@ -110,7 +108,7 @@ class Macros(val c: blackbox.Context) {
cq"""
${firstUpper(name.toString)} =>
(for {
request <- _root_.cats.data.EitherT.fromEither[_root_.monix.eval.Task](fromJson(${request.companion}.getDefaultInstance, json))
request <- _root_.cats.data.EitherT.fromEither[$fType](fromJson(${request.companion}.getDefaultInstance, json))
result <- _root_.cats.data.EitherT {
withNewClientStub(headers) { _.$name(request).asScala(executor).map(toJson(_)) }
}
Expand Down
38 changes: 28 additions & 10 deletions core/src/main/scala/com/avast/grpc/jsonbridge/jsonbridge.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,27 @@ package com.avast.grpc

import java.util.concurrent.Executor

import cats.arrow.FunctionK
import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture}
import cats.effect.Async
import cats.~>
import com.google.common.util.concurrent._
import io.grpc._
import io.grpc.stub.AbstractStub
import io.grpc.{BindableService, ServerInterceptor}
import monix.eval.Task
import mainecoon.FunctorK

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent._
import scala.language.experimental.macros
import scala.language.higherKinds
import scala.reflect.ClassTag

package object jsonbridge {

type ToTask[A[_]] = FunctionK[A, Task]

implicit val fkTaskIdentity: FunctionK[Task, Task] = FunctionK.id

implicit class DeriveBridge[GrpcServiceStub <: BindableService](val serviceStub: GrpcServiceStub) extends AnyVal {
def createGrpcJsonBridge[F[_], GrpcClientStub <: AbstractStub[GrpcClientStub]](interceptors: ServerInterceptor*)(
implicit ec: ExecutionContext,
ex: Executor,
ct: ClassTag[GrpcServiceStub],
ct2: ClassTag[F[_]]): GrpcJsonBridge[F, GrpcServiceStub] =
ct2: ClassTag[F[_]],
asf: Async[F]): GrpcJsonBridge[F, GrpcServiceStub] =
macro Macros.generateGrpcJsonBridge[F, GrpcServiceStub, GrpcClientStub]
}

Expand All @@ -41,4 +39,24 @@ package object jsonbridge {
}
}

implicit class GrpcJsonBridgeOps[F[_], Service <: BindableService](val bridge: GrpcJsonBridge[F, Service]) extends AnyVal {
def mapK[G[_]](implicit fToG: ~>[F, G]): GrpcJsonBridge[G, Service] = bridgeFunctorK[Service].mapK(bridge)(fToG)
}

implicit def bridgeFunctorK[Service <: BindableService]: FunctorK[GrpcJsonBridge[?[_], Service]] =
new FunctorK[GrpcJsonBridge[?[_], Service]] {
override def mapK[F[_], G[_]](bridge: GrpcJsonBridge[F, Service])(fToG: ~>[F, G]): GrpcJsonBridge[G, Service] =
new GrpcJsonBridge[G, Service] {
override def invokeGrpcMethod(name: String,
json: => String,
headers: => Seq[GrpcJsonBridge.GrpcHeader]): G[Either[Status, String]] = fToG {
bridge.invokeGrpcMethod(name, json, headers)
}
override def methodsNames: Seq[String] = bridge.methodsNames

override def serviceName: String = bridge.serviceName
override def close(): Unit = bridge.close()
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.avast.grpc.jsonbridge

import cats.~>
import com.avast.cactus.grpc.server.GrpcService
import com.avast.grpc.jsonbridge.internalPackage.MyServiceImpl
import com.avast.grpc.jsonbridge.test.TestApi
Expand All @@ -11,9 +12,10 @@ import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import org.scalatest.FunSuite
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{Milliseconds, Seconds, Span}
import org.scalatest.time._

import scala.collection.JavaConverters._
import scala.concurrent.Future

package internalPackage {
// this is here to test that package from PROTO fgile is used as "serviceName"
Expand Down Expand Up @@ -132,4 +134,16 @@ class GrpcJsonBridgeTest extends FunSuite with ScalaFutures {
assertResult("""{"results":{"name":42}}""")(response)
}

test("functorK") {
assertCompiles(
"""
|val bridge: GrpcJsonBridge[Task, MyServiceImpl] = new MyServiceImpl {
| override def get(request: GetRequest, responseObserver: StreamObserver[GetResponse]): Unit = {}
|}.createGrpcJsonBridge[Task, TestApiServiceFutureStub]()
|
|implicit val taskToFuture: Task ~> Future = new ~>[Task, Future] { override def apply[A](fa: Task[A]): Future[A] = fa.runAsync }
|
|val bridgeFuture: GrpcJsonBridge[Future, MyServiceImpl] = bridge.mapK[Future]
""".stripMargin)
}
}
Loading

0 comments on commit dd0f8a6

Please sign in to comment.