Skip to content

Commit

Permalink
Merge pull request #4693 from karknu/karknu/keepalive_start
Browse files Browse the repository at this point in the history
KeepAlive Client collect samples on first packet
  • Loading branch information
coot authored Oct 27, 2023
2 parents 7631338 + 1728830 commit 80e823a
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 55 deletions.
2 changes: 2 additions & 0 deletions ouroboros-network-protocols/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

### Non-breaking changes

* Make it possible for KeepAlive client to collect a rtt sample for the first packet.

## 0.5.3.0 -- 2023-10-26

### Non-breaking changes
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE DataKinds #-}
{-# LANGUAGE GADTs #-}
{-# LANGUAGE ScopedTypeVariables #-}

module Ouroboros.Network.Protocol.KeepAlive.Client
( KeepAliveClient (..)
, KeepAliveClientSt (..)
, keepAliveClientPeer
) where

Expand All @@ -11,31 +13,40 @@ import Network.TypedProtocol.Core
import Ouroboros.Network.Protocol.KeepAlive.Type


data KeepAliveClient m a where
newtype KeepAliveClient m a = KeepAliveClient (m (KeepAliveClientSt m a))

data KeepAliveClientSt m a where
SendMsgKeepAlive
:: Cookie
-> (m (KeepAliveClient m a))
-> KeepAliveClient m a
-> m (KeepAliveClientSt m a)
-> KeepAliveClientSt m a

SendMsgDone
:: m a
-> KeepAliveClient m a
-> KeepAliveClientSt m a


-- | Interpret a particular client action sequence into the client side of the
-- 'KeepAlive' protocol.
--
keepAliveClientPeer
:: MonadThrow m
:: forall m a. MonadThrow m
=> KeepAliveClient m a
-> Peer KeepAlive AsClient StClient m a

keepAliveClientPeer (SendMsgDone mresult) =
Yield (ClientAgency TokClient) MsgDone $
Effect (Done TokDone <$> mresult)

keepAliveClientPeer (SendMsgKeepAlive cookieReq next) =
Yield (ClientAgency TokClient) (MsgKeepAlive cookieReq) $
Await (ServerAgency TokServer) $ \(MsgKeepAliveResponse cookieRsp) ->
if cookieReq == cookieRsp then Effect $ keepAliveClientPeer <$> next
else Effect $ throwIO $ KeepAliveCookieMissmatch cookieReq cookieRsp
keepAliveClientPeer (KeepAliveClient client) =
Effect $ keepAliveClientStPeer <$> client
where

keepAliveClientStPeer
:: KeepAliveClientSt m a
-> Peer KeepAlive AsClient StClient m a

keepAliveClientStPeer (SendMsgDone mresult) =
Yield (ClientAgency TokClient) MsgDone $
Effect (Done TokDone <$> mresult)

keepAliveClientStPeer (SendMsgKeepAlive cookieReq next) =
Yield (ClientAgency TokClient) (MsgKeepAlive cookieReq) $
Await (ServerAgency TokServer) $ \(MsgKeepAliveResponse cookieRsp) ->
if cookieReq == cookieRsp then Effect $ keepAliveClientStPeer <$> next
else Effect $ throwIO $ KeepAliveCookieMissmatch cookieReq cookieRsp
Original file line number Diff line number Diff line change
@@ -1,19 +1,28 @@
{-# LANGUAGE NamedFieldPuns #-}
{-# LANGUAGE RankNTypes #-}

module Ouroboros.Network.Protocol.KeepAlive.Direct where

import Ouroboros.Network.Protocol.KeepAlive.Client
import Ouroboros.Network.Protocol.KeepAlive.Server

direct :: Monad m

direct :: forall a b m. Monad m
=> KeepAliveServer m a
-> KeepAliveClient m b
-> m (a, b)
direct KeepAliveServer { recvMsgDone }
(SendMsgDone mdone) =
(,) <$> recvMsgDone <*> mdone
direct KeepAliveServer { recvMsgKeepAlive }
(SendMsgKeepAlive _cookie mclient) = do
server <- recvMsgKeepAlive
client <- mclient
direct server client
direct srv (KeepAliveClient clientM) = do
go srv =<< clientM
where
go :: Monad m
=> KeepAliveServer m a
-> KeepAliveClientSt m b
-> m (a, b)
go KeepAliveServer { recvMsgDone }
(SendMsgDone mdone) =
(,) <$> recvMsgDone <*> mdone
go KeepAliveServer { recvMsgKeepAlive }
(SendMsgKeepAlive _cookie mclient) = do
server <- recvMsgKeepAlive
client <- mclient
go server client
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ keepAliveClientApply :: forall acc m. Monad m
-> Int
-- ^ count of number of requests
-> KeepAliveClient m acc
keepAliveClientApply f = go
keepAliveClientApply f aa an = KeepAliveClient $ return (go aa an)
where
go :: acc -> Int -> KeepAliveClient m acc
go :: acc -> Int -> KeepAliveClientSt m acc
go acc n
| n <= 0
= SendMsgDone (pure acc)
Expand Down
1 change: 1 addition & 0 deletions ouroboros-network/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

### Non-breaking changes

* Update KeepAlive client to collect a rtt sample for the first packet.
* Less aggresive churning of established and known peers.
* Add peer sharing to wireshark dissector.
* Adds ledger peers to diffusion simulation
Expand Down
48 changes: 21 additions & 27 deletions ouroboros-network/src/Ouroboros/Network/KeepAlive.hs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ keepAliveClient
-> KeepAliveClient m ()
keepAliveClient tracer inRng controlMessageSTM peer dqCtx KeepAliveInterval { keepAliveInterval } =
let (cookie, rng) = random inRng in
SendMsgKeepAlive (Cookie cookie) (go rng Nothing)
KeepAliveClient $ do
startTime <- getMonotonicTime
return $ SendMsgKeepAlive (Cookie cookie) (go rng startTime)
where
payloadSize = 2

Expand All @@ -70,40 +72,32 @@ keepAliveClient tracer inRng controlMessageSTM peer dqCtx KeepAliveInterval { ke
then return Continue
else retry

go :: StdGen -> Maybe Time -> m (KeepAliveClient m ())
go rng startTime_m = do
go :: StdGen -> Time -> m (KeepAliveClientSt m ())
go rng startTime = do
endTime <- getMonotonicTime
case startTime_m of
Just startTime -> do
let rtt = diffTime endTime startTime
sample = fromSample startTime endTime payloadSize
gsv' <- atomically $ do
m <- readTVar dqCtx
assert (peer `M.member` m) $ do
let (gsv', m') = M.updateLookupWithKey
(\_ a -> if sampleTime a == Time 0 -- Ignore the initial dummy value
then Just sample
else Just $ sample <> a
) peer m
writeTVar dqCtx m'
return $ fromJust gsv'
traceWith tracer $ AddSample peer rtt gsv'

Nothing -> return ()

let keepAliveInterval' = case startTime_m of
Just _ -> keepAliveInterval
Nothing -> 0 -- The first time we send a packet directly.

delayVar <- registerDelay keepAliveInterval'
let rtt = diffTime endTime startTime
sample = fromSample startTime endTime payloadSize
gsv' <- atomically $ do
m <- readTVar dqCtx
assert (peer `M.member` m) $ do
let (gsv', m') = M.updateLookupWithKey
(\_ a -> if sampleTime a == Time 0 -- Ignore the initial dummy value
then Just sample
else Just $ sample <> a
) peer m
writeTVar dqCtx m'
return $ fromJust gsv'
traceWith tracer $ AddSample peer rtt gsv'

delayVar <- registerDelay keepAliveInterval
decision <- atomically (decisionSTM delayVar)
now <- getMonotonicTime
case decision of
-- 'decisionSTM' above cannot return 'Quiesce'
Quiesce -> error "keepAliveClient: impossible happened"
Continue ->
let (cookie, rng') = random rng in
pure (SendMsgKeepAlive (Cookie cookie) $ go rng' $ Just now)
pure (SendMsgKeepAlive (Cookie cookie) $ go rng' now)
Terminate -> pure (SendMsgDone (pure ()))


Expand Down

0 comments on commit 80e823a

Please sign in to comment.