-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdistributedKeyValueServant.hs
executable file
·174 lines (130 loc) · 5.18 KB
/
distributedKeyValueServant.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
#!/usr/bin/env ./execthirdline.sh
-- compile it with ghcjs and execute it with runghc
-- set -e && port=`echo ${3} | awk -F/ '{print $(3)}'` && docker run -it -p ${port}:${port} -v $(pwd):/work agocorona/transient:05-02-2017 bash -c "runghc /work/${1} ${2} ${3}"
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeOperators #-}
-- Servant web API backed by a distributed key-value store managed by Transient.
-- Further documentation is not available yet.
module Main where
import Control.Applicative
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar
--import Control.DeepSeq
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.State
import Control.Monad.Trans.Except
import Data.Aeson
import Data.ByteString.Lazy as DBL hiding (elemIndex, length,empty)
import Data.Hashable
import Data.IORef
import Data.List
import qualified Data.Map as M
import Data.Maybe
import qualified Data.Text as DT
import Data.Typeable
import Data.UUID
import Data.UUID.Aeson
import Data.UUID.V4
import GHC.Generics
import Network.Wai
import Network.Wai.Handler.Warp hiding (run)
import Servant.Server hiding (Handler)
import Servant.API
import System.IO
import System.IO.Unsafe
import Transient.Base
import Transient.Internals
import Transient.Move
import Transient.Move.Utils
import Data.Dynamic
import Control.Concurrent.STM.TChan
import Control.Concurrent.STM
import Servant.Server.Internal.Handler
-- | Identifier of a vendor: a 'UUID' wrapped in its own type so it cannot be
-- confused with an 'ItemId'.
newtype VendorId = VendorId UUID
  deriving (Show, Read, Eq, Ord, FromHttpApiData)
-- | Identifier of an item: a 'UUID' wrapped in its own type so it cannot be
-- confused with a 'VendorId'.
newtype ItemId = ItemId UUID
  deriving (Show, Read, Eq, Ord, FromHttpApiData)
-- | Routes served by this program:
--
-- * @GET /item@ returns a JSON list of 'Item's.
-- * @GET /item/<itemId>/<vendorId>@ returns a single JSON 'Item'.
type ItemApi =
       "item" :> Get '[JSON] [Item]
  :<|> "item" :> Capture "itemId" ItemId :> Capture "vendorId" VendorId :> Get '[JSON] Item

-- | Value-level handle on the 'ItemApi' type, passed to 'serve'.
itemApi :: Proxy ItemApi
itemApi = Proxy
-- * app
--instance FromHttpApiData UUID where
-- parseUrlPiece t = case fromText t of
-- Just u -> Right u
-- Nothing -> Left "Invalid UUID"
-- | Start the Warp server for the application built by 'mkApp' on port 3000.
-- The "listening on port ..." line is written to stderr through Warp's
-- 'setBeforeMainLoop' hook.
run :: IO ()
run = mkApp >>= runSettings settings
  where
    port = 3000
    announce = hPutStrLn stderr ("listening on port " ++ show port)
    settings = setPort port (setBeforeMainLoop announce defaultSettings)
-- | Build the WAI 'Application' that serves 'ItemApi' with 'server'.
mkApp :: IO Application
mkApp = pure (serve itemApi server)
-- | Handlers for 'ItemApi', listed in the same order as its routes.
server :: Server ItemApi
server =
  getItems
    :<|> getItemById
-- type Handler = ExceptT ServantErr IO
-- | Handler for the item list: always answers with the single hard-coded
-- 'exampleItem'.
getItems :: Handler [Item]
getItems = pure [exampleItem]
-- | Handler for a single item: passes the captured ids to the callback that
-- was stored in 'rhandler' through 'setHandlerQuery'.
--
-- 'rhandler' starts out as 'Nothing', so a request can arrive before any
-- callback has been installed. That case is answered with HTTP 503 instead of
-- failing on a partial pattern match.
getItemById :: ItemId -> VendorId -> Handler Item
-- getItemById i v = query i v
getItemById i v = do
  registered <- liftIO (readIORef rhandler)
  case registered of
    Just hand -> hand (i, v)
    -- 'Handler' wraps an 'ExceptT', so 'throwE' short-circuits with the error.
    Nothing ->
      Handler (throwE (err503 { errBody = "item lookup handler is not registered yet" }))
-- | Process-wide slot for the callback that 'getItemById' runs. It holds
-- 'Nothing' until 'setHandlerQuery' stores a callback.
--
-- The NOINLINE pragma is required for a top-level 'unsafePerformIO' reference:
-- without it GHC may inline the definition and allocate a separate 'IORef' at
-- each use site, so a write would never be seen by the reader.
--
-- NOTE(review): there is no type signature, so the element type is inferred
-- rather than pinned. Pinning it would be safer, but first confirm that the
-- writer side ('setHandlerQuery', fed by @react@ in 'cluster') and the reader
-- side ('getItemById') agree on a single type.
{-# NOINLINE rhandler #-}
rhandler = unsafePerformIO $ newIORef Nothing
-- | Store @f@ in 'rhandler', replacing whatever was stored there before.
setHandlerQuery f = writeIORef rhandler (Just f)
--query i v= liftIO $ do
-- tv <- newMVar $ toDyn (i, v)
-- atomically $ writeTChan rquery tv
-- takeMVar tv >>= return . fromJust . fromDynamic
-- | Fixed sample value returned by 'getItems'.
exampleItem :: Item
exampleItem =
  Item
    { itemId = 0
    , itemText = "example item"
    }
-- | Payload returned by both API routes.
data Item = Item
  { itemId :: Int
  , itemText :: String
  }
  deriving (Show, Read, Eq, Generic)

-- | JSON encoding through the 'Generic'-based default methods.
instance ToJSON Item

-- | JSON decoding through the 'Generic'-based default methods.
instance FromJSON Item
-- | Lookup table keyed by @(vendor id, item id)@, lifted into 'Cloud' with
-- 'onAll'. It currently contains one hard-coded entry whose value is 10.
hashmap :: Cloud (M.Map (VendorId, ItemId) Int)
hashmap = onAll (return table)
  where
    table = M.fromList [((vendor, item), 10)]
    -- 'fromJust' is only ever applied to the UUID literals below.
    uuid = fromJust . fromText
    vendor = VendorId (uuid "bacd5f20-8b46-4790-b93f-73c47b8def72")
    item = ItemId (uuid "db6af727-1007-4cae-bd24-f653b1c6e94e")
-- ((VendorId . fromJust $ fromText "8f833732-a199-4a74-aa55-a6cd7b19ab66", ItemId . fromJust $ fromText "d6693304-3849-4e69-ae31-1421ea320de4"), 20)])
-- | Process-wide channel of 'MVar' slots. In the visible source only
-- commented-out code refers to it.
--
-- The NOINLINE pragma is required for a top-level 'unsafePerformIO' value:
-- without it GHC may inline the definition and create a separate channel at
-- each use site.
{-# NOINLINE rquery #-}
rquery :: TChan (MVar Dynamic)
rquery = unsafePerformIO newTChanIO
-- | Entry point. Under @keep'@ it combines two alternatives: 'run' launched
-- through @async@ (followed by 'empty'), and the Transient node started by
-- @initNode@, which runs @inputNodes@ or 'cluster'. The result of @keep'@ is
-- discarded.
main :: IO ()
main =
  void . keep' $
    (async run >> empty)
      <|> initNode (inputNodes <|> cluster)
-- | Workflow behind 'getItemById'.
--
-- Steps, in order:
--
-- 1. register a callback with 'setHandlerQuery' via @react@ and receive an
--    @(item id, vendor id)@ pair from it;
-- 2. hash the textual form of both ids and use the hash to pick one node from
--    the sorted node list;
-- 3. look the pair up in 'hashmap' on that node with @runAt@;
-- 4. turn the lookup result into an 'Item'.
--
-- NOTE(review): presumably the code after @react@ runs once per callback
-- invocation — confirm against Transient's @react@. Nothing in the visible
-- source ever fills @tv@ (the 'putMVar' call below is commented out), so how
-- the resulting 'Item' gets back to the HTTP handler is unclear; confirm the
-- intended reply path.
cluster :: Cloud Item
cluster = do
  -- localIO $ print $ length nodes
  -- tv <- onAll . waitEvents . atomically $ readTChan rquery
  --(i@(ItemId iid), v@(VendorId vid)) <- localIO $ do
  --    d <- takeMVar tv
  --    return $ fromJust $ fromDynamic d
  tv <- onAll $ liftIO newEmptyMVar
  (i@(ItemId iid), v@(VendorId vid)) <- local $ react setHandlerQuery (takeMVar tv)
  let h = abs $ hash $ toString iid ++ toString vid
  localIO $ print $ "hash" ++ show h
  node <- local $ do
    nodes <- getNodes
    return () !> ("numnodes", length nodes)
    -- `mod`, not `rem`: `abs minBound` is still negative, and `rem` would then
    -- yield a negative index for `!!`. For every non-negative hash the two
    -- operators agree, so node selection is otherwise unchanged.
    -- Assumes the node list is non-empty; an empty list would divide by zero
    -- here — TODO confirm that getNodes always includes at least this node.
    let num = h `mod` length nodes
    let target = sort nodes !! num
    return target !> ("calling node", target)
  m <- hashmap
  quant <- runAt node $ local $ do
    return () !> ("accessing", (v, i), "map=", m)
    return $ M.lookup (v, i) m
  --localIO $ putMVar tv $ toDyn $ case quant of
  --    Just q -> (Item q "Item 1")
  --    Nothing -> (Item 0 "Item Unknown")
  return $ case quant of
    Just q -> Item q "Item 1"
    Nothing -> Item 0 "Item Unknown"