[grpc-client] Handle bidirectional streams correctly (#314)
This is a breaking change: the handler for bidirectional streams now returns two
conduits instead of one. This lets the client deal correctly with the concurrent
nature of the client-to-server stream and the server-to-client stream.
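
As a rough migration sketch (not part of this commit): the two conduits can be
driven concurrently, feeding requests into the client-to-server conduit while
draining the server-to-client one. The helper below and its names (runBiDi,
onResponse) are illustrative; only the handler's overall shape comes from this
change.

    import Control.Concurrent.Async (concurrently)
    import Data.Conduit (ConduitT, runConduit, fuseUpstream, (.|))
    import qualified Data.Conduit.List as CL
    import Data.Void (Void)

    -- Drive the pair of conduits returned by a bidirectional-stream handler
    -- of shape (compressMode -> IO (sink, source)): the sink consumes
    -- client-to-server values, the source yields server-to-client values and
    -- finishes with the call's final status.
    runBiDi :: (c -> IO (ConduitT v Void IO (), ConduitT () r IO a))
            -> c -> [v] -> (r -> IO ()) -> IO a
    runBiDi handler compressMode requests onResponse = do
      (clientConduit, serverConduit) <- handler compressMode
      -- The two streams are independent, so send and receive concurrently.
      (_, finalStatus) <- concurrently
        (runConduit (CL.sourceList requests .| clientConduit))
        (runConduit (fuseUpstream serverConduit (CL.mapM_ onResponse)))
      pure finalStatus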

Each response in the server-to-client stream is no longer wrapped in GRpcReply;
any error that occurs while parsing the stream is thrown in IO.

Other connection-related errors are returned as the result value of the
server-to-client conduit.
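
For illustration only (not part of this commit), a caller could observe both
error channels like this, assuming a server-to-client conduit of the type
described above; the helper name drainResponses and the blanket SomeException
handler are hypothetical choices.

    import Control.Exception (SomeException, try)
    import Data.Conduit (ConduitT, runConduit, fuseUpstream)
    import qualified Data.Conduit.List as CL

    -- Left: an error thrown in IO while parsing the stream.
    -- Right: the conduit's result value, carrying connection-related errors.
    drainResponses :: ConduitT () r IO a -> (r -> IO ()) -> IO (Either SomeException a)
    drainResponses serverConduit onResponse =
      try (runConduit (fuseUpstream serverConduit (CL.mapM_ onResponse)))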

Note: The client didn't and still doesn't handle any errors that the server
might indicate via headers or trailers, e.g. grpc-status or the HTTP status
code. This commit also adds TODO comments about handling these.
akshaymankar authored Jun 1, 2021
1 parent 1c9f75a commit 2018d12
Showing 1 changed file with 39 additions and 35 deletions.
74 changes: 39 additions & 35 deletions grpc/client/src/Mu/GRpc/Client/Internal.hs
@@ -21,6 +21,7 @@ import Control.Concurrent.Async
 import Control.Concurrent.STM (atomically)
 import Control.Concurrent.STM.TMChan
 import Control.Concurrent.STM.TMVar
+import Control.Exception (throwIO)
 import Control.Monad.IO.Class
 import Data.Avro
 import qualified Data.ByteString.Char8 as BS
@@ -38,7 +39,7 @@ import Network.GRPC.Client.Helpers
 import Network.GRPC.HTTP2.Encoding (GRPCInput, GRPCOutput)
 import Network.HTTP2 (ErrorCode)
 import Network.HTTP2.Client (ClientError, ClientIO, TooMuchConcurrency,
-                             runExceptT)
+                             runExceptT, ExceptT)
 
 import Mu.Adapter.ProtoBuf.Via
 import Mu.GRpc.Avro
@@ -304,47 +305,50 @@ conduitFromChannel chan promise = go
 
 instance ( KnownName name
          , GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r
-         , handler ~ (CompressMode -> IO (ConduitT v (GRpcReply r) IO ())) )
+         , handler ~ (CompressMode -> IO (ConduitT v Void IO (), ConduitT () r IO (GRpcReply ()))))
          => GRpcMethodCall p ('Method name '[ 'ArgStream aname vref ]
                                             ('RetStream rref)) handler where
   gRpcMethodCall rpc _ client compress
-    = do -- Create a new TMChan
-         inchan <- newTMChanIO :: IO (TMChan (GRpcReply r))
-         outchan <- newTMChanIO :: IO (TMChan v)
-         var <- newEmptyTMVarIO  -- if full, this means an error
+    = do serverChan <- newTMChanIO :: IO (TMChan r)
+         clientChan <- newTMChanIO :: IO (TMChan v)
+         finalReply <- newEmptyTMVarIO :: IO (TMVar (GRpcReply ()))
          -- Start executing the client in another thread
+         -- TODO: Is there anything that makes sure that this thread doesn't keep running forever?
          _ <- async $ do
            v <- simplifyResponse $
                 buildGRpcReply3 <$>
                 rawGeneralStream
                   @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r)
                   rpc client
-                  () (\_ ievent -> do -- on the first iteration, say that everything is OK
-                        _ <- liftIO $ atomically $ tryPutTMVar var (GRpcOk ())
-                        case ievent of
-                          RecvMessage o -> liftIO $ atomically $ writeTMChan inchan (GRpcOk $ unGRpcOWTy (Proxy @p) (Proxy @rref) o)
-                          Invalid e     -> liftIO $ atomically $ writeTMChan inchan (GRpcErrorString (show e))
-                          _             -> pure () )
-                  () (\_ -> do
-                        nextVal <- liftIO $ atomically $ readTMChan outchan
-                        case nextVal of
-                          Nothing -> pure ((), Finalize)
-                          Just v  -> pure ((), SendMessage compress (buildGRpcIWTy (Proxy @p) (Proxy @vref) v)))
-           case v of
-             GRpcOk () -> liftIO $ atomically $ closeTMChan inchan
-             _         -> liftIO $ atomically $ putTMVar var v
-         -- This conduit feeds information to the other thread
-         let go = do err <- liftIO $ atomically $ takeTMVar var
-                     case err of
-                       GRpcOk _ -> go2
-                       e        -> yield $ (\_ -> error "this should never happen") <$> e
-             go2 = do nextOut <- await
-                      case nextOut of
-                        Just v  -> do liftIO $ atomically $ writeTMChan outchan v
-                                      go2
-                        Nothing -> do r <- liftIO $ atomically $ tryReadTMChan inchan
-                                      case r of
-                                        Nothing            -> pure ()  -- both are empty, end
-                                        Just Nothing       -> go2
-                                        Just (Just nextIn) -> yield nextIn >> go2
-         pure go
+                  () (incomingEventConsumer serverChan)
+                  () (outgoingEventProducer clientChan)
+           liftIO $ atomically $ putTMVar finalReply v
+         let clientConduit = do
+               sinkTMChan clientChan
+               liftIO . atomically . closeTMChan $ clientChan
+             serverConduit = do
+               sourceTMChan serverChan
+               liftIO . atomically . readTMVar $ finalReply
+         pure (clientConduit, serverConduit)
+    where
+      incomingEventConsumer :: TMChan r -> () -> IncomingEvent (GRpcOWTy p rref r) () -> ExceptT ClientError IO ()
+      incomingEventConsumer serverChan _ ievent =
+        case ievent of
+          RecvMessage o -> do
+            liftIO $ atomically $ writeTMChan serverChan (unGRpcOWTy (Proxy @p) (Proxy @rref) o)
+          Invalid e -> liftIO $ do
+            atomically $ closeTMChan serverChan
+            throwIO e
+          Trailers _ ->
+            -- TODO: Read the trailers and use them to make the 'finalReply'
+            liftIO $ atomically $ closeTMChan serverChan
+          Headers _ ->
+            -- TODO: Read the headers and use them to make the 'finalReply'
+            pure ()
+
+      outgoingEventProducer :: TMChan v -> () -> ExceptT ClientError IO ((), OutgoingEvent (GRpcIWTy p vref v) ())
+      outgoingEventProducer clientChan _ = do
+        nextVal <- liftIO $ atomically $ readTMChan clientChan
+        case nextVal of
+          Nothing -> pure ((), Finalize)
+          Just v  -> pure ((), SendMessage compress (buildGRpcIWTy (Proxy @p) (Proxy @vref) v))