-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathhasrocket.hs
executable file
·133 lines (97 loc) · 3.57 KB
/
hasrocket.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/env ./execthirdline.sh
-- compile and run within a docker image
-- set -e && executable=`basename -s .hs ${1}` && docker run -it -v $(pwd):/work agocorona/transient:05-02-2017 bash -c "cabal install mono-traversable unagi-chan && ghc /work/${1} && /work/${executable} ${2} ${3}"
-- transient application for the websocket shootout
-- https://github.com/hashrocket/websocket-shootout
{-#LANGUAGE OverloadedStrings, ScopedTypeVariables #-}
module Main where
import Transient.Internals
import Transient.Move
--import Transient.EVars
import Control.Applicative
import Transient.Logged
import Transient.Move.Utils
--import Data.Text hiding (empty)
import Control.Monad.IO.Class
import qualified Data.Aeson as Aeson
import qualified Network.WebSockets.Connection as WS
import qualified Data.ByteString.Lazy.Char8 as BS
import Data.Containers
import System.IO.Unsafe
import System.Mem.StableName
import Control.Concurrent
import Data.IORef
import qualified Data.Map as M
import Control.Exception
import Control.Monad
import qualified Control.Concurrent.Chan.Unagi as Unagi
rmap= unsafePerformIO $ newIORef M.empty
data Msg = Echo | Broadcast BS.ByteString
main= keep' . freeThreads $ do
broad <- newEVar
-- clients <- liftIO $ newIORef [] -- (M.empty)
initNode $ apisample broad
apisample broad= api $ watchBroadcast broad <|> process broad
process broad= do
msg <- paramVal
processMessage broad msg
processMessage broad msg= do
Aeson.Object obj <- emptyIfNothing $ Aeson.decode msg
Aeson.String typ <- emptyIfNothing $ Data.Containers.lookup "type" obj
case typ of
"echo" -> return msg
"broadcast" -> do
let res = Aeson.encode $ insertMap "type" "broadcastResult" obj
writeEVar broad msg
return res
watchBroadcast broad= threads 0 $ readEVar broad
emptyIfNothing= Transient . return
data EVar a= EVar (Unagi.InChan ( StreamData a))
readEVar :: EVar a -> TransIO a
readEVar (EVar ref1)= do
tchan <- liftIO $ Unagi.dupChan ref1
mx <- parallel $ Unagi.readChan tchan `catch` \(e :: SomeException) -> error $ show e
case mx of
SError e -> finish $ Just e
SMore x -> return x
newEVar :: TransIO (EVar a)
newEVar = Transient $ do
(ref, _) <- liftIO $ Unagi.newChan
return . Just $ EVar ref
writeEVar (EVar ref1) x= liftIO $ do
Unagi.writeChan ref1 $ SMore x
-- alternate, without broadcast channels
--apisample5 clients = Cloud $ do
-- Connection _(Just (Node2Web conn )) _ _ _ _ _ _ <- getSData <|> error "ERRROR"
-- msg <- paramVal
-- processMessage conn msg
-- <|> do
-- Connection _(Just (Node2Web conn )) _ _ _ _ _ _ <- getSData <|> error "ERRROR"
-- liftIO . atomicModifyIORef clients $ \m -> ( conn :m , ())
-- where
-- processMessage conn msg= do
-- case parseMsg msg of
---- Nothing -> error "NOTHING" -- WS.sendClose conn ("Invalid message" :: BS.ByteString)
--
-- Just Echo -> liftIO $ WS.sendTextData conn msg
--
-- Just (Broadcast res) -> do
--
-- cs <- liftIO $ readIORef clients
-- liftIO $ mapM (flip WS.sendTextData msg) cs -- !> (length cs)
-- liftIO $ WS.sendTextData conn res
--
--
--parseMsg :: BS.ByteString -> Maybe Msg
--parseMsg msg = do
-- Aeson.Object obj <- Aeson.decode msg
-- Aeson.String typ <- Data.Containers.lookup "type" obj
--
-- case typ of
-- "echo" -> Just Echo
--
-- "broadcast" -> let
-- res = Aeson.encode (insertMap "type" "broadcastResult" obj)
-- in Just (Broadcast res)
--
-- _ -> Nothing