Skip to content

Commit

Permalink
Use ExecutionContext instead of Executor and refactoring towards …
Browse files Browse the repository at this point in the history
…`Resource` (#33)
  • Loading branch information
sideeffffect authored and jendakol committed May 23, 2019
1 parent 1c61764 commit 946dddc
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 63 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.avast.grpc.jsonbridge

import java.lang.reflect.Method
import java.util.concurrent.Executor

import cats.effect.Async
import cats.syntax.all._
Expand All @@ -16,65 +15,40 @@ import io.grpc.inprocess.{InProcessChannelBuilder, InProcessServerBuilder}
import io.grpc.stub.AbstractStub

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext
import scala.language.{existentials, higherKinds}
import scala.util.control.NonFatal

class ReflectionGrpcJsonBridge[F[_]](services: ServerServiceDefinition*)(implicit executor: Executor, F: Async[F])
class ReflectionGrpcJsonBridge[F[_]](services: ServerServiceDefinition*)(implicit ec: ExecutionContext, F: Async[F])
extends GrpcJsonBridge[F]
with AutoCloseable
with StrictLogging {

import com.avast.grpc.jsonbridge.ReflectionGrpcJsonBridge._

def this(grpcServer: io.grpc.Server)(implicit executor: Executor, F: Async[F]) = this(grpcServer.getImmutableServices.asScala: _*)
def this(grpcServer: io.grpc.Server)(implicit ec: ExecutionContext, F: Async[F]) = this(grpcServer.getImmutableServices.asScala: _*)

private val inProcessServiceName = s"HttpWrapper-${System.nanoTime()}"

private val inProcessServer = {
val b = InProcessServerBuilder.forName(inProcessServiceName).executor(executor)
val b = InProcessServerBuilder.forName(inProcessServiceName).executor(ec.execute(_))
services.foreach(b.addService)
b.build().start()
}

private val inProcessChannel = InProcessChannelBuilder.forName(inProcessServiceName).executor(executor).build()
private val inProcessChannel = InProcessChannelBuilder.forName(inProcessServiceName).executor(ec.execute(_)).build()

override def close(): Unit = {
inProcessChannel.shutdownNow()
inProcessServer.shutdownNow()
()
}

// JSON body and headers to a response (fail status or JSON response)
protected type HandlerFunc = (String, Map[String, String]) => F[Either[Status, String]]

// map from full method name to a function that invokes that method
protected val handlersPerMethod: Map[String, HandlerFunc] =
inProcessServer.getImmutableServices.asScala.flatMap { ssd =>
ssd.getMethods.asScala
.filter(isSupportedMethod)
.map { method =>
val requestMessagePrototype = getRequestMessagePrototype(method)
val javaMethodName = getJavaMethodName(method)

val futureStubCtor = createFutureStubCtor(ssd, inProcessChannel)

val javaMethod: Method = {
val futureStub: AbstractStub[_] = futureStubCtor()
futureStub.getClass.getDeclaredMethod(javaMethodName, requestMessagePrototype.getClass)
}

val execute = executeRequest[F](futureStubCtor, javaMethod) _

val handler: HandlerFunc = (json, headers) => {
parseRequest(json, requestMessagePrototype) match {
case Right(req) => execute(req, headers).map(resp => Right(printer.print(resp)))
case Left(status) => F.pure(Left(status))
}
}

(method.getMethodDescriptor.getFullMethodName, handler)
}
}.toMap
protected val handlersPerMethod: Map[String, HandlerFunc[F]] =
inProcessServer.getImmutableServices.asScala
.flatMap(createServiceHandlers(inProcessChannel)(_))
.toMap

override def invoke(methodName: GrpcMethodName, body: String, headers: Map[String, String]): F[Either[Status, String]] =
handlersPerMethod.get(methodName.fullName) match {
Expand Down Expand Up @@ -107,56 +81,73 @@ class ReflectionGrpcJsonBridge[F[_]](services: ServerServiceDefinition*)(implici

object ReflectionGrpcJsonBridge extends StrictLogging {

// JSON body and headers to a response (fail status or JSON response)
private type HandlerFunc[F[_]] = (String, Map[String, String]) => F[Either[Status, String]]

private val parser: JsonFormat.Parser = JsonFormat.parser()

private val printer: JsonFormat.Printer = {
JsonFormat.printer().includingDefaultValueFields().omittingInsignificantWhitespace()
}

private def createFutureStubCtor(ssd: ServerServiceDefinition, inProcessChannel: Channel): () => AbstractStub[_] = {
val method = getServiceGeneratedClass(ssd.getServiceDescriptor).getDeclaredMethod("newFutureStub", classOf[Channel])
private def createFutureStubCtor(sd: ServiceDescriptor, inProcessChannel: Channel): () => AbstractStub[_] = {
val serviceGeneratedClass = Class.forName {
if (sd.getName.startsWith("grpc.")) "io." + sd.getName + "Grpc" else sd.getName + "Grpc"
}
val method = serviceGeneratedClass.getDeclaredMethod("newFutureStub", classOf[Channel])

() =>
method.invoke(null, inProcessChannel).asInstanceOf[AbstractStub[_]]
}

private def createServiceHandlers[F[_]](inProcessChannel: ManagedChannel)(
ssd: ServerServiceDefinition)(implicit ec: ExecutionContext, F: Async[F]): Map[String, HandlerFunc[F]] = {
val futureStubCtor = createFutureStubCtor(ssd.getServiceDescriptor, inProcessChannel)
ssd.getMethods.asScala
.filter(isSupportedMethod)
.map(createHandler(futureStubCtor)(_))
.toMap
}

private def createHandler[F[_]](futureStubCtor: () => AbstractStub[_])(
method: ServerMethodDefinition[_, _])(implicit ec: ExecutionContext, F: Async[F]): (String, HandlerFunc[F]) = {
val requestMessagePrototype = getRequestMessagePrototype(method)
val javaMethod = futureStubCtor().getClass
.getDeclaredMethod(getJavaMethodName(method), requestMessagePrototype.getClass)
val execute = executeRequest[F](futureStubCtor, javaMethod) _

val handler: HandlerFunc[F] = (json, headers) => {
parseRequest(json, requestMessagePrototype) match {
case Right(req) => execute(req, headers).map(resp => Right(printer.print(resp)))
case Left(status) => F.pure(Left(status))
}
}
(method.getMethodDescriptor.getFullMethodName, handler)
}

private def executeRequest[F[_]](futureStubCtor: () => AbstractStub[_], method: Method)(req: Message, headers: Map[String, String])(
implicit executor: Executor,
implicit ec: ExecutionContext,
F: Async[F]): F[MessageOrBuilder] = {

def createMetadata(): Metadata = {
val metaData = {
val md = new Metadata()
headers.foreach { case (k, v) => md.put(Metadata.Key.of(k, Metadata.ASCII_STRING_MARSHALLER), v) }
md
}

val stubWithHeaders = JavaGenericHelper.attachHeaders(futureStubCtor(), createMetadata())

executeRequest(stubWithHeaders, method, req)
}

private def executeRequest[F[_]](stubWithHeaders: AbstractStub[_], method: Method, req: Message)(implicit executor: Executor,
F: Async[F]): F[MessageOrBuilder] = {
val stubWithHeaders = JavaGenericHelper.attachHeaders(futureStubCtor(), metaData)
fromListenableFuture(F.delay {
method.invoke(stubWithHeaders, req).asInstanceOf[ListenableFuture[MessageOrBuilder]]
})
}

private def isSupportedMethod(d: ServerMethodDefinition[_, _]): Boolean = d.getMethodDescriptor.getType == MethodType.UNARY

private def getServiceGeneratedClass(sd: ServiceDescriptor): Class[_] = {
Class.forName {
if (sd.getName.startsWith("grpc.")) "io." + sd.getName + "Grpc" else sd.getName + "Grpc"
}
}

private def fromListenableFuture[F[_], A](flf: F[ListenableFuture[A]])(implicit executor: Executor, F: Async[F]): F[A] = flf.flatMap {
private def fromListenableFuture[F[_], A](flf: F[ListenableFuture[A]])(implicit ec: ExecutionContext, F: Async[F]): F[A] = flf.flatMap {
lf =>
F.async { cb =>
Futures.addCallback(lf, new FutureCallback[A] {
def onFailure(t: Throwable): Unit = cb(Left(t))
def onSuccess(result: A): Unit = cb(Right(result))
}, executor)
}, ec.execute(_))
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,21 +1,20 @@
package com.avast.grpc.jsonbridge

import java.util.concurrent.{ExecutorService, Executors}

import cats.effect.IO
import com.avast.grpc.jsonbridge.GrpcJsonBridge.GrpcMethodName
import io.grpc.Status
import io.grpc.inprocess.InProcessServerBuilder
import org.scalatest.{fixture, Matchers, Outcome}

import scala.concurrent.ExecutionContext.Implicits.global

class ReflectionGrpcJsonBridgeTest extends fixture.FlatSpec with Matchers {

case class FixtureParam(bridge: GrpcJsonBridge[IO])

override protected def withFixture(test: OneArgTest): Outcome = {
val channelName = InProcessServerBuilder.generateName
val server = InProcessServerBuilder.forName(channelName).addService(new TestServiceImpl()).build
implicit val executor: ExecutorService = Executors.newSingleThreadExecutor()
val bridge = new ReflectionGrpcJsonBridge[IO](server)
try {
test(FixtureParam(bridge))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package com.avast.grpc.jsonbrige.http4s

import java.util.concurrent.{Executor, Executors}

import cats.data.NonEmptyList
import cats.effect.IO
import com.avast.grpc.jsonbridge._
Expand All @@ -10,12 +8,11 @@ import org.http4s.{Charset, Header, Headers, MediaType, Method, Request, Uri}
import org.scalatest.FunSuite
import org.scalatest.concurrent.ScalaFutures

import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.Random

class Http4sTest extends FunSuite with ScalaFutures {

implicit val executor: Executor = Executors.newSingleThreadExecutor()

test("basic") {
val service = Http4s(Configuration.Default)(new ReflectionGrpcJsonBridge[IO](TestServiceImpl.bindService()))

Expand Down

0 comments on commit 946dddc

Please sign in to comment.