diff --git a/grpc/client/src/Mu/GRpc/Client/Internal.hs b/grpc/client/src/Mu/GRpc/Client/Internal.hs index caa9897c..85ebf00c 100644 --- a/grpc/client/src/Mu/GRpc/Client/Internal.hs +++ b/grpc/client/src/Mu/GRpc/Client/Internal.hs @@ -21,6 +21,7 @@ import Control.Concurrent.Async import Control.Concurrent.STM (atomically) import Control.Concurrent.STM.TMChan import Control.Concurrent.STM.TMVar +import Control.Exception (throwIO) import Control.Monad.IO.Class import Data.Avro import qualified Data.ByteString.Char8 as BS @@ -38,7 +39,7 @@ import Network.GRPC.Client.Helpers import Network.GRPC.HTTP2.Encoding (GRPCInput, GRPCOutput) import Network.HTTP2 (ErrorCode) import Network.HTTP2.Client (ClientError, ClientIO, TooMuchConcurrency, - runExceptT) + runExceptT, ExceptT) import Mu.Adapter.ProtoBuf.Via import Mu.GRpc.Avro @@ -304,47 +305,50 @@ conduitFromChannel chan promise = go instance ( KnownName name , GRpcInputWrapper p vref v, GRpcOutputWrapper p rref r - , handler ~ (CompressMode -> IO (ConduitT v (GRpcReply r) IO ())) ) + , handler ~ (CompressMode -> IO (ConduitT v Void IO (), ConduitT () r IO (GRpcReply ())))) => GRpcMethodCall p ('Method name '[ 'ArgStream aname vref ] ('RetStream rref)) handler where gRpcMethodCall rpc _ client compress - = do -- Create a new TMChan - inchan <- newTMChanIO :: IO (TMChan (GRpcReply r)) - outchan <- newTMChanIO :: IO (TMChan v) - var <- newEmptyTMVarIO -- if full, this means an error + = do serverChan <- newTMChanIO :: IO (TMChan r) + clientChan <- newTMChanIO :: IO (TMChan v) + finalReply <- newEmptyTMVarIO :: IO (TMVar (GRpcReply ())) -- Start executing the client in another thread + -- TODO: Is there anything that makes sure that this thread doesn't keep running forever? _ <- async $ do v <- simplifyResponse $ buildGRpcReply3 <$> rawGeneralStream @_ @(GRpcIWTy p vref v) @(GRpcOWTy p rref r) rpc client - () (\_ ievent -> do -- on the first iteration, say that everything is OK - _ <- liftIO $ atomically $ tryPutTMVar var (GRpcOk ()) - case ievent of - RecvMessage o -> liftIO $ atomically $ writeTMChan inchan (GRpcOk $ unGRpcOWTy(Proxy @p) (Proxy @rref) o) - Invalid e -> liftIO $ atomically $ writeTMChan inchan (GRpcErrorString (show e)) - _ -> pure () ) - () (\_ -> do - nextVal <- liftIO $ atomically $ readTMChan outchan - case nextVal of - Nothing -> pure ((), Finalize) - Just v -> pure ((), SendMessage compress (buildGRpcIWTy (Proxy @p) (Proxy @vref) v))) - case v of - GRpcOk () -> liftIO $ atomically $ closeTMChan inchan - _ -> liftIO $ atomically $ putTMVar var v - -- This conduit feeds information to the other thread - let go = do err <- liftIO $ atomically $ takeTMVar var - case err of - GRpcOk _ -> go2 - e -> yield $ (\_ -> error "this should never happen") <$> e - go2 = do nextOut <- await - case nextOut of - Just v -> do liftIO $ atomically $ writeTMChan outchan v - go2 - Nothing -> do r <- liftIO $ atomically $ tryReadTMChan inchan - case r of - Nothing -> pure () -- both are empty, end - Just Nothing -> go2 - Just (Just nextIn) -> yield nextIn >> go2 - pure go + () (incomingEventConsumer serverChan) + () (outgoingEventProducer clientChan) + liftIO $ atomically $ putTMVar finalReply v + let clientConduit = do + sinkTMChan clientChan + liftIO . atomically . closeTMChan $ clientChan + serverConduit = do + sourceTMChan serverChan + liftIO . atomically . readTMVar $ finalReply + pure (clientConduit, serverConduit) + where + incomingEventConsumer :: TMChan r -> () -> IncomingEvent (GRpcOWTy p rref r) () -> ExceptT ClientError IO () + incomingEventConsumer serverChan _ ievent = + case ievent of + RecvMessage o -> do + liftIO $ atomically $ writeTMChan serverChan (unGRpcOWTy (Proxy @p) (Proxy @rref) o) + Invalid e -> liftIO $ do + atomically $ closeTMChan serverChan + throwIO e + Trailers _ -> + -- TODO: Read the trailers and use them to make the 'finalReply' + liftIO $ atomically $ closeTMChan serverChan + Headers _ -> + -- TODO: Read the headers and use them to make the 'finalReply' + pure () + + outgoingEventProducer :: TMChan v -> () -> ExceptT ClientError IO ((), OutgoingEvent (GRpcIWTy p vref v) ()) + outgoingEventProducer clientChan _ = do + nextVal <- liftIO $ atomically $ readTMChan clientChan + case nextVal of + Nothing -> pure ((), Finalize) + Just v -> pure ((), SendMessage compress (buildGRpcIWTy (Proxy @p) (Proxy @vref) v))