From 3a9270c5b981990f8400e42745e98e501e1b41f1 Mon Sep 17 00:00:00 2001 From: Allen Hill Date: Thu, 5 Dec 2024 17:04:06 -0800 Subject: [PATCH 1/3] Refactor heartbeat to shutdown cleanly From ZMQ docs: "zmq_proxy() runs in the current thread and returns only if/when the current context is closed." The heartbeat socket doesn't need to be global, as nothing else touches it. BUT, if we create the heartbeat socket in a `Context` that has a global ref, we can close the context, which will cause zmq_proxy to return and then that thread to end/finish. Doing that before shutting down helps avoid a segfault on shutdown. --- Project.toml | 2 +- src/handlers.jl | 3 +++ src/heartbeat.jl | 29 +++++++++++++++++++---------- src/init.jl | 7 ++----- 4 files changed, 25 insertions(+), 16 deletions(-) diff --git a/Project.toml b/Project.toml index 88aca27b..995839b0 100644 --- a/Project.toml +++ b/Project.toml @@ -26,5 +26,5 @@ Conda = "1" JSON = "0.18,0.19,0.20,0.21,1" MbedTLS = "0.5,0.6,0.7,1" SoftGlobalScope = "1" -ZMQ = "1" +ZMQ = "1.3" julia = "1.6" diff --git a/src/handlers.jl b/src/handlers.jl index 4e84f709..150f1307 100644 --- a/src/handlers.jl +++ b/src/handlers.jl @@ -189,6 +189,9 @@ function connect_request(socket, msg) end function shutdown_request(socket, msg) + # stop heartbeat thread by closing the context + close(zmq_proxy_context[]) + send_ipython(requests[], msg_reply(msg, "shutdown_reply", msg.content)) sleep(0.1) # short delay (like in ipykernel), to hopefully ensure shutdown_reply is sent diff --git a/src/heartbeat.jl b/src/heartbeat.jl index c0e58e4b..e11991fa 100644 --- a/src/heartbeat.jl +++ b/src/heartbeat.jl @@ -7,10 +7,13 @@ import Libdl const threadid = zeros(Int, 128) # sizeof(uv_thread_t) <= 8 on Linux, OSX, Win -const zmq_proxy = Ref(C_NULL) +const zmq_proxy_context = Ref{Context}() # entry point for new thread -function heartbeat_thread(sock::Ptr{Cvoid}) +function heartbeat_thread(heartbeat_addr::Cstring) + zmq_proxy_context[] = Context() + heartbeat = 
Socket(zmq_proxy_context[], ROUTER) + GC.@preserve heartbeat_addr bind(heartbeat, unsafe_string(heartbeat_addr)) @static if VERSION ≥ v"1.9.0-DEV.1588" # julia#46609 # julia automatically "adopts" this thread because # we entered a Julia cfunction. We then have to enable @@ -19,14 +22,20 @@ function heartbeat_thread(sock::Ptr{Cvoid}) # (see julia#47196) ccall(:jl_gc_safe_enter, Int8, ()) end - ccall(zmq_proxy[], Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}), - sock, sock, C_NULL) - nothing + ret = ZMQ.lib.zmq_proxy(heartbeat, heartbeat, C_NULL) + @static if VERSION ≥ v"1.9.0-DEV.1588" # julia#46609 + # julia automatically "adopts" this thread because + # we entered a Julia cfunction. We then have to enable + # a GC "safe" region to prevent us from grabbing the + # GC lock with the call to zmq_proxy, which never returns. + # (see julia#47196) + ccall(:jl_gc_safe_leave, Int8, ()) + end + return ret end -function start_heartbeat(sock) - zmq_proxy[] = Libdl.dlsym(Libdl.dlopen(ZMQ.libzmq), :zmq_proxy) - heartbeat_c = @cfunction(heartbeat_thread, Cvoid, (Ptr{Cvoid},)) - ccall(:uv_thread_create, Cint, (Ptr{Int}, Ptr{Cvoid}, Ptr{Cvoid}), - threadid, heartbeat_c, sock) +function start_heartbeat(heartbeat_addr) + heartbeat_c = @cfunction(heartbeat_thread, Cint, (Cstring,)) + ccall(:uv_thread_create, Cint, (Ptr{Int}, Ptr{Cvoid}, Cstring), + threadid, heartbeat_c, heartbeat_addr) end diff --git a/src/init.jl b/src/init.jl index 231a33d7..c613c3f9 100644 --- a/src/init.jl +++ b/src/init.jl @@ -24,7 +24,6 @@ const publish = Ref{Socket}() const raw_input = Ref{Socket}() const requests = Ref{Socket}() const control = Ref{Socket}() -const heartbeat = Ref{Socket}() const profile = Dict{String,Any}() const read_stdout = Ref{Base.PipeEndpoint}() const read_stderr = Ref{Base.PipeEndpoint}() @@ -87,21 +86,19 @@ function init(args) raw_input[] = Socket(ROUTER) requests[] = Socket(ROUTER) control[] = Socket(ROUTER) - heartbeat[] = Socket(ROUTER) sep = profile["transport"]=="ipc" ? 
"-" : ":" bind(publish[], "$(profile["transport"])://$(profile["ip"])$(sep)$(profile["iopub_port"])") bind(requests[], "$(profile["transport"])://$(profile["ip"])$(sep)$(profile["shell_port"])") bind(control[], "$(profile["transport"])://$(profile["ip"])$(sep)$(profile["control_port"])") bind(raw_input[], "$(profile["transport"])://$(profile["ip"])$(sep)$(profile["stdin_port"])") - bind(heartbeat[], "$(profile["transport"])://$(profile["ip"])$(sep)$(profile["hb_port"])") + start_heartbeat("$(profile["transport"])://$(profile["ip"])$(sep)$(profile["hb_port"])") # associate a lock with each socket so that multi-part messages # on a given socket don't get inter-mingled between tasks. - for s in (publish[], raw_input[], requests[], control[], heartbeat[]) + for s in (publish[], raw_input[], requests[], control[]) socket_locks[s] = ReentrantLock() end - start_heartbeat(heartbeat[]) if capture_stdout read_stdout[], = redirect_stdout() redirect_stdout(IJuliaStdio(stdout,"stdout")) From f5b8fbebb119243ad76272ee48084e3509f46746 Mon Sep 17 00:00:00 2001 From: Allen Hill Date: Tue, 10 Dec 2024 17:41:18 -0800 Subject: [PATCH 2/3] Update src/heartbeat.jl Co-authored-by: Steven G. Johnson --- src/heartbeat.jl | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/heartbeat.jl b/src/heartbeat.jl index e11991fa..e73bbe40 100644 --- a/src/heartbeat.jl +++ b/src/heartbeat.jl @@ -24,11 +24,7 @@ function heartbeat_thread(heartbeat_addr::Cstring) end ret = ZMQ.lib.zmq_proxy(heartbeat, heartbeat, C_NULL) @static if VERSION ≥ v"1.9.0-DEV.1588" # julia#46609 - # julia automatically "adopts" this thread because - # we entered a Julia cfunction. We then have to enable - # a GC "safe" region to prevent us from grabbing the - # GC lock with the call to zmq_proxy, which never returns. 
- # (see julia#47196) + # leave safe region if zmq_proxy returns (when context is closed) ccall(:jl_gc_safe_leave, Int8, ()) end return ret From 328571914d0e833b75f1d4c951459aa7199e8c37 Mon Sep 17 00:00:00 2001 From: Allen Hill Date: Wed, 18 Dec 2024 12:07:49 -0800 Subject: [PATCH 3/3] Create heartbeat and context in `init.jl` --- src/handlers.jl | 2 +- src/heartbeat.jl | 14 +++++--------- src/init.jl | 7 ++++++- 3 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/handlers.jl b/src/handlers.jl index 150f1307..52630953 100644 --- a/src/handlers.jl +++ b/src/handlers.jl @@ -190,7 +190,7 @@ end function shutdown_request(socket, msg) # stop heartbeat thread by closing the context - close(zmq_proxy_context[]) + close(heartbeat_context[]) send_ipython(requests[], msg_reply(msg, "shutdown_reply", msg.content)) diff --git a/src/heartbeat.jl b/src/heartbeat.jl index e73bbe40..7ae55415 100644 --- a/src/heartbeat.jl +++ b/src/heartbeat.jl @@ -7,13 +7,9 @@ import Libdl const threadid = zeros(Int, 128) # sizeof(uv_thread_t) <= 8 on Linux, OSX, Win -const zmq_proxy_context = Ref{Context}() # entry point for new thread -function heartbeat_thread(heartbeat_addr::Cstring) - zmq_proxy_context[] = Context() - heartbeat = Socket(zmq_proxy_context[], ROUTER) - GC.@preserve heartbeat_addr bind(heartbeat, unsafe_string(heartbeat_addr)) +function heartbeat_thread(heartbeat::Ptr{Cvoid}) @static if VERSION ≥ v"1.9.0-DEV.1588" # julia#46609 # julia automatically "adopts" this thread because # we entered a Julia cfunction. 
We then have to enable @@ -30,8 +26,8 @@ function heartbeat_thread(heartbeat_addr::Cstring) return ret end -function start_heartbeat(heartbeat_addr) - heartbeat_c = @cfunction(heartbeat_thread, Cint, (Cstring,)) - ccall(:uv_thread_create, Cint, (Ptr{Int}, Ptr{Cvoid}, Cstring), - threadid, heartbeat_c, heartbeat_addr) +function start_heartbeat(heartbeat) + heartbeat_c = @cfunction(heartbeat_thread, Cint, (Ptr{Cvoid},)) + ccall(:uv_thread_create, Cint, (Ptr{Int}, Ptr{Cvoid}, Ptr{Cvoid}), + threadid, heartbeat_c, heartbeat) end diff --git a/src/init.jl b/src/init.jl index c613c3f9..66521b9f 100644 --- a/src/init.jl +++ b/src/init.jl @@ -24,6 +24,8 @@ const publish = Ref{Socket}() const raw_input = Ref{Socket}() const requests = Ref{Socket}() const control = Ref{Socket}() +const heartbeat = Ref{Socket}() +const heartbeat_context = Ref{Context}() const profile = Dict{String,Any}() const read_stdout = Ref{Base.PipeEndpoint}() const read_stderr = Ref{Base.PipeEndpoint}() @@ -86,12 +88,14 @@ function init(args) raw_input[] = Socket(ROUTER) requests[] = Socket(ROUTER) control[] = Socket(ROUTER) + heartbeat_context[] = Context() + heartbeat[] = Socket(heartbeat_context[], ROUTER) sep = profile["transport"]=="ipc" ? "-" : ":" bind(publish[], "$(profile["transport"])://$(profile["ip"])$(sep)$(profile["iopub_port"])") bind(requests[], "$(profile["transport"])://$(profile["ip"])$(sep)$(profile["shell_port"])") bind(control[], "$(profile["transport"])://$(profile["ip"])$(sep)$(profile["control_port"])") bind(raw_input[], "$(profile["transport"])://$(profile["ip"])$(sep)$(profile["stdin_port"])") - start_heartbeat("$(profile["transport"])://$(profile["ip"])$(sep)$(profile["hb_port"])") + bind(heartbeat[], "$(profile["transport"])://$(profile["ip"])$(sep)$(profile["hb_port"])") # associate a lock with each socket so that multi-part messages # on a given socket don't get inter-mingled between tasks.
@@ -99,6 +103,7 @@ function init(args) socket_locks[s] = ReentrantLock() end + start_heartbeat(heartbeat[]) if capture_stdout read_stdout[], = redirect_stdout() redirect_stdout(IJuliaStdio(stdout,"stdout"))