Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement graceful shutdown in Fluffy #2645

Merged
merged 4 commits into from
Sep 20, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 78 additions & 40 deletions fluffy/fluffy.nim
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func optionToOpt[T](o: Option[T]): Opt[T] =
else:
Opt.none(T)

proc run(config: PortalConf) {.raises: [CatchableError].} =
proc run(config: PortalConf): PortalNode {.raises: [CatchableError].} =
setupLogging(config.logLevel, config.logStdout, none(OutFile))

notice "Launching Fluffy", version = fullVersionStr, cmdParams = commandLineParams()
Expand Down Expand Up @@ -185,26 +185,30 @@ proc run(config: PortalConf) {.raises: [CatchableError].} =
quit 1

## Start metrics HTTP server
if config.metricsEnabled:
let
address = config.metricsAddress
port = config.metricsPort
url = "http://" & $address & ":" & $port & "/metrics"

server = MetricsHttpServerRef.new($address, port).valueOr:
error "Could not instantiate metrics HTTP server", url, error
node.metricsServer =
if config.metricsEnabled:
let
address = config.metricsAddress
port = config.metricsPort
url = "http://" & $address & ":" & $port & "/metrics"

server = MetricsHttpServerRef.new($address, port).valueOr:
error "Could not instantiate metrics HTTP server", url, error
quit QuitFailure

info "Starting metrics HTTP server", url
try:
waitFor server.start()
except MetricsError as exc:
fatal "Could not start metrics HTTP server",
url, error_msg = exc.msg, error_name = exc.name
quit QuitFailure

info "Starting metrics HTTP server", url
try:
waitFor server.start()
except MetricsError as exc:
fatal "Could not start metrics HTTP server",
url, error_msg = exc.msg, error_name = exc.name
quit QuitFailure

## Start discovery v5 protocol and the Portal node.
d.start()
Opt.some(server)
else:
Opt.none(MetricsHttpServerRef)

## Start the Portal node.
node.start()

## Start the JSON-RPC APIs
Expand Down Expand Up @@ -235,24 +239,32 @@ proc run(config: PortalConf) {.raises: [CatchableError].} =

rpcServer.start()

if config.rpcEnabled:
let
ta = initTAddress(config.rpcAddress, config.rpcPort)
rpcHttpServer = RpcHttpServer.new()
# Note: Set maxRequestBodySize to 4MB instead of 1MB as there are blocks
# that reach that limit (in hex, for gossip method).
rpcHttpServer.addHttpServer(ta, maxRequestBodySize = 4 * 1_048_576)

setupRpcServer(rpcHttpServer)

if config.wsEnabled:
let
ta = initTAddress(config.rpcAddress, config.wsPort)
rpcWsServer = newRpcWebSocketServer(ta, compression = config.wsCompression)

setupRpcServer(rpcWsServer)

runForever()
node.rpcHttpServer =
if config.rpcEnabled:
let
ta = initTAddress(config.rpcAddress, config.rpcPort)
rpcHttpServer = RpcHttpServer.new()
# Note: Set maxRequestBodySize to 4MB instead of 1MB as there are blocks
# that reach that limit (in hex, for gossip method).
rpcHttpServer.addHttpServer(ta, maxRequestBodySize = 4 * 1_048_576)
setupRpcServer(rpcHttpServer)

Opt.some(rpcHttpServer)
else:
Opt.none(RpcHttpServer)

node.rpcWsServer =
if config.wsEnabled:
let
ta = initTAddress(config.rpcAddress, config.wsPort)
rpcWsServer = newRpcWebSocketServer(ta, compression = config.wsCompression)
setupRpcServer(rpcWsServer)

Opt.some(rpcWsServer)
else:
Opt.none(RpcWebSocketServer)

return node

when isMainModule:
{.pop.}
Expand All @@ -262,6 +274,32 @@ when isMainModule:
)
{.push raises: [].}

case config.cmd
of PortalCmd.noCommand:
run(config)
let node =
case config.cmd
of PortalCmd.noCommand:
run(config)

# Ctrl+C handling
proc controlCHandler() {.noconv.} =
when defined(windows):
# workaround for https://github.com/nim-lang/Nim/issues/4057
try:
setupForeignThreadGc()
except Exception as exc:
raiseAssert exc.msg # shouldn't happen

notice "Shutting down after having received SIGINT"
node.state = PortalNodeState.Stopping

try:
setControlCHook(controlCHandler)
except Exception as exc: # TODO Exception
warn "Cannot set ctrl-c handler", msg = exc.msg

while node.state == PortalNodeState.Running:
try:
poll()
except CatchableError as e:
warn "Exception in poll()", exc = e.name, err = e.msg

waitFor node.stop()
5 changes: 3 additions & 2 deletions fluffy/network/beacon/beacon_light_client.nim
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,10 @@ proc start*(lightClient: LightClient) =
info "Starting beacon light client", trusted_block_root = lightClient.trustedBlockRoot
lightClient.manager.start()

proc stop*(lightClient: LightClient) =
proc stop*(lightClient: LightClient) {.async: (raises: []).} =
  ## Stop the beacon light client and wait until the sync manager's
  ## loop has been cancelled and cleaned up.
  info "Stopping beacon light client"
  await lightClient.manager.stop()

proc resetToFinalizedHeader*(
lightClient: LightClient,
Expand Down
10 changes: 5 additions & 5 deletions fluffy/network/beacon/beacon_light_client_manager.nim
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type
GetBoolCallback* = proc(): bool {.gcsafe, raises: [].}
GetSlotCallback* = proc(): Slot {.gcsafe, raises: [].}

LightClientManager* = object
LightClientManager* = ref object
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Had to change this to a ref object so that the stop proc can become async

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And making LightClientManager in the stop call a var did not help?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried that and it didn't work. This was the error: Error: 'self' is of type <var LightClientManager> which cannot be captured as it would violate memory safety, declared here: /home/user/development/status-im/nimbus-eth1/fluffy/network/beacon/beacon_light_client_manager.nim(323, 12); using '-d:nimNoLentIterators' helps in some cases. Consider using a <ref var LightClientManager> which can be captured.

network: BeaconNetwork
rng: ref HmacDrbgContext
getTrustedBlockRoot: GetTrustedBlockRootCallback
Expand Down Expand Up @@ -315,13 +315,13 @@ proc loop(self: LightClientManager) {.async: (raises: [CancelledError]).} =
didLatestSyncTaskProgress = didProgress,
)

proc start*(self: LightClientManager) =
  ## Start light client manager's loop.
  ## Must not be called twice without an intervening `stop` (asserted
  ## via the nil check on the loop future).
  doAssert self.loopFuture == nil
  self.loopFuture = self.loop()

proc stop*(self: LightClientManager) {.async: (raises: []).} =
  ## Stop light client manager's loop: cancel it, wait for the
  ## cancellation to complete, and clear the stored future so the
  ## manager can be started again.
  if not self.loopFuture.isNil():
    await noCancel(self.loopFuture.cancelAndWait())
    self.loopFuture = nil
16 changes: 11 additions & 5 deletions fluffy/network/beacon/beacon_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -381,13 +381,19 @@ proc start*(n: BeaconNetwork) =
n.portalProtocol.start()
n.processContentLoop = processContentLoop(n)

proc stop*(n: BeaconNetwork) =
proc stop*(n: BeaconNetwork) {.async: (raises: []).} =
  ## Stop the Portal beacon chain network: stop the portal protocol and
  ## cancel the background loops, then wait for all of them in parallel
  ## before clearing the stored futures.
  info "Stopping Portal beacon chain network"

  var futures: seq[Future[void]]
  futures.add(n.portalProtocol.stop())

  if not n.processContentLoop.isNil():
    futures.add(n.processContentLoop.cancelAndWait())
  if not n.statusLogLoop.isNil():
    futures.add(n.statusLogLoop.cancelAndWait())

  # noCancel: shutdown must run to completion even if this proc's own
  # future gets cancelled.
  await noCancel(allFutures(futures))

  n.processContentLoop = nil
  n.statusLogLoop = nil
14 changes: 9 additions & 5 deletions fluffy/network/history/history_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -722,13 +722,17 @@ proc start*(n: HistoryNetwork) =
n.statusLogLoop = statusLogLoop(n)
pruneDeprecatedAccumulatorRecords(n.accumulator, n.contentDB)

proc stop*(n: HistoryNetwork) =
proc stop*(n: HistoryNetwork) {.async: (raises: []).} =
  ## Stop the Portal execution history network: stop the portal
  ## protocol and cancel the background loops, waiting for all of them
  ## in parallel before clearing the stored futures.
  info "Stopping Portal execution history network"

  var futures: seq[Future[void]]
  futures.add(n.portalProtocol.stop())

  if not n.processContentLoop.isNil():
    futures.add(n.processContentLoop.cancelAndWait())
  if not n.statusLogLoop.isNil():
    futures.add(n.statusLogLoop.cancelAndWait())

  # noCancel: shutdown must run to completion even if this proc's own
  # future gets cancelled.
  await noCancel(allFutures(futures))

  n.processContentLoop = nil
  n.statusLogLoop = nil
15 changes: 10 additions & 5 deletions fluffy/network/state/state_network.nim
Original file line number Diff line number Diff line change
Expand Up @@ -241,13 +241,18 @@ proc start*(n: StateNetwork) =
n.processContentLoop = processContentLoop(n)
n.statusLogLoop = statusLogLoop(n)

proc stop*(n: StateNetwork) =
proc stop*(n: StateNetwork) {.async: (raises: []).} =
  ## Stop the Portal execution state network: stop the portal protocol
  ## and cancel the background loops, waiting for all of them in
  ## parallel before clearing the stored futures.
  info "Stopping Portal execution state network"

  var futures: seq[Future[void]]
  futures.add(n.portalProtocol.stop())

  if not n.processContentLoop.isNil():
    futures.add(n.processContentLoop.cancelAndWait())
  if not n.statusLogLoop.isNil():
    futures.add(n.statusLogLoop.cancelAndWait())

  # noCancel: shutdown must run to completion even if this proc's own
  # future gets cancelled.
  await noCancel(allFutures(futures))

  n.processContentLoop = nil
  n.statusLogLoop = nil
19 changes: 13 additions & 6 deletions fluffy/network/wire/portal_protocol.nim
Original file line number Diff line number Diff line change
Expand Up @@ -1708,14 +1708,21 @@ proc start*(p: PortalProtocol) =
for i in 0 ..< concurrentOffers:
p.offerWorkers.add(offerWorker(p))

proc stop*(p: PortalProtocol) =
if not p.revalidateLoop.isNil:
p.revalidateLoop.cancelSoon()
if not p.refreshLoop.isNil:
p.refreshLoop.cancelSoon()
proc stop*(p: PortalProtocol) {.async: (raises: []).} =
  ## Cancel the revalidate and refresh loops plus every offer worker,
  ## then wait until all cancellations have completed before clearing
  ## the stored futures.
  var futures: seq[Future[void]]

  if not p.revalidateLoop.isNil():
    futures.add(p.revalidateLoop.cancelAndWait())
  if not p.refreshLoop.isNil():
    futures.add(p.refreshLoop.cancelAndWait())

  for worker in p.offerWorkers:
    futures.add(worker.cancelAndWait())

  # noCancel: shutdown must run to completion even if this proc's own
  # future gets cancelled.
  await noCancel(allFutures(futures))

  p.revalidateLoop = nil
  p.refreshLoop = nil
  p.offerWorkers = @[]

proc resolve*(
Expand Down
61 changes: 53 additions & 8 deletions fluffy/portal_node.nim
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import
results,
chronos,
metrics/chronos_httpserver,
json_rpc/rpcserver,
eth/p2p/discoveryv5/protocol,
beacon_chain/spec/forks,
./network_metadata,
Expand All @@ -24,6 +26,11 @@ export
beacon_light_client, history_network, state_network, portal_protocol_config, forks

type
PortalNodeState* = enum
Starting
Running
Stopping

PortalNodeConfig* = object
accumulatorFile*: Opt[string]
disableStateRootValidation*: bool
Expand All @@ -33,6 +40,7 @@ type
storageCapacity*: uint64

PortalNode* = ref object
state*: PortalNodeState
discovery: protocol.Protocol
contentDB: ContentDB
streamManager: StreamManager
Expand All @@ -41,6 +49,9 @@ type
stateNetwork*: Opt[StateNetwork]
beaconLightClient*: Opt[LightClient]
statusLogLoop: Future[void]
metricsServer*: Opt[MetricsHttpServerRef]
rpcHttpServer*: Opt[RpcHttpServer]
rpcWsServer*: Opt[RpcWebSocketServer]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not a big fan of adding these to the PortalNode object.

Conceptually, to be a part of the Portal network as a node you don't require these more user faced features.
PortalNode is also there to be able to easily integrate our portal node in for example nimbus-eth1 binary. And while these are optional, you still end up importing rpcserver and chronos_httpserver.

If we want to make this more streamlined then I would rather create another object called PortalClient that holds all these servers + PortalNode + interacts with anything cli config related.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see your point. Yeah I wasn't sure about this part either. Okay, I'll move them out and clean them up separately for now. I'll keep the PortalClient idea for a separate PR.


# Beacon light client application callbacks triggered when new finalized header
# or optimistic header is available.
Expand Down Expand Up @@ -202,6 +213,8 @@ proc statusLogLoop(n: PortalNode) {.async: (raises: []).} =
proc start*(n: PortalNode) =
debug "Starting Portal node"

n.discovery.start()

if n.beaconNetwork.isSome():
n.beaconNetwork.value.start()
if n.historyNetwork.isSome():
Expand All @@ -214,18 +227,50 @@ proc start*(n: PortalNode) =

n.statusLogLoop = statusLogLoop(n)

proc stop*(n: PortalNode) =
n.state = PortalNodeState.Running

proc stop*(n: PortalNode) {.async: (raises: []).} =
  ## Gracefully shut down the Portal node.
  ##
  ## Order matters: first the user-facing servers (WS RPC, HTTP RPC,
  ## metrics) are stopped so no new requests arrive, then all portal
  ## sub-networks, the beacon light client and the status log loop are
  ## stopped in parallel, and finally discovery and the content DB are
  ## closed. Server stop failures are logged, never propagated.
  debug "Stopping Portal node"

  if n.rpcWsServer.isSome():
    let server = n.rpcWsServer.get()
    try:
      server.stop()
      await server.closeWait()
    except CatchableError as e:
      warn "Failed to stop rpc WS server", exc = e.name, err = e.msg

  if n.rpcHttpServer.isSome():
    let server = n.rpcHttpServer.get()
    try:
      await server.stop()
      await server.closeWait()
    except CatchableError as e:
      warn "Failed to stop rpc HTTP server", exc = e.name, err = e.msg

  if n.metricsServer.isSome():
    let server = n.metricsServer.get()
    try:
      await server.stop()
      await server.close()
    except CatchableError as e:
      warn "Failed to stop metrics HTTP server", exc = e.name, err = e.msg

  var futures: seq[Future[void]]

  if n.beaconNetwork.isSome():
    futures.add(n.beaconNetwork.value.stop())
  if n.historyNetwork.isSome():
    futures.add(n.historyNetwork.value.stop())
  if n.stateNetwork.isSome():
    futures.add(n.stateNetwork.value.stop())
  if n.beaconLightClient.isSome():
    futures.add(n.beaconLightClient.value.stop())
  if not n.statusLogLoop.isNil():
    futures.add(n.statusLogLoop.cancelAndWait())

  # noCancel: shutdown must run to completion even if this proc's own
  # future gets cancelled.
  await noCancel(allFutures(futures))

  await n.discovery.closeWait()
  n.contentDB.close()
  n.statusLogLoop = nil
Loading
Loading