Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modify connection manager state data type #5017

Merged
merged 15 commits into from
Dec 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions network-mux/src/Network/Mux/Bearer/AttenuatedChannel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ module Network.Mux.Bearer.AttenuatedChannel
, Size
, SuccessOrFailure (..)
, Attenuation (..)
, QueueChannel
, newAttenuatedChannel
, echoQueueChannel
, newConnectedAttenuatedChannelPair
, attenuationChannelAsBearer
-- * Trace
Expand Down Expand Up @@ -60,6 +63,19 @@ data QueueChannel m = QueueChannel {
qcWrite :: StrictTVar m (Maybe (StrictTQueue m Message))
}


-- A `QueueChannel` which receives what is written to it.
--
echoQueueChannel :: MonadSTM m => STM m (QueueChannel m)
echoQueueChannel = do
q <- newTQueue
v <- newTVar (Just q)
return QueueChannel {
qcRead = v,
qcWrite = v
}


--
-- QueueChannel API
--
Expand Down
2 changes: 2 additions & 0 deletions ouroboros-network-framework/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
`unregister{Inbound,Outbound}Connection` to `release{Inbound,Outbound}Connection`.
`AssertionLocation` constructors were renamed as well.
* Added `RawBearer` API (see https://github.com/IntersectMBO/ouroboros-network/pull/4395)
* Connection manager is using `ConnectionId`s to identify connections, this
affects its API.

### Non-breaking changes

Expand Down
2 changes: 1 addition & 1 deletion ouroboros-network-framework/demo/connection-manager.hs
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ bidirectionalExperiment
muxBundle
res <-
releaseOutboundConnection
connectionManager remoteAddr
connectionManager connId
case res of
UnsupportedState inState -> do
traceWith debugTracer ( "initiator-loop"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@ library
Ouroboros.Network.Channel
Ouroboros.Network.ConnectionHandler
Ouroboros.Network.ConnectionId
Ouroboros.Network.ConnectionManager.ConnMap
Ouroboros.Network.ConnectionManager.Core
Ouroboros.Network.ConnectionManager.InformationChannel
Ouroboros.Network.ConnectionManager.State
Ouroboros.Network.ConnectionManager.Types
Ouroboros.Network.Context
Ouroboros.Network.Driver
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ mkSnocket scheduleMap = do
)

. getSchedule
<$> (getScheduleMap scheduleMap)
<$> getScheduleMap scheduleMap
v <- newTVarIO inboundSchedule
return $ Snocket {
getLocalAddr,
Expand All @@ -449,10 +449,10 @@ mkSnocket scheduleMap = do
$> x

getLocalAddr (FD v) =
fdLocalAddress <$> atomically (readTVar v)
fdLocalAddress <$> readTVarIO v

getRemoteAddr (FD v) = do
mbRemote <- fdRemoteAddress <$> atomically (readTVar v)
mbRemote <- fdRemoteAddress <$> readTVarIO v
case mbRemote of
Nothing -> throwIO InvalidArgumentError
Just addr -> pure addr
Expand Down Expand Up @@ -830,7 +830,7 @@ prop_valid_transitions (Fixed rnd) (SkewedBool bindToLocalAddress) scheduleMap =

Right (Just (Disconnected {})) -> pure ()

Right (Just (Connected _ _ _)) -> do
Right (Just (Connected connId _ _)) -> do
threadDelay (either id id (seActiveDelay conn))
-- if this outbound connection is not
-- executed within inbound connection,
Expand All @@ -850,11 +850,11 @@ prop_valid_transitions (Fixed rnd) (SkewedBool bindToLocalAddress) scheduleMap =
-- successful.
void $
releaseInboundConnection
connectionManager addr
connectionManager connId

res <-
releaseOutboundConnection
connectionManager addr
connectionManager connId
case res of
UnsupportedState st ->
throwIO (UnsupportedStateError
Expand All @@ -879,17 +879,20 @@ prop_valid_transitions (Fixed rnd) (SkewedBool bindToLocalAddress) scheduleMap =
(Accepted fd' addr', acceptNext) -> do
thread <-
async $ do
localAddress <- getLocalAddr snocket fd'
let connId = ConnectionId { localAddress,
remoteAddress = addr' }
labelThisThread ("th-inbound-"
++ show (getTestAddress addr))
Just conn' <-
fdScheduleEntry
<$> atomically (readTVar (fdState fd'))
<$> readTVarIO (fdState fd')
when (addr /= addr' && seIdx conn /= seIdx conn') $
throwIO (MismatchedScheduleEntry (addr, seIdx conn)
(addr', seIdx conn'))
_ <-
includeInboundConnection
connectionManager maxBound fd' addr
connectionManager maxBound fd' connId
t <- getMonotonicTime

let activeDelay = either id id (seActiveDelay conn)
Expand All @@ -902,11 +905,11 @@ prop_valid_transitions (Fixed rnd) (SkewedBool bindToLocalAddress) scheduleMap =
threadDelay x
_ <-
promotedToWarmRemote
connectionManager addr
connectionManager connId
threadDelay y
_ <-
demotedToColdRemote
connectionManager addr
connectionManager connId
return ()
)
(threadDelay activeDelay)
Expand All @@ -930,7 +933,7 @@ prop_valid_transitions (Fixed rnd) (SkewedBool bindToLocalAddress) scheduleMap =
-- TODO: should we run 'unregisterInboundConnection' depending on 'seActiveDelay'
void $
releaseInboundConnection
connectionManager addr
connectionManager connId
go (thread : threads) acceptNext conns'
(AcceptFailure err, _acceptNext) ->
throwIO err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -884,7 +884,7 @@ multinodeExperiment inboundTrTracer trTracer inboundTracer debugTracer cmTracer
m <- readTVar connVar
check (Map.member (connId remoteAddr) m)
writeTVar connVar (Map.delete (connId remoteAddr) m)
void (releaseOutboundConnection cm remoteAddr)
void (releaseOutboundConnection cm (connId remoteAddr))
go (Map.delete remoteAddr connMap)
RunMiniProtocols remoteAddr reqs -> do
atomically $ do
Expand Down Expand Up @@ -1039,6 +1039,7 @@ prop_connection_manager_valid_transitions_racy
(Fixed rnd) serverAcc (ArbDataFlow dataFlow)
defaultBearerInfo mns@(MultiNodeScript events attenuationMap) =
exploreSimTrace id sim $ \_ trace ->
counterexample (ppTrace trace) $
validate_transitions mns trace
where
sim :: IOSim s ()
Expand Down Expand Up @@ -1159,12 +1160,13 @@ prop_connection_manager_valid_transition_order (Fixed rnd) serverAcc (ArbDataFlo
in tabulate "ConnectionEvents" (map showConnectionEvents events)
. counterexample (ppScript mns)
. counterexample (Trace.ppTrace show show abstractTransitionEvents)
. counterexample (ppTrace trace)
. bifoldMap
( \ case
MainReturn {} -> mempty
_ -> All False
)
(verifyAbstractTransitionOrder True)
(verifyAbstractTransitionOrder id True)
. fmap (map ttTransition)
. groupConns id abstractStateIsFinalTransition
$ abstractTransitionEvents
Expand Down Expand Up @@ -1204,7 +1206,7 @@ prop_connection_manager_valid_transition_order_racy (Fixed rnd) serverAcc (ArbDa
MainReturn {} -> mempty
_ -> All False
)
(verifyAbstractTransitionOrder True)
(verifyAbstractTransitionOrder id True)
. fmap (map ttTransition)
. groupConns id abstractStateIsFinalTransition
$ abstractTransitionEvents
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import Data.ByteString.Lazy (ByteString)
import Data.Foldable (traverse_)
import Data.Functor (void)
import Data.Map qualified as Map
import Data.Maybe (isNothing)
import Data.Set (Set)
import Data.Set qualified as Set
import Text.Printf
Expand All @@ -50,6 +51,7 @@ import Ouroboros.Network.Snocket
import Simulation.Network.Snocket

import Network.Mux as Mx
import Network.Mux.Types as Mx
import Network.TypedProtocol.Codec.CBOR
import Network.TypedProtocol.Core
import Network.TypedProtocol.Peer
Expand Down Expand Up @@ -81,6 +83,8 @@ tests =
prop_connect_to_not_listening_socket
, testProperty "simultaneous_open"
prop_simultaneous_open
, testProperty "self connect"
prop_self_connect
]

type TestAddr = TestAddress Int
Expand Down Expand Up @@ -543,6 +547,47 @@ prop_simultaneous_open defaultBearerInfo =
snocket getState
wait clientAsync


-- | Check that when we bind both outbound and inbound socket to the same
-- address, and connect the outbound to inbound:
--
-- * accept loop never returns
-- * the outbound socket acts as a mirror
--
-- This is how socket API behaves on Linux.
--
prop_self_connect :: ByteString -> Property
prop_self_connect payload =
runSimOrThrow sim
where
addr :: TestAddress Int
addr = TestAddress 0

sim :: forall s. IOSim s Property
sim =
withSnocket nullTracer noAttenuation Map.empty
$ \snocket _getState ->
withAsync (runServer addr snocket
(close snocket) acceptOne return)
$ \serverThread -> do
bracket (openToConnect snocket addr)
(close snocket)
$ \fd -> do
bind snocket fd addr
connect snocket fd addr
bearer <- getBearer makeFDBearer 10 nullTracer fd
let channel = bearerAsChannel bearer (MiniProtocolNum 0) InitiatorDir
send channel payload
payload' <- recv channel
threadDelay 1
serverResult <- atomically $
(Just <$> waitSTM serverThread)
`orElse`
pure Nothing
return $ Just payload === payload'
.&&. isNothing serverResult


--
-- Utils
--
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -411,10 +411,10 @@ makeConnectionHandler muxTracer singMuxMode
--


-- | 'ConnectionHandlerTrace' is embedded into 'ConnectionManagerTrace' with
-- 'Ouroboros.Network.ConnectionManager.Types.ConnectionHandlerTrace'
-- constructor. It already includes 'ConnectionId' so we don't need to take
-- care of it here.
-- | 'ConnectionHandlerTrace' is embedded into
-- 'Ouroboros.Network.ConnectionManager.Core.Trace' with
-- 'Ouroboros.Network.ConnectionManager.Types.TrConnectionHandler' constructor.
-- It already includes 'ConnectionId' so we don't need to take care of it here.
--
-- TODO: when 'Handshake' will get its own tracer, independent of 'Mux', it
-- should be embedded into 'ConnectionHandlerTrace'.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ data ConnectionId addr = ConnectionId {
--
-- /Note:/ we relay on the fact that `remoteAddress` is an order
-- preserving map (which allows us to use `Map.mapKeysMonotonic` in some
-- cases).
-- cases. We also relay on this particular order in
-- `Ouroboros.Network.ConnectionManager.State.liveConnections`
--
instance Ord addr => Ord (ConnectionId addr) where
conn `compare` conn' =
Expand Down
Loading
Loading