Skip to content

Commit

Permalink
Make single notification pushes strict in their HTTP body
Browse files Browse the repository at this point in the history
The websockets library does not support streaming
(jaspervdj/websockets#119). So, there is no
value in superficially transforming data to a stream that later won't be
streamed. However, WebSocketsData enables us to be polymorphic in
the message type, i.e. let the endpoints type decide what to use. This
would make streaming easier to enable (if it's ever implemented by the
library) and safes us from some nasty conversion code.
  • Loading branch information
supersven committed Feb 3, 2022
1 parent 0e8ce3c commit 24ba13f
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 37 deletions.
35 changes: 8 additions & 27 deletions services/cannon/src/Cannon/API/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,13 @@ import Cannon.App
import qualified Cannon.Dict as D
import Cannon.Types
import Cannon.WS
import Conduit
import Control.Monad.Catch
import Data.Aeson (encode)
import qualified Data.ByteString as S
import qualified Data.ByteString.Lazy as L
import Data.Conduit.List
import Data.Id hiding (client)
import Gundeck.Types
import Gundeck.Types.BulkPush
import Imports
import Network.WebSockets
import Servant
import Servant.Conduit ()
import System.Logger.Class (msg, val)
Expand All @@ -45,11 +42,6 @@ import Wire.API.ErrorDescription
import Wire.API.Routes.MultiVerb
import Wire.API.Routes.Named

newtype PushNotificationStream = PushNotificationStream
{ getPushNotificationStream :: ConduitT () ByteString (ResourceT WS) ()
}
deriving newtype (FromSourceIO ByteString)

type InternalAPI =
"i"
:> ( Named
Expand All @@ -66,7 +58,7 @@ type InternalAPI =
( "push"
:> Capture "user" UserId
:> Capture "conn" ConnId
:> StreamBody NoFraming OctetStream PushNotificationStream
:> ReqBody '[JSON] Text
:> MultiVerb
'POST
'[JSON]
Expand Down Expand Up @@ -103,18 +95,15 @@ internalServer =
:<|> Named @"bulk-push-notifications" bulkPushHandler
:<|> Named @"check-presence" checkPresenceHandler

pushHandler :: UserId -> ConnId -> PushNotificationStream -> Cannon (Maybe ())
pushHandler :: UserId -> ConnId -> Text -> Cannon (Maybe ())
pushHandler user conn body =
singlePush body (PushTarget user conn) >>= \case
PushStatusOk -> pure $ Just ()
PushStatusGone -> pure Nothing

-- | Take a serialized 'Notification' string and send it to the 'PushTarget'.
singlePush :: PushNotificationStream -> PushTarget -> Cannon PushStatus
singlePush = singlePush' . getPushNotificationStream

singlePush' :: ConduitM () ByteString (ResourceT WS) () -> PushTarget -> Cannon PushStatus
singlePush' notificationC (PushTarget usrid conid) = do
-- | Take notification @n@ and send it to the 'PushTarget'.
singlePush :: (WebSocketsData a) => a -> PushTarget -> Cannon PushStatus
singlePush n (PushTarget usrid conid) = do
let k = mkKey usrid conid
d <- clients
LC.debug $ client (key2bytes k) . msg (val "push")
Expand All @@ -127,9 +116,7 @@ singlePush' notificationC (PushTarget usrid conid) = do
e <- wsenv
runWS e $ do
catchAll
( runConduitRes $
notificationC .| (sendMsgConduit k x >> pure PushStatusOk)
)
(runWS e (sendMsg n k x) >> pure PushStatusOk)
(const (terminate k x >> pure PushStatusGone))

bulkPushHandler :: BulkPushRequest -> Cannon BulkPushResponse
Expand All @@ -138,10 +125,7 @@ bulkPushHandler (BulkPushRequest ns) =
where
doNotify :: Notification -> [PushTarget] -> Cannon [PushStatus]
doNotify (encode -> notification) =
mapConcurrentlyCannon
( singlePush'
(sourceLbs notification)
)
mapConcurrentlyCannon (singlePush notification)
compileResp ::
(Notification, [PushTarget]) ->
[PushStatus] ->
Expand All @@ -155,6 +139,3 @@ checkPresenceHandler u c = do
if registered
then pure $ Just ()
else pure Nothing

sourceLbs :: Monad m => L.ByteString -> ConduitT i S.ByteString m ()
sourceLbs = sourceList . L.toChunks
19 changes: 9 additions & 10 deletions services/cannon/src/Cannon/WS.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ module Cannon.WS
mkKey,
key2bytes,
client,
sendMsgConduit,
sendMsg,
)
where

Expand Down Expand Up @@ -229,17 +229,16 @@ sendMsgIO :: (WebSocketsData a) => a -> Websocket -> IO ()
sendMsgIO m c =
recoverAll retry3x $ const $ sendBinaryData (connection c) m

sendMsgConduit :: Key -> Websocket -> ConduitT ByteString Void (ResourceT WS) ()
sendMsgConduit k c = do
m <- sinkLazy
lift $ traceLog m
liftIO $ sendMsgIO m c
sendMsg :: (WebSocketsData a) => a -> Key -> Websocket -> WS ()
sendMsg message k c = do
traceLog message
liftIO $ sendMsgIO message c
where
traceLog :: L.ByteString -> (ResourceT WS) ()
traceLog m = lift $ trace $ client kb . msg (logMsg m)
traceLog :: (WebSocketsData a) => a -> WS ()
traceLog m = trace $ client kb . msg (logMsg m)

logMsg :: L.ByteString -> Builder
logMsg m = val "sendMsgConduit: \"" +++ L.take 128 m +++ val "...\""
logMsg :: (WebSocketsData a) => a -> Builder
logMsg m = val "sendMsgConduit: \"" +++ L.take 128 (toLazyByteString m) +++ val "...\""

kb = key2bytes k

Expand Down

0 comments on commit 24ba13f

Please sign in to comment.