Skip to content

Commit

Permalink
Merge pull request #5017 from IntersectMBO/coot/connection-manager-state
Browse files Browse the repository at this point in the history
Modify connection manager state data type
  • Loading branch information
coot authored Dec 11, 2024
2 parents a3ab8fc + e73179a commit cb7af71
Show file tree
Hide file tree
Showing 20 changed files with 1,045 additions and 779 deletions.
16 changes: 16 additions & 0 deletions network-mux/src/Network/Mux/Bearer/AttenuatedChannel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ module Network.Mux.Bearer.AttenuatedChannel
, Size
, SuccessOrFailure (..)
, Attenuation (..)
, QueueChannel
, newAttenuatedChannel
, echoQueueChannel
, newConnectedAttenuatedChannelPair
, attenuationChannelAsBearer
-- * Trace
Expand Down Expand Up @@ -60,6 +63,19 @@ data QueueChannel m = QueueChannel {
qcWrite :: StrictTVar m (Maybe (StrictTQueue m Message))
}


-- A `QueueChannel` which receives what is written to it.
--
echoQueueChannel :: MonadSTM m => STM m (QueueChannel m)
echoQueueChannel = do
q <- newTQueue
v <- newTVar (Just q)
return QueueChannel {
qcRead = v,
qcWrite = v
}


--
-- QueueChannel API
--
Expand Down
2 changes: 2 additions & 0 deletions ouroboros-network-framework/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
`unregister{Inbound,Outbound}Connection` to `release{Inbound,Outbound}Connection`.
`AssertionLocation` constructors were renamed as well.
* Added `RawBearer` API (see https://github.com/IntersectMBO/ouroboros-network/pull/4395)
* Connection manager is using `ConnectionId`s to identify connections, this
affects its API.

### Non-breaking changes

Expand Down
2 changes: 1 addition & 1 deletion ouroboros-network-framework/demo/connection-manager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ bidirectionalExperiment
muxBundle
res <-
releaseOutboundConnection
connectionManager remoteAddr
connectionManager connId
case res of
UnsupportedState inState -> do
traceWith debugTracer ( "initiator-loop"
Expand Down
2 changes: 2 additions & 0 deletions ouroboros-network-framework/ouroboros-network-framework.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ library
Ouroboros.Network.Channel
Ouroboros.Network.ConnectionHandler
Ouroboros.Network.ConnectionId
Ouroboros.Network.ConnectionManager.ConnMap
Ouroboros.Network.ConnectionManager.Core
Ouroboros.Network.ConnectionManager.InformationChannel
Ouroboros.Network.ConnectionManager.State
Ouroboros.Network.ConnectionManager.Types
Ouroboros.Network.Context
Ouroboros.Network.Driver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ mkSnocket scheduleMap = do
)

. getSchedule
<$> (getScheduleMap scheduleMap)
<$> getScheduleMap scheduleMap
v <- newTVarIO inboundSchedule
return $ Snocket {
getLocalAddr,
Expand All @@ -449,10 +449,10 @@ mkSnocket scheduleMap = do
$> x

getLocalAddr (FD v) =
fdLocalAddress <$> atomically (readTVar v)
fdLocalAddress <$> readTVarIO v

getRemoteAddr (FD v) = do
mbRemote <- fdRemoteAddress <$> atomically (readTVar v)
mbRemote <- fdRemoteAddress <$> readTVarIO v
case mbRemote of
Nothing -> throwIO InvalidArgumentError
Just addr -> pure addr
Expand Down Expand Up @@ -830,7 +830,7 @@ prop_valid_transitions (Fixed rnd) (SkewedBool bindToLocalAddress) scheduleMap =

Right (Just (Disconnected {})) -> pure ()

Right (Just (Connected _ _ _)) -> do
Right (Just (Connected connId _ _)) -> do
threadDelay (either id id (seActiveDelay conn))
-- if this outbound connection is not
-- executed within inbound connection,
Expand All @@ -850,11 +850,11 @@ prop_valid_transitions (Fixed rnd) (SkewedBool bindToLocalAddress) scheduleMap =
-- successful.
void $
releaseInboundConnection
connectionManager addr
connectionManager connId

res <-
releaseOutboundConnection
connectionManager addr
connectionManager connId
case res of
UnsupportedState st ->
throwIO (UnsupportedStateError
Expand All @@ -879,17 +879,20 @@ prop_valid_transitions (Fixed rnd) (SkewedBool bindToLocalAddress) scheduleMap =
(Accepted fd' addr', acceptNext) -> do
thread <-
async $ do
localAddress <- getLocalAddr snocket fd'
let connId = ConnectionId { localAddress,
remoteAddress = addr' }
labelThisThread ("th-inbound-"
++ show (getTestAddress addr))
Just conn' <-
fdScheduleEntry
<$> atomically (readTVar (fdState fd'))
<$> readTVarIO (fdState fd')
when (addr /= addr' && seIdx conn /= seIdx conn') $
throwIO (MismatchedScheduleEntry (addr, seIdx conn)
(addr', seIdx conn'))
_ <-
includeInboundConnection
connectionManager maxBound fd' addr
connectionManager maxBound fd' connId
t <- getMonotonicTime

let activeDelay = either id id (seActiveDelay conn)
Expand All @@ -902,11 +905,11 @@ prop_valid_transitions (Fixed rnd) (SkewedBool bindToLocalAddress) scheduleMap =
threadDelay x
_ <-
promotedToWarmRemote
connectionManager addr
connectionManager connId
threadDelay y
_ <-
demotedToColdRemote
connectionManager addr
connectionManager connId
return ()
)
(threadDelay activeDelay)
Expand All @@ -930,7 +933,7 @@ prop_valid_transitions (Fixed rnd) (SkewedBool bindToLocalAddress) scheduleMap =
-- TODO: should we run 'unregisterInboundConnection' depending on 'seActiveDelay'
void $
releaseInboundConnection
connectionManager addr
connectionManager connId
go (thread : threads) acceptNext conns'
(AcceptFailure err, _acceptNext) ->
throwIO err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,7 @@ multinodeExperiment inboundTrTracer trTracer inboundTracer debugTracer cmTracer
m <- readTVar connVar
check (Map.member (connId remoteAddr) m)
writeTVar connVar (Map.delete (connId remoteAddr) m)
void (releaseOutboundConnection cm remoteAddr)
void (releaseOutboundConnection cm (connId remoteAddr))
go (Map.delete remoteAddr connMap)
RunMiniProtocols remoteAddr reqs -> do
atomically $ do
Expand Down Expand Up @@ -1039,6 +1039,7 @@ prop_connection_manager_valid_transitions_racy
(Fixed rnd) serverAcc (ArbDataFlow dataFlow)
defaultBearerInfo mns@(MultiNodeScript events attenuationMap) =
exploreSimTrace id sim $ \_ trace ->
counterexample (ppTrace trace) $
validate_transitions mns trace
where
sim :: IOSim s ()
Expand Down Expand Up @@ -1159,12 +1160,13 @@ prop_connection_manager_valid_transition_order (Fixed rnd) serverAcc (ArbDataFlo
in tabulate "ConnectionEvents" (map showConnectionEvents events)
. counterexample (ppScript mns)
. counterexample (Trace.ppTrace show show abstractTransitionEvents)
. counterexample (ppTrace trace)
. bifoldMap
( \ case
MainReturn {} -> mempty
_ -> All False
)
(verifyAbstractTransitionOrder True)
(verifyAbstractTransitionOrder id True)
. fmap (map ttTransition)
. groupConns id abstractStateIsFinalTransition
$ abstractTransitionEvents
Expand Down Expand Up @@ -1204,7 +1206,7 @@ prop_connection_manager_valid_transition_order_racy (Fixed rnd) serverAcc (ArbDa
MainReturn {} -> mempty
_ -> All False
)
(verifyAbstractTransitionOrder True)
(verifyAbstractTransitionOrder id True)
. fmap (map ttTransition)
. groupConns id abstractStateIsFinalTransition
$ abstractTransitionEvents
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import Data.ByteString.Lazy (ByteString)
import Data.Foldable (traverse_)
import Data.Functor (void)
import Data.Map qualified as Map
import Data.Maybe (isNothing)
import Data.Set (Set)
import Data.Set qualified as Set
import Text.Printf
Expand All @@ -50,6 +51,7 @@ import Ouroboros.Network.Snocket
import Simulation.Network.Snocket

import Network.Mux as Mx
import Network.Mux.Types as Mx
import Network.TypedProtocol.Codec.CBOR
import Network.TypedProtocol.Core
import Network.TypedProtocol.Peer
Expand Down Expand Up @@ -81,6 +83,8 @@ tests =
prop_connect_to_not_listening_socket
, testProperty "simultaneous_open"
prop_simultaneous_open
, testProperty "self connect"
prop_self_connect
]

type TestAddr = TestAddress Int
Expand Down Expand Up @@ -543,6 +547,47 @@ prop_simultaneous_open defaultBearerInfo =
snocket getState
wait clientAsync


-- | Check that when we bind both outbound and inbound socket to the same
-- address, and connect the outbound to inbound:
--
-- * accept loop never returns
-- * the outbound socket acts as a mirror
--
-- This is how socket API behaves on Linux.
--
prop_self_connect :: ByteString -> Property
prop_self_connect payload =
runSimOrThrow sim
where
addr :: TestAddress Int
addr = TestAddress 0

sim :: forall s. IOSim s Property
sim =
withSnocket nullTracer noAttenuation Map.empty
$ \snocket _getState ->
withAsync (runServer addr snocket
(close snocket) acceptOne return)
$ \serverThread -> do
bracket (openToConnect snocket addr)
(close snocket)
$ \fd -> do
bind snocket fd addr
connect snocket fd addr
bearer <- getBearer makeFDBearer 10 nullTracer fd
let channel = bearerAsChannel bearer (MiniProtocolNum 0) InitiatorDir
send channel payload
payload' <- recv channel
threadDelay 1
serverResult <- atomically $
(Just <$> waitSTM serverThread)
`orElse`
pure Nothing
return $ Just payload === payload'
.&&. isNothing serverResult


--
-- Utils
--
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,10 +411,10 @@ makeConnectionHandler muxTracer singMuxMode
--


-- | 'ConnectionHandlerTrace' is embedded into 'ConnectionManagerTrace' with
-- 'Ouroboros.Network.ConnectionManager.Types.ConnectionHandlerTrace'
-- constructor. It already includes 'ConnectionId' so we don't need to take
-- care of it here.
-- | 'ConnectionHandlerTrace' is embedded into
-- 'Ouroboros.Network.ConnectionManager.Core.Trace' with
-- 'Ouroboros.Network.ConnectionManager.Types.TrConnectionHandler' constructor.
-- It already includes 'ConnectionId' so we don't need to take care of it here.
--
-- TODO: when 'Handshake' will get its own tracer, independent of 'Mux', it
-- should be embedded into 'ConnectionHandlerTrace'.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ data ConnectionId addr = ConnectionId {
--
-- /Note:/ we relay on the fact that `remoteAddress` is an order
-- preserving map (which allows us to use `Map.mapKeysMonotonic` in some
-- cases).
-- cases. We also relay on this particular order in
-- `Ouroboros.Network.ConnectionManager.State.liveConnections`
--
instance Ord addr => Ord (ConnectionId addr) where
conn `compare` conn' =
Expand Down
Loading

0 comments on commit cb7af71

Please sign in to comment.