diff --git a/Project.toml b/Project.toml index bcf4394c2d..d23e868c23 100644 --- a/Project.toml +++ b/Project.toml @@ -18,6 +18,7 @@ Markdown = "d6f4376e-aef5-505a-96c1-9c027394607a" MsgPack = "99f44e22-a591-53d1-9472-aa23ef4bd671" Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f" REPL = "3fa0cd96-eef1-5676-8a61-b3b8758bbffb" +Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b" RelocatableFolders = "05181044-ff0b-4ac5-8273-598c1e38db00" Sockets = "6462fe0b-24de-5631-8697-dd941f90decc" Tables = "bd369af6-aec1-5ad0-b16a-f7cc5008161c" diff --git a/sample/ChildProcesses_benchmark.jl b/sample/ChildProcesses_benchmark.jl new file mode 100644 index 0000000000..1895fa94db --- /dev/null +++ b/sample/ChildProcesses_benchmark.jl @@ -0,0 +1,198 @@ +### A Pluto.jl notebook ### +# v0.17.7 + +using Markdown +using InteractiveUtils + +# ╔═╡ 03cbe047-96c6-456f-8adb-8d80390a1742 +md""" +# ChildProcesses Benchmark + +To run this you must be running ChildProcesses enabled Pluto (so you can `Distributed`). +""" + +# ╔═╡ 8e626d50-b8a1-4353-9193-9297d91d6352 +import Distributed + +# ╔═╡ 808241a9-c7ea-4ca0-9a8b-74658c50a48a +import BenchmarkTools + +# ╔═╡ 9b582916-0fd7-4024-9877-1014417c545e +# Act like you a package, quick! +ChildProcesses = @eval Main module ChildProcesses2 + include("../src/evaluation/ChildProcessesNotebook.jl") +end + +# ╔═╡ f2e2a5de-57dd-4ae7-9860-efa693ef786a + + +# ╔═╡ 7a3e1cc4-b188-42f2-96c7-931f20ccfa34 +expr_to_test = quote + $(fill(1, (1000, 1000))) +end + +# ╔═╡ 1326fab3-c8ef-4b0d-a17e-36cbcb7a5275 +md"## Distributed.remotecall_eval" + +# ╔═╡ 28be2c7a-e2b2-4666-a367-062e3f72ec77 +function distributed_benchmark(expr_to_test) + (pid,) = Distributed.addprocs(1) + + benchmark = BenchmarkTools.@benchmark begin + Distributed.remotecall_eval(Main, [$pid], $expr_to_test) + end + + Distributed.rmprocs([pid]) + benchmark +end + +# ╔═╡ 11635702-a887-4548-a6b8-55a1c7b08ee7 +distributed_benchmark(quote $(1) end) + +# ╔═╡ d77fb79a-7105-4321-9151-84d9367c12dc +distributed_benchmark(quote $(fill(1, (1000))) end) + +# ╔═╡ 3af58ffa-1d51-42b8-b70e-83a160011ede +distributed_benchmark(quote $(fill(1, (1000, 1000))) end) + +# ╔═╡ 7bb5fa42-5f7d-4e83-a55c-ed435f4d4746 +distributed_benchmark(quote $(fill(1, (1000, 1000, 10))) end) + +# ╔═╡ 0a632e40-73b2-4816-b846-089af80f76cc +md"## ChildProcesses.call" + +# ╔═╡ ba51da9b-44a0-41e6-bd37-b05eed451906 +function childprocess_benchmark(expr_to_test) + process = ChildProcesses.create_child_process() + + benchmark = BenchmarkTools.@benchmark begin + $ChildProcesses.call($process, $expr_to_test) + end + + kill(process) + benchmark +end + +# ╔═╡ f303a2eb-d4ff-438d-b01a-45bc6a62bf87 +childprocess_benchmark(quote $(1) end) + +# ╔═╡ 8f6bbed5-e259-4841-a9bc-113df6f13676 +childprocess_benchmark(quote $(fill(1, (1000))) end) + +# ╔═╡ 5c8e1389-94c3-4abc-a9fd-698e7013bcf6 +childprocess_benchmark(quote $(fill(1, (1000, 1000))) end) + +# ╔═╡ ef907db1-721d-4695-8752-8fe44db1a0a5 +childprocess_benchmark(quote $(fill(1, (1000, 1000, 10))) end) + +# ╔═╡ 00000000-0000-0000-0000-000000000001 +PLUTO_PROJECT_TOML_CONTENTS = """ +[deps] +BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf" +Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" + +[compat] +BenchmarkTools = "~1.2.2" +""" + +# ╔═╡ 00000000-0000-0000-0000-000000000002 +PLUTO_MANIFEST_TOML_CONTENTS = """ +# This file is machine-generated - editing it directly is not advised + +[[BenchmarkTools]] +deps = ["JSON", "Logging", "Printf", "Profile", "Statistics", "UUIDs"] +git-tree-sha1 = "940001114a0147b6e4d10624276d56d531dd9b49" +uuid = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf" +version = "1.2.2" + +[[Dates]] +deps = ["Printf"] +uuid = "ade2ca70-3891-5945-98fb-dc099432e06a" + +[[Distributed]] +deps = ["Random", "Serialization", "Sockets"] +uuid = "8ba89e20-285c-5b6f-9357-94700520ee1b" + +[[JSON]] +deps = ["Dates", "Mmap", "Parsers", "Unicode"] +git-tree-sha1 = "8076680b162ada2a031f707ac7b4953e30667a37" +uuid = "682c06a0-de6a-54ab-a142-c8b1cf79cde6" +version = "0.21.2" + +[[Libdl]] +uuid = "8f399da3-3557-5675-b5ff-fb832c97cbdb" + +[[LinearAlgebra]] +deps = ["Libdl"] +uuid = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" + +[[Logging]] +uuid = "56ddb016-857b-54e1-b83d-db4d58db5568" + +[[Mmap]] +uuid = "a63ad114-7e13-5084-954f-fe012c677804" + +[[Parsers]] +deps = ["Dates"] +git-tree-sha1 = "92f91ba9e5941fc781fecf5494ac1da87bdac775" +uuid = "69de0a69-1ddd-5017-9359-2bf0b02dc9f0" +version = "2.2.0" + +[[Printf]] +deps = ["Unicode"] +uuid = "de0858da-6303-5e67-8744-51eddeeeb8d7" + +[[Profile]] +deps = ["Printf"] +uuid = "9abbd945-dff8-562f-b5e8-e1ebf5ef1b79" + +[[Random]] +deps = ["Serialization"] +uuid = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" + +[[SHA]] +uuid = "ea8e919c-243c-51af-8825-aaa63cd721ce" + +[[Serialization]] +uuid = "9e88b42a-f829-5b0c-bbe9-9e923198166b" + +[[Sockets]] +uuid = "6462fe0b-24de-5631-8697-dd941f90decc" + +[[SparseArrays]] +deps = ["LinearAlgebra", "Random"] +uuid = "2f01184e-e22b-5df5-ae63-d93ebab69eaf" + +[[Statistics]] +deps = ["LinearAlgebra", "SparseArrays"] +uuid = "10745b16-79ce-11e8-11f9-7d13ad32a3b2" + +[[UUIDs]] +deps = ["Random", "SHA"] +uuid = "cf7118a7-6976-5b1a-9a39-7adc72f591a4" + +[[Unicode]] +uuid = "4ec0a83e-493e-50e2-b9ac-8f72acf5a8f5" +""" + +# ╔═╡ Cell order: +# ╟─03cbe047-96c6-456f-8adb-8d80390a1742 +# ╠═8e626d50-b8a1-4353-9193-9297d91d6352 +# ╠═808241a9-c7ea-4ca0-9a8b-74658c50a48a +# ╠═9b582916-0fd7-4024-9877-1014417c545e +# ╟─f2e2a5de-57dd-4ae7-9860-efa693ef786a +# ╠═7a3e1cc4-b188-42f2-96c7-931f20ccfa34 +# ╟─1326fab3-c8ef-4b0d-a17e-36cbcb7a5275 +# ╟─28be2c7a-e2b2-4666-a367-062e3f72ec77 +# ╠═11635702-a887-4548-a6b8-55a1c7b08ee7 +# ╠═d77fb79a-7105-4321-9151-84d9367c12dc +# ╠═3af58ffa-1d51-42b8-b70e-83a160011ede +# ╠═7bb5fa42-5f7d-4e83-a55c-ed435f4d4746 +# ╟─0a632e40-73b2-4816-b846-089af80f76cc +# ╟─ba51da9b-44a0-41e6-bd37-b05eed451906 +# ╠═f303a2eb-d4ff-438d-b01a-45bc6a62bf87 +# ╠═8f6bbed5-e259-4841-a9bc-113df6f13676 +# ╠═5c8e1389-94c3-4abc-a9fd-698e7013bcf6 +# ╠═ef907db1-721d-4695-8752-8fe44db1a0a5 +# ╟─00000000-0000-0000-0000-000000000001 +# ╟─00000000-0000-0000-0000-000000000002 diff --git a/src/Pluto.jl b/src/Pluto.jl index e6c3ff3bb5..724502bd4d 100644 --- a/src/Pluto.jl +++ b/src/Pluto.jl @@ -37,6 +37,7 @@ const JULIA_VERSION_STR = 'v' * string(VERSION) include("./notebook/PathHelpers.jl") include("./notebook/Export.jl") include("./Configuration.jl") +include("./evaluation/ChildProcesses.jl") include("./evaluation/Tokens.jl") include("./evaluation/Throttled.jl") diff --git a/src/evaluation/ChildProcesses.jl b/src/evaluation/ChildProcesses.jl new file mode 100644 index 0000000000..828fb0c4d2 --- /dev/null +++ b/src/evaluation/ChildProcesses.jl @@ -0,0 +1,5 @@ +ChildProcesses = @eval Main module ChildProcesses + +include("./ChildProcessesNotebook.jl") + +end \ No newline at end of file diff --git a/src/evaluation/ChildProcessesNotebook.jl b/src/evaluation/ChildProcessesNotebook.jl new file mode 100644 index 0000000000..e51d235974 --- /dev/null +++ b/src/evaluation/ChildProcessesNotebook.jl @@ -0,0 +1,797 @@ +### A Pluto.jl notebook ### +# v0.17.7 + +using Markdown +using InteractiveUtils + +# ╔═╡ 4c4b77dc-8545-4922-9897-110fa67c99f4 +md"# ChildProcesses" + +# ╔═╡ 934d18d4-936e-4ee0-aa6e-86aa6f66774c +juliapath() = joinpath(Sys.BINDIR::String, Base.julia_exename()) + +# ╔═╡ 4df77979-0c35-4139-aa5e-da43c1de3a77 +if VERSION >= v"1.7.0" + function get_task_error(t::Task) + throw("TODO") + end +else + function get_task_error(t::Task) + CapturedException(Base.catch_stack(t)[end]...) + end +end + +# ╔═╡ 6925ffbb-fd9e-402c-a483-c78f28f892a5 +import Serialization + +# ╔═╡ 8c97d5cb-e346-4b94-b2a1-4fb5ff093bd5 +import UUIDs: UUID, uuid4 + +# ╔═╡ 87612815-7981-4a69-a65b-4465f48c9fd9 +struct ReadMessages + io::IO +end + +# ╔═╡ de680a32-e053-4655-a1cd-111d81a037e6 +md""" +## Child process side +""" + +# ╔═╡ ce2acdd1-b4bb-4a8f-8346-a56dfa55f56b +Base.@kwdef mutable struct ParentProcess + parent_to_child=stdin + child_to_parent=stdout + channels::Dict{UUID,AbstractChannel}=Dict{UUID,AbstractChannel}() + messaging_lock=ReentrantLock() +end + +# ╔═╡ 87182605-f5a1-46a7-968f-bdfb9d0e08fa +function setup_parent_process() + # Redirect things written to stdout to stderr, + # stderr will showup in the Pluto with_terminal view. + real_stdout = stdout + redirect_stdout(stderr) + + # Just take away stdin, just in case + real_stdin = stdin + redirect_stdin() + + ParentProcess( + parent_to_child=real_stdin, + child_to_parent=real_stdout, + ) +end + +# ╔═╡ b05be9c7-8cc1-47f9-8baa-db66ac83c24f +Base.@kwdef struct ParentChannel + process::ParentProcess + channel_id::UUID + result_channel::AbstractChannel +end + +# ╔═╡ 17cb5a65-2a72-4e7d-8fba-901452b2c19f +md""" +## Parent process side +""" + +# ╔═╡ abe897ff-6e86-40eb-a1a6-2918b6c3c5a7 +struct LocalSandbox end + +# ╔═╡ 884e103a-8925-477d-a264-f15d02a49aa9 +md""" +## ChildChannel + +I guess this is similar to Distributed.ChildChannel. +It communicates all actions that happen on it (take!, put!, close) to the other side. There is a local channel stored that is used as buffer. +""" + +# ╔═╡ 1d63f09f-dfb2-40dd-beb5-45125cb19006 +md"## Message types" + +# ╔═╡ fc0ca4b5-e03b-4c08-b43d-913ee12269c7 +abstract type Message end + +# ╔═╡ 3fd22ac5-e1c9-42a2-8ae1-7cc4e154764a +struct JustExecuteThis <: Message + expr +end + +# ╔═╡ 3431051e-55ce-46c1-a0f5-364662f5c77b +MessageChannel = AbstractChannel{Message} + +# ╔═╡ 85920c27-d8a6-4b5c-93b6-41daa5866f9d +Base.@kwdef mutable struct ChildProcess + process::Base.AbstractPipe + channels::Dict{UUID,MessageChannel}=Dict{UUID,MessageChannel}() + + "Lock to make sure there are no messages being sent intervowen" + messaging_lock=ReentrantLock() +end + +# ╔═╡ 63531683-e295-4ba6-811f-63b0d384ba0f +Base.@kwdef struct ChildProcessException <: Exception + process::ChildProcess + captured::CapturedException +end + +# ╔═╡ e6b3d1d9-0245-4dc8-a0a5-5831c254479b +Base.@kwdef struct ProcessExitedException <: Exception + process::ChildProcess +end + +# ╔═╡ 2a538f97-a9d1-4dcf-a098-5b1e5a8df3ae +function Base.showerror(io::IO, error::ProcessExitedException) + print(io, "ChildProcessExitedException: Child process has exited") +end + +# ╔═╡ 9ba8ee0c-c80c-41d9-8739-11e6ef5d3c15 +function Base.showerror(io::IO, error::ChildProcessException) + print(io, "Child process has an error:") + Base.showerror(io, error.captured) +end + +# ╔═╡ 368ccd31-1681-4a4a-a103-2b9afc0813ee +Base.kill(process::ChildProcess, signal::Int) = Base.kill(process.process, signal) + +# ╔═╡ af4f0720-f7f7-4d8a-bd8c-8cf5abaf10a0 +Base.kill(process::ChildProcess) = Base.kill(process.process) + +# ╔═╡ d4da756e-fa81-49d8-8c39-d76f9d15e96f +Base.process_running(p::ChildProcess) = Base.process_running(p.process) + +# ╔═╡ e787d8bd-499d-4a41-8913-62b3d9346748 +Base.wait(process::ChildProcess) = Base.wait(process.process) + +# ╔═╡ b8865d63-19fa-4438-87a2-ccb531bd73a4 +Base.@kwdef struct ChildChannel{T} <: AbstractChannel{T} + process::ChildProcess + id::UUID=uuid4() + local_channel::AbstractChannel{T} +end + +# ╔═╡ f81ff6f7-f230-4373-b60b-76e8a0eba929 +Base.@kwdef struct Envelope + channel_id::UUID + message::Message +end + +# ╔═╡ adfb2d12-07a0-4d5e-8e24-1676c07107c7 +struct CreateChildChannelMessage <: Message + expr +end + +# ╔═╡ e8256f1f-c778-4fb3-a2d3-12da0e1cb3da +struct BaseMessage <: Message + value::Any +end + +# ╔═╡ f9270f05-fa22-4027-a2ca-7db61d057c56 +struct ErrorMessage <: Message + error::CapturedException +end + +# ╔═╡ 0ba318bc-7b4c-4d0f-a46c-2f9dca756926 +struct ChannelPushMessage <: Message + value::Any +end + +# ╔═╡ c83f2936-3768-4724-9c5c-335d7a4aae03 +struct ChannelCloseMessage <: Message +end + +# ╔═╡ 441c27e4-6884-4887-9cd5-bc55d0c49760 +struct ChannelTakeMessage <: Message +end + +# ╔═╡ 1f868b3c-df9a-4d4d-a6a9-9a196298a3af +md"---" + +# ╔═╡ 77df6f7b-ed8e-442a-9489-873fc392937c +md""" +## SingleTakeChannel() + +Simple channel that will yield "nothing" and nothing else. +""" + +# ╔═╡ 3a62b419-eca2-4de1-89a4-fc9ad6f68372 +Base.@kwdef mutable struct SingleTakeChannel <: AbstractChannel{Nothing} + did_finish=false +end + +# ╔═╡ 61410bd1-8456-4608-92d9-3c863f65b89c +function Base.take!(channel::SingleTakeChannel) + if channel.did_finish + throw("SingleTakeChannel re-used") + end + channel.did_finish = true + nothing +end + +# ╔═╡ 409707ff-b1ea-453d-b933-0a4b1e5f44c8 +function Base.iterate(channel::SingleTakeChannel, _=nothing) + if channel.did_finish + return nothing + else + channel.did_finish = true + (nothing, nothing) + end +end + +# ╔═╡ 0feb758e-38d4-48c0-a5e9-9d1129b1b1b2 +function Base.isopen(channel::SingleTakeChannel) + !channel.did_finish +end + +# ╔═╡ b0bcdbdf-a043-4d25-8582-ed173006e040 +function Base.close(single_channel::SingleTakeChannel) + single_channel.did_finish = true +end + +# ╔═╡ a6e50946-dd3d-4738-adc1-26534e184776 +md""" +## Binary message frame thing + +Functions to send and read binary messages over a stable connection, also functions to serialize to a bytearray. + +Pretty simple right now, might want to add cooler stuff later. +""" + +# ╔═╡ 131a123b-e319-4939-90bf-7fb035ab2e75 +MessageLength = Int + +# ╔═╡ e393ff80-3995-11ec-1217-b3642a509067 +function read_message(stream) + message_length_buffer = Vector{UInt8}(undef, 8) + bytesread = readbytes!(stream, message_length_buffer, 8) + how_long_will_the_message_be = reinterpret(MessageLength, message_length_buffer)[1] + + message_buffer = Vector{UInt8}(undef, how_long_will_the_message_be) + _bytesread = readbytes!(stream, message_buffer, how_long_will_the_message_be) + + message_buffer +end + +# ╔═╡ faf3c68e-d0fb-4dd9-ac1b-1ff8d922134b +function send_message(stream, bytes) + # bytes = Vector{UInt8}(message) + message_length = convert(MessageLength, length(bytes)) + # @info "message_length" message_length bytes + how_long_will_the_message_be = reinterpret(UInt8, [message_length]) + + # @warn "Writing to stream!!!" + _1 = write(stream, how_long_will_the_message_be) + _2 = write(stream, bytes) + # @warn "WROTE TO STREAM" + + _1 + _2 + # @info "Write" _1 _2 +end + +# ╔═╡ 52495855-a52d-4edf-9c7c-811a5060e641 +function to_binary(message) + io = PipeBuffer() + serialized = Serialization.serialize(io, message) + read(io) +end + +# ╔═╡ 590a7882-3d69-48b0-bb1b-f476c7f8a885 +function respond_to_parent(; to::ParentProcess, about::UUID, with::Message) + binary_message = to_binary(Envelope( + channel_id=about, + message=with, + )) + lock(to.messaging_lock) + try + send_message(to.child_to_parent, binary_message) + finally + unlock(to.messaging_lock) + end +end + +# ╔═╡ 20f66652-bca0-4e47-a4b7-502cfbcb3db5 +function send_message_without_response(process::ChildProcess, envelope::Envelope) + if !process_running(process) + throw(ProcessExitedException(process=process)) + end + + lock(process.messaging_lock) + try + send_message(process.process, to_binary(envelope)) + nothing + finally + unlock(process.messaging_lock) + end +end + +# ╔═╡ e1e79669-8d0a-4b04-a7fd-469e7d8e65b1 +function Base.take!(channel::ChildChannel) + if !isopen(channel.local_channel) + # Trigger InvalidStateException + eval(:(@error "ChildChannel closed D:")) + take!(channel.local_channel) + end + + send_message_without_response(channel.process, Envelope( + channel_id=channel.id, + message=ChannelTakeMessage(), + )) + take!(channel.local_channel) +end + +# ╔═╡ ed6c16ed-bf4a-40ce-9c14-a72fd77e24a1 +function Base.close(channel::ChildChannel) + if isopen(channel.local_channel) + try + close(channel.local_channel) + if process_running(channel.process.process) + send_message_without_response(channel.process, Envelope( + channel_id=channel.id, + message=ChannelCloseMessage() + )) + end + catch e + @error "Problem with closing?" e stack=stacktrace(catch_backtrace()) + nothing + end + end + delete!(channel.process.channels, channel.id) +end + +# ╔═╡ 4289b4ba-45f2-4be7-b983-68f7d97510fa +Base.close(process::ChildProcess) = Base.close(process.process) + +# ╔═╡ 2342d663-030f-4ed2-b1d5-5be1910b6d4c +function create_channel(fn, process::ChildProcess, message) + remote_channel = create_channel(process, message) + + try + return fn(remote_channel) + finally + close(remote_channel) + end +end + +# ╔═╡ 54a03cba-6d00-4632-8bd6-c60753c15ae6 +function listen_for_messages_from_child(child_process) + try + for result in ReadMessages(child_process.process) + if !(result isa Envelope && result.channel_id !== nothing) + throw("Huh") + end + + + if haskey(child_process.channels, result.channel_id) + message = result.message + channel = child_process.channels[result.channel_id] + put!(channel, message) + else + throw("No channel to push to") + end + end + catch error + @error "Error in main processing" error bt=stacktrace(catch_backtrace()) + close(child_process.process) + finally + for (channel_id, channel) in collect(child_process.channels) + close(channel, ProcessExitedException(process=child_process)) + end + end +end + +# ╔═╡ d3fe8144-6ba8-4dd9-b0e3-941a96422267 +""" + create_child_process(; custom_stderr=stderr, exeflags=["--history-file=no"]) + +Spawn a child process that you'll be able to `call()` on and communicate with over +`RemoteChannels`. + +It spawns a process using `open` with some ad-hoc julia code that also loads the `ChildProcesses` module. It then spawns a loop for listening to the spawned process. +The returned `ChildProcess` is basically a bunch of event listeners (in the form of channels) that +""" +function create_child_process(; custom_stderr=stderr, exeflags=["--history-file=no", "--threads=auto"]) + this_file = split(@__FILE__(), "#")[1] + this_module = string(nameof(@__MODULE__)) + + code = """ + # Make sure this library is included in main + var"$this_module" = @eval Main module var"$this_module" + include("$this_file") + end + + const parent_process = var"$this_module".setup_parent_process() + + Threads.@spawn begin + try + var"$this_module".listen_for_messages_from_parent(parent_process) + @info "Done?" + catch error + @error "Shutdown error" error + rethrow(error) + end + end + + try + while true + sleep(typemax(UInt64)) + end + catch e + @error "HMMM SIGNINT MAYBE?" e + end + """ + + process = open( + pipeline(`$(juliapath()) $exeflags -e $code`, stderr=custom_stderr), + read=true, + write=true, + ) + + child_process = ChildProcess(process=process) + schedule(Task() do + listen_for_messages_from_child(child_process) + end) + child_process +end + +# ╔═╡ 6a7aa0ce-0ae3-4e7d-a93a-bfdebe406220 +begin + function handle_child_message( + message, + process::ChildProcess, + input_channel::AbstractChannel, + output_channel::AbstractChannel, + ) + @warn "Unknown message type" message + end + + function handle_child_message( + message::ChannelPushMessage, + process::ChildProcess, + input_channel::AbstractChannel, + output_channel::AbstractChannel, + ) + put!(output_channel, message.value) + end + + function handle_child_message( + message::ChannelCloseMessage, + process::ChildProcess, + input_channel::AbstractChannel, + output_channel::AbstractChannel, + ) + close(output_channel) + end + + function handle_child_message( + message::ErrorMessage, + process::ChildProcess, + input_channel::AbstractChannel, + output_channel::AbstractChannel, + ) + close(output_channel, ChildProcessException( + process=process, + captured=message.error, + )) + end +end + +# ╔═╡ f99f4659-f71e-4f2e-a674-67ba69289817 +function create_channel(process::ChildProcess, expr) + channel_id = uuid4() + + # Create and register channel for responses + response_channel = Channel{Message}() + process.channels[channel_id] = response_channel + + # Tell the child process to start executing + send_message_without_response(process, Envelope( + channel_id=channel_id, + message=CreateChildChannelMessage(expr) + )) + + # Map the messages from the child process to actions + actual_values_channels = Channel(Inf) do ch + try + for message in response_channel + handle_child_message( + message, + process, + response_channel, + ch, + ) + end + catch e + @error "THIS SHOULDNT ERROR MATE" e + close(ch, e) + finally + close(ch) + close(response_channel) + end + end + + ChildChannel( + process=process, + id=channel_id, + local_channel=actual_values_channels, + ) +end + +# ╔═╡ 4b42e233-1f06-49c9-8c6a-9dc21c21ffb7 +function call(process::ChildProcess, expr) + create_channel(process, quote + Channel() do ch + put!(ch, $expr) + end + end) do channel + take!(channel) + end +end + +# ╔═╡ e3e16a8b-7124-4678-8fe7-12ed449e1954 +function call_without_fetch(process::ChildProcess, expr) + create_channel(process, quote + $expr + $(SingleTakeChannel()) + end) do channel + take!(channel) + end +end + +# ╔═╡ 7da416d1-5dfb-4510-9d32-62c1464a83d4 +function from_binary(message) + Serialization.deserialize(IOBuffer(message)) +end + +# ╔═╡ 009ad714-b759-420a-b49a-6caed7ee3faf +function Base.iterate(message_reader::ReadMessages, state=nothing) + if eof(message_reader.io) + return nothing + end + + message = from_binary(read_message(message_reader.io)) + (message, nothing) +end + +# ╔═╡ 5fb65aec-3512-4f48-98ce-300ab9fdadfe +begin + function Base.iterate(channel::ChildChannel) + send_message_without_response(channel.process, Envelope( + channel_id=channel.id, + message=ChannelTakeMessage(), + )) + Base.iterate(channel.local_channel) + end + function Base.iterate(channel::ChildChannel, x) + send_message_without_response(channel.process, Envelope( + channel_id=channel.id, + message=ChannelTakeMessage(), + )) + Base.iterate(channel.local_channel, x) + end +end + +# ╔═╡ 46d905fc-d41e-4c9a-a808-14710f64293a +begin + function handle_message_from_parent_channel( + parent_channel::ParentChannel, + message::ChannelTakeMessage, + ) + next = iterate(parent_channel.result_channel) + if next === nothing + respond_to_parent( + to=parent_channel.process, + about=parent_channel.channel_id, + with=ChannelCloseMessage(), + ) + true + else + respond_to_parent( + to=parent_channel.process, + about=parent_channel.channel_id, + with=ChannelPushMessage(next[1]) + ) + false + end + end + + function handle_message_from_parent_channel( + parent_channel::ParentChannel, + message::ChannelCloseMessage, + ) + if isopen(parent_channel.result_channel) + close(parent_channel.result_channel) + end + false + end + + function handle_message_from_parent_channel( + parent_channel::ParentChannel, + message::Message, + ) + @warn "Unknown channel message type" message + false + end +end + +# ╔═╡ 13089c5d-f833-4fb7-b8cd-4158b1a57103 +function create_parent_channel(; process, channel_id, result_channel) + Channel(Inf) do input_channel + try + for message in input_channel + parent_channel = ParentChannel( + process=process, + channel_id=channel_id, + result_channel=result_channel, + ) + + if handle_message_from_parent_channel( + parent_channel, + message, + ) + break + end + end + catch error + if error isa InterruptException + rethrow(error) + else + @error "Error in channel" error + respond_to_parent( + to=process, + about=channel_id, + with=ErrorMessage(CapturedException(error, catch_backtrace())) + ) + end + finally + close(input_channel) + end + end +end + +# ╔═╡ e4109311-8252-4793-87b8-eae807df7997 +function listen_for_messages_from_parent(process::ParentProcess) + channels = process.channels + + # ?? What do these locks do?? They look cool, but are they useful actually? + locks = Dict{UUID, ReentrantLock}() + + for envelope in ReadMessages(process.parent_to_child) + channel_id = envelope.channel_id + message = envelope.message + + channel_lock = get!(locks, envelope.channel_id) do + ReentrantLock() + end + + schedule(Task() do + lock(channel_lock) + try + if envelope.message isa CreateChildChannelMessage + @assert !haskey(process.channels, channel_id) + + result_channel = Main.eval(message.expr) + process.channels[channel_id] = create_parent_channel( + process=process, + channel_id=channel_id, + result_channel=result_channel, + ) + else + if ( + haskey(process.channels, channel_id) && + isopen(process.channels[channel_id]) + ) + put!(process.channels[channel_id], message) + else + respond_to_parent( + to=process, + about=channel_id, + with=ChannelCloseMessage(), + ) + end + end + catch error + @error "Does it error here already?" error + respond_to_parent( + to=process, + about=envelope.channel_id, + with=ErrorMessage(CapturedException(error, catch_backtrace())) + ) + finally + unlock(channel_lock) + end + end) + end +end + +# ╔═╡ 00000000-0000-0000-0000-000000000001 +PLUTO_PROJECT_TOML_CONTENTS = """ +[deps] +Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b" +UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4" +""" + +# ╔═╡ 00000000-0000-0000-0000-000000000002 +PLUTO_MANIFEST_TOML_CONTENTS = """ +# This file is machine-generated - editing it directly is not advised + +[[Random]] +deps = ["Serialization"] +uuid = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" + +[[SHA]] +uuid = "ea8e919c-243c-51af-8825-aaa63cd721ce" + +[[Serialization]] +uuid = "9e88b42a-f829-5b0c-bbe9-9e923198166b" + +[[UUIDs]] +deps = ["Random", "SHA"] +uuid = "cf7118a7-6976-5b1a-9a39-7adc72f591a4" +""" + +# ╔═╡ Cell order: +# ╟─4c4b77dc-8545-4922-9897-110fa67c99f4 +# ╟─d3fe8144-6ba8-4dd9-b0e3-941a96422267 +# ╟─f99f4659-f71e-4f2e-a674-67ba69289817 +# ╟─2342d663-030f-4ed2-b1d5-5be1910b6d4c +# ╠═4b42e233-1f06-49c9-8c6a-9dc21c21ffb7 +# ╠═e3e16a8b-7124-4678-8fe7-12ed449e1954 +# ╟─934d18d4-936e-4ee0-aa6e-86aa6f66774c +# ╟─54a03cba-6d00-4632-8bd6-c60753c15ae6 +# ╠═4df77979-0c35-4139-aa5e-da43c1de3a77 +# ╠═6925ffbb-fd9e-402c-a483-c78f28f892a5 +# ╠═8c97d5cb-e346-4b94-b2a1-4fb5ff093bd5 +# ╠═63531683-e295-4ba6-811f-63b0d384ba0f +# ╠═9ba8ee0c-c80c-41d9-8739-11e6ef5d3c15 +# ╠═4289b4ba-45f2-4be7-b983-68f7d97510fa +# ╠═e6b3d1d9-0245-4dc8-a0a5-5831c254479b +# ╠═2a538f97-a9d1-4dcf-a098-5b1e5a8df3ae +# ╠═87612815-7981-4a69-a65b-4465f48c9fd9 +# ╠═009ad714-b759-420a-b49a-6caed7ee3faf +# ╟─de680a32-e053-4655-a1cd-111d81a037e6 +# ╠═ce2acdd1-b4bb-4a8f-8346-a56dfa55f56b +# ╠═87182605-f5a1-46a7-968f-bdfb9d0e08fa +# ╠═b05be9c7-8cc1-47f9-8baa-db66ac83c24f +# ╟─590a7882-3d69-48b0-bb1b-f476c7f8a885 +# ╠═46d905fc-d41e-4c9a-a808-14710f64293a +# ╠═13089c5d-f833-4fb7-b8cd-4158b1a57103 +# ╠═3fd22ac5-e1c9-42a2-8ae1-7cc4e154764a +# ╠═e4109311-8252-4793-87b8-eae807df7997 +# ╟─17cb5a65-2a72-4e7d-8fba-901452b2c19f +# ╠═3431051e-55ce-46c1-a0f5-364662f5c77b +# ╠═85920c27-d8a6-4b5c-93b6-41daa5866f9d +# ╠═af4f0720-f7f7-4d8a-bd8c-8cf5abaf10a0 +# ╠═368ccd31-1681-4a4a-a103-2b9afc0813ee +# ╠═d4da756e-fa81-49d8-8c39-d76f9d15e96f +# ╠═e787d8bd-499d-4a41-8913-62b3d9346748 +# ╠═20f66652-bca0-4e47-a4b7-502cfbcb3db5 +# ╠═6a7aa0ce-0ae3-4e7d-a93a-bfdebe406220 +# ╠═abe897ff-6e86-40eb-a1a6-2918b6c3c5a7 +# ╟─884e103a-8925-477d-a264-f15d02a49aa9 +# ╠═b8865d63-19fa-4438-87a2-ccb531bd73a4 +# ╠═e1e79669-8d0a-4b04-a7fd-469e7d8e65b1 +# ╠═ed6c16ed-bf4a-40ce-9c14-a72fd77e24a1 +# ╠═5fb65aec-3512-4f48-98ce-300ab9fdadfe +# ╟─1d63f09f-dfb2-40dd-beb5-45125cb19006 +# ╠═f81ff6f7-f230-4373-b60b-76e8a0eba929 +# ╠═fc0ca4b5-e03b-4c08-b43d-913ee12269c7 +# ╠═adfb2d12-07a0-4d5e-8e24-1676c07107c7 +# ╠═e8256f1f-c778-4fb3-a2d3-12da0e1cb3da +# ╠═f9270f05-fa22-4027-a2ca-7db61d057c56 +# ╠═0ba318bc-7b4c-4d0f-a46c-2f9dca756926 +# ╠═c83f2936-3768-4724-9c5c-335d7a4aae03 +# ╠═441c27e4-6884-4887-9cd5-bc55d0c49760 +# ╟─1f868b3c-df9a-4d4d-a6a9-9a196298a3af +# ╟─77df6f7b-ed8e-442a-9489-873fc392937c +# ╠═3a62b419-eca2-4de1-89a4-fc9ad6f68372 +# ╠═61410bd1-8456-4608-92d9-3c863f65b89c +# ╠═409707ff-b1ea-453d-b933-0a4b1e5f44c8 +# ╠═0feb758e-38d4-48c0-a5e9-9d1129b1b1b2 +# ╠═b0bcdbdf-a043-4d25-8582-ed173006e040 +# ╟─a6e50946-dd3d-4738-adc1-26534e184776 +# ╟─131a123b-e319-4939-90bf-7fb035ab2e75 +# ╟─e393ff80-3995-11ec-1217-b3642a509067 +# ╟─faf3c68e-d0fb-4dd9-ac1b-1ff8d922134b +# ╟─52495855-a52d-4edf-9c7c-811a5060e641 +# ╟─7da416d1-5dfb-4510-9d32-62c1464a83d4 +# ╟─00000000-0000-0000-0000-000000000001 +# ╟─00000000-0000-0000-0000-000000000002 diff --git a/src/evaluation/ChildProcessesTest.jl b/src/evaluation/ChildProcessesTest.jl new file mode 100644 index 0000000000..95029a9437 --- /dev/null +++ b/src/evaluation/ChildProcessesTest.jl @@ -0,0 +1,272 @@ +### A Pluto.jl notebook ### +# v0.17.7 + +using Markdown +using InteractiveUtils + +# ╔═╡ 7e613bd2-616a-4687-8af5-a22c7a747d97 +import Serialization + +# ╔═╡ d30e1e1b-fa6f-4fbc-bccb-1fcfc7b829df +import UUIDs: UUID, uuid4 + +# ╔═╡ f7d14367-27d7-41a5-9f6a-79cf5e721a7d +import PlutoUI + +# ╔═╡ fe669218-18c3-46c7-80e8-7b1ab6fa77d2 +import PlutoLinks: @ingredients, @use_task + +# ╔═╡ f9675ee0-e728-420b-81bd-22e57583c587 +import PlutoHooks: @use_effect, @use_ref, @use_state, @use_memo + +# ╔═╡ a26cc458-4578-427f-840d-71d78c5c8b01 +begin + ChildProcesses = @ingredients("./ChildProcesses.jl").ChildProcesses + var"ChildProcesses.jl" = ChildProcesses +end + +# ╔═╡ 6ff77f91-ee9c-407c-a243-09fc7e555d73 +function with_process(fn) + process = ChildProcesses.create_child_process() + try + fn(process) + finally + close(process) + end +end + +# ╔═╡ 730cfe5e-1541-4c08-8d4a-86d1f9e4115e +with_process() do process + ChildProcesses.call(process, :(throw("Hi"))) +end + +# ╔═╡ 6ab96c70-ad5d-4614-9a77-2d44d1085567 +with_process() do process + ChildProcesses.call(process, :(1 + 1)) +end + +# ╔═╡ de602aaa-704c-45c4-8b7b-fc58e41236ce +begin + struct FakeProcess <: Base.AbstractPipe + in::IO + out::IO + end + eval(:(Base.process_running(::FakeProcess) = true)) + eval(:(Base.pipe_writer(process::FakeProcess) = process.in)) + eval(:(Base.pipe_reader(process::FakeProcess) = process.out)) +end + +# ╔═╡ 49fdc8a3-0e1a-42f0-acc4-b823eec91d31 +md"---" + +# ╔═╡ 95c5a5bc-db23-4ad3-8ae8-81bc8f0edfd4 +import BenchmarkTools + +# ╔═╡ ced9d1e9-7075-4ff2-8ca2-6a349f2a69c4 +let + stream = PipeBuffer() + BenchmarkTools.@benchmark begin + input = Dict(:x => 1) + ChildProcesses.send_message($stream, ChildProcesses.to_binary(input)) + output = ChildProcesses.from_binary(ChildProcesses.read_message($stream)) + + if input != output + throw("Waoh, input and output should match but didn't!") + end + end +end + +# ╔═╡ 4baac7f2-60fe-4a6f-8612-2acf80c43ef3 +let + process = ChildProcesses.create_child_process() + + benchmark = BenchmarkTools.@benchmark begin + ChildProcesses.call($process, :(1 + 1)) + end + + kill(process) + + benchmark +end + +# ╔═╡ be18d157-6b55-4eaa-99fe-c398e992a9fa +md"### @task_result" + +# ╔═╡ 12578b59-0161-4e72-afef-825166a62121 +struct Pending end + +# ╔═╡ 34d90560-5a3e-4c7f-8126-35e1a6153aa1 +struct Result value end + +# ╔═╡ 83540498-f317-4d8b-8dc6-80a93247b3b2 +struct Failure error end + +# ╔═╡ 00000000-0000-0000-0000-000000000001 +PLUTO_PROJECT_TOML_CONTENTS = """ +[deps] +BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf" +PlutoHooks = "0ff47ea0-7a50-410d-8455-4348d5de0774" +PlutoLinks = "0ff47ea0-7a50-410d-8455-4348d5de0420" +PlutoUI = "7f904dfe-b85e-4ff6-b463-dae2292396a8" +Serialization = "9e88b42a-f829-5b0c-bbe9-9e923198166b" +UUIDs = "cf7118a7-6976-5b1a-9a39-7adc72f591a4" + +[compat] +BenchmarkTools = "~1.2.0" +PlutoHooks = "~0.0.3" +PlutoLinks = "~0.1.1" +PlutoUI = "~0.7.16" +""" + +# ╔═╡ 00000000-0000-0000-0000-000000000002 +PLUTO_MANIFEST_TOML_CONTENTS = """ +# This file is machine-generated - editing it directly is not advised + +[[Base64]] +uuid = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f" + +[[BenchmarkTools]] +deps = ["JSON", "Logging", "Printf", "Profile", "Statistics", "UUIDs"] +git-tree-sha1 = "61adeb0823084487000600ef8b1c00cc2474cd47" +uuid = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf" +version = "1.2.0" + +[[Dates]] +deps = ["Printf"] +uuid = "ade2ca70-3891-5945-98fb-dc099432e06a" + +[[FileWatching]] +uuid = "7b1f6079-737a-58dc-b8bc-7a2ca5c1b5ee" + +[[Hyperscript]] +deps = ["Test"] +git-tree-sha1 = "8d511d5b81240fc8e6802386302675bdf47737b9" +uuid = "47d2ed2b-36de-50cf-bf87-49c2cf4b8b91" +version = "0.0.4" + +[[HypertextLiteral]] +git-tree-sha1 = "5efcf53d798efede8fee5b2c8b09284be359bf24" +uuid = "ac1192a8-f4b3-4bfe-ba22-af5b92cd3ab2" +version = "0.9.2" + +[[IOCapture]] +deps = ["Logging", "Random"] +git-tree-sha1 = "f7be53659ab06ddc986428d3a9dcc95f6fa6705a" +uuid = "b5f81e59-6552-4d32-b1f0-c071b021bf89" +version = "0.2.2" + +[[InteractiveUtils]] +deps = ["Markdown"] +uuid = "b77e0a4c-d291-57a0-90e8-8db25a27a240" + +[[JSON]] +deps = ["Dates", "Mmap", "Parsers", "Unicode"] +git-tree-sha1 = "8076680b162ada2a031f707ac7b4953e30667a37" +uuid = "682c06a0-de6a-54ab-a142-c8b1cf79cde6" +version = "0.21.2" + +[[Libdl]] +uuid = "8f399da3-3557-5675-b5ff-fb832c97cbdb" + +[[LinearAlgebra]] +deps = ["Libdl"] +uuid = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" + +[[Logging]] +uuid = "56ddb016-857b-54e1-b83d-db4d58db5568" + +[[Markdown]] +deps = ["Base64"] +uuid = "d6f4376e-aef5-505a-96c1-9c027394607a" + +[[Mmap]] +uuid = "a63ad114-7e13-5084-954f-fe012c677804" + +[[Parsers]] +deps = ["Dates"] +git-tree-sha1 = "d911b6a12ba974dabe2291c6d450094a7226b372" +uuid = "69de0a69-1ddd-5017-9359-2bf0b02dc9f0" +version = "2.1.1" + +[[PlutoHooks]] +deps = ["FileWatching", "InteractiveUtils", "Markdown", "UUIDs"] +git-tree-sha1 = "f297787f7d7507dada25f6769fe3f08f6b9b8b12" +uuid = "0ff47ea0-7a50-410d-8455-4348d5de0774" +version = "0.0.3" + +[[PlutoLinks]] +deps = ["FileWatching", "InteractiveUtils", "Markdown", "PlutoHooks", "UUIDs"] +git-tree-sha1 = "5f45fc68dd9eb422358a8008e3fb8df3c01d8ab8" +uuid = "0ff47ea0-7a50-410d-8455-4348d5de0420" +version = "0.1.1" + +[[PlutoUI]] +deps = ["Base64", "Dates", "Hyperscript", "HypertextLiteral", "IOCapture", "InteractiveUtils", "JSON", "Logging", "Markdown", "Random", "Reexport", "UUIDs"] +git-tree-sha1 = "4c8a7d080daca18545c56f1cac28710c362478f3" +uuid = "7f904dfe-b85e-4ff6-b463-dae2292396a8" +version = "0.7.16" + +[[Printf]] +deps = ["Unicode"] +uuid = "de0858da-6303-5e67-8744-51eddeeeb8d7" + +[[Profile]] +deps = ["Printf"] +uuid = "9abbd945-dff8-562f-b5e8-e1ebf5ef1b79" + +[[Random]] +deps = ["Serialization"] +uuid = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" + +[[Reexport]] +git-tree-sha1 = "45e428421666073eab6f2da5c9d310d99bb12f9b" +uuid = "189a3867-3050-52da-a836-e630ba90ab69" +version = "1.2.2" + +[[SHA]] +uuid = "ea8e919c-243c-51af-8825-aaa63cd721ce" + +[[Serialization]] +uuid = "9e88b42a-f829-5b0c-bbe9-9e923198166b" + +[[SparseArrays]] +deps = ["LinearAlgebra", "Random"] +uuid = "2f01184e-e22b-5df5-ae63-d93ebab69eaf" + +[[Statistics]] +deps = ["LinearAlgebra", "SparseArrays"] +uuid = "10745b16-79ce-11e8-11f9-7d13ad32a3b2" + +[[Test]] +deps = ["InteractiveUtils", "Logging", "Random", "Serialization"] +uuid = "8dfed614-e22c-5e08-85e1-65c5234f0b40" + +[[UUIDs]] +deps = ["Random", "SHA"] +uuid = "cf7118a7-6976-5b1a-9a39-7adc72f591a4" + +[[Unicode]] +uuid = "4ec0a83e-493e-50e2-b9ac-8f72acf5a8f5" +""" + +# ╔═╡ Cell order: +# ╠═7e613bd2-616a-4687-8af5-a22c7a747d97 +# ╠═d30e1e1b-fa6f-4fbc-bccb-1fcfc7b829df +# ╠═f7d14367-27d7-41a5-9f6a-79cf5e721a7d +# ╠═fe669218-18c3-46c7-80e8-7b1ab6fa77d2 +# ╠═f9675ee0-e728-420b-81bd-22e57583c587 +# ╠═730cfe5e-1541-4c08-8d4a-86d1f9e4115e +# ╠═6ff77f91-ee9c-407c-a243-09fc7e555d73 +# ╠═6ab96c70-ad5d-4614-9a77-2d44d1085567 +# ╠═a26cc458-4578-427f-840d-71d78c5c8b01 +# ╠═de602aaa-704c-45c4-8b7b-fc58e41236ce +# ╟─49fdc8a3-0e1a-42f0-acc4-b823eec91d31 +# ╠═95c5a5bc-db23-4ad3-8ae8-81bc8f0edfd4 +# ╠═ced9d1e9-7075-4ff2-8ca2-6a349f2a69c4 +# ╠═4baac7f2-60fe-4a6f-8612-2acf80c43ef3 +# ╟─be18d157-6b55-4eaa-99fe-c398e992a9fa +# ╠═12578b59-0161-4e72-afef-825166a62121 +# ╠═34d90560-5a3e-4c7f-8126-35e1a6153aa1 +# ╠═83540498-f317-4d8b-8dc6-80a93247b3b2 +# ╟─00000000-0000-0000-0000-000000000001 +# ╟─00000000-0000-0000-0000-000000000002 diff --git a/src/evaluation/Run.jl b/src/evaluation/Run.jl index d0dc39a2f3..3f27d7a7b3 100644 --- a/src/evaluation/Run.jl +++ b/src/evaluation/Run.jl @@ -409,6 +409,9 @@ function update_save_run!(session::ServerSession, notebook::Notebook, cells::Arr update_dependency_cache!(notebook) save && save_notebook(session, notebook) + # TODO REMOVE THIS BEFORE MERGING!!! + prerender_text = false + # _assume `prerender_text == false` if you want to skip some details_ to_run_online = if !prerender_text cells diff --git a/src/evaluation/WorkspaceManager.jl b/src/evaluation/WorkspaceManager.jl index a606823c88..06ce3965f5 100644 --- a/src/evaluation/WorkspaceManager.jl +++ b/src/evaluation/WorkspaceManager.jl @@ -6,13 +6,13 @@ import ..Pluto.PkgCompat import ..Configuration: CompilerOptions, _merge_notebook_compiler_options, _convert_to_flags import ..Pluto.ExpressionExplorer: FunctionName import ..PlutoRunner -import Distributed +import ..ChildProcesses "Contains the Julia process (in the sense of `Distributed.addprocs`) to evaluate code in. Each notebook gets at most one `Workspace` at any time, but it can also have no `Workspace` (it cannot `eval` code in this case)." Base.@kwdef mutable struct Workspace - pid::Integer + process::Any discarded::Bool=false - log_channel::Distributed.RemoteChannel + log_channel::ChildProcesses.ChildChannel module_name::Symbol dowork_token::Token=Token() nbpkg_was_active::Bool=false @@ -43,32 +43,36 @@ function make_workspace((session, notebook)::SN; is_offline_renderer::Bool=false session.options.evaluation.workspace_use_distributed end - pid = if use_distributed + process = if use_distributed create_workspaceprocess(;compiler_options=_merge_notebook_compiler_options(notebook, session.options.compiler)) else - pid = Distributed.myid() - if !(isdefined(Main, :PlutoRunner) && Main.PlutoRunner isa Module) - # we make PlutoRunner available in Main, right now it's only defined inside this Pluto module. - @eval Main begin - PlutoRunner = $(PlutoRunner) - end - end - pid + error("mehh") + # pid = Distributed.myid() + # if !(isdefined(Main, :PlutoRunner) && Main.PlutoRunner isa Module) + # # we make PlutoRunner available in Main, right now it's only defined inside this Pluto module. + # @eval Main begin + # PlutoRunner = $(PlutoRunner) + # end + # end + # pid end - Distributed.remotecall_eval(Main, [pid], :(PlutoRunner.notebook_id[] = $(notebook.notebook_id))) - log_channel = Core.eval(Main, quote - $(Distributed).RemoteChannel(() -> eval(:(Main.PlutoRunner.log_channel)), $pid) - end) - run_channel = Core.eval(Main, quote - $(Distributed).RemoteChannel(() -> eval(:(Main.PlutoRunner.run_channel)), $pid) + ChildProcesses.call_without_fetch(process, quote + PlutoRunner.notebook_id[] = $(notebook.notebook_id) end) - module_name = create_emptyworkspacemodule(pid) + # Distributed.remotecall_eval(Main, [pid], :(PlutoRunner.notebook_id[] = $(notebook.notebook_id))) + log_channel = ChildProcesses.create_channel(process, :(Main.PlutoRunner.log_channel)) + # log_channel = Core.eval(Main, quote + # $(Distributed).RemoteChannel(() -> eval(:(Main.PlutoRunner.log_channel)), $pid) + # end) + run_channel = ChildProcesses.create_channel(process, :(Main.PlutoRunner.run_channel)) + + module_name = create_emptyworkspacemodule(process) - original_LOAD_PATH, original_ACTIVE_PROJECT = Distributed.remotecall_eval(Main, pid, :(Base.LOAD_PATH, Base.ACTIVE_PROJECT[])) + original_LOAD_PATH, original_ACTIVE_PROJECT = ChildProcesses.call(process, :(Base.LOAD_PATH, Base.ACTIVE_PROJECT[])) workspace = Workspace(; - pid=pid, + process=process, log_channel=log_channel, module_name=module_name, original_LOAD_PATH=original_LOAD_PATH, @@ -97,11 +101,11 @@ function use_nbpkg_environment((session, notebook)::SN, workspace=nothing) end workspace.nbpkg_was_active = enabled - if workspace.pid != Distributed.myid() + if !(workspace.process isa ChildProcesses.LocalSandbox) new_LP = enabled ? ["@", "@stdlib"] : workspace.original_LOAD_PATH new_AP = enabled ? PkgCompat.env_dir(notebook.nbpkg_ctx) : workspace.original_ACTIVE_PROJECT - Distributed.remotecall_eval(Main, [workspace.pid], quote + ChildProcesses.call(workspace.process, quote copy!(LOAD_PATH, $(new_LP)) Base.ACTIVE_PROJECT[] = $(new_AP) end) @@ -110,7 +114,7 @@ function use_nbpkg_environment((session, notebook)::SN, workspace=nothing) end end -function start_relaying_self_updates((session, notebook)::SN, run_channel::Distributed.RemoteChannel) +function start_relaying_self_updates((session, notebook)::SN, run_channel::ChildProcesses.ChildChannel) while true try next_run_uuid = take!(run_channel) @@ -126,11 +130,7 @@ function start_relaying_self_updates((session, notebook)::SN, run_channel::Distr end end -function start_relaying_logs((session, notebook)::SN, log_channel::Distributed.RemoteChannel) - update_throttled, flush_throttled = Pluto.throttled(0.1) do - Pluto.send_notebook_changes!(Pluto.ClientRequest(session=session, notebook=notebook)) - end - +function start_relaying_logs((session, notebook)::SN, log_channel::ChildProcesses.ChildChannel) while true try next_log::Dict{String,Any} = take!(log_channel) @@ -199,49 +199,44 @@ end function bump_workspace_module(session_notebook::SN) workspace = get_workspace(session_notebook) old_name = workspace.module_name - new_name = workspace.module_name = create_emptyworkspacemodule(workspace.pid) + new_name = workspace.module_name = create_emptyworkspacemodule(workspace.process) old_name, new_name end function possible_bond_values(session_notebook::SN, n::Symbol; get_length::Bool=false) workspace = get_workspace(session_notebook) - pid = workspace.pid - - Distributed.remotecall_eval(Main, pid, quote + ChildProcesses.call(workspace.process, quote PlutoRunner.possible_bond_values($(QuoteNode(n)); get_length=$(get_length)) end) end -function create_emptyworkspacemodule(pid::Integer)::Symbol - Distributed.remotecall_eval(Main, pid, :(PlutoRunner.increment_current_module())) +function create_emptyworkspacemodule(process::ChildProcesses.ChildProcess)::Symbol + ChildProcesses.call(process, :(PlutoRunner.increment_current_module())) end -const Distributed_expr = :( - Base.loaded_modules[Base.PkgId(Base.UUID("8ba89e20-285c-5b6f-9357-94700520ee1b"), "Distributed")] -) - # NOTE: this function only start a worker process using given # compiler options, it does not resolve paths for notebooks # compiler configurations passed to it should be resolved before this -function create_workspaceprocess(;compiler_options=CompilerOptions())::Integer +function create_workspaceprocess(;compiler_options=CompilerOptions()) # run on proc 1 in case Pluto is being used inside a notebook process # Workaround for "only process 1 can add/remove workers" - pid = Distributed.remotecall_eval(Main, 1, quote - $(Distributed_expr).addprocs(1; exeflags=$(_convert_to_flags(compiler_options))) |> first - end) - Distributed.remotecall_eval(Main, [pid], process_preamble()) + exeflags = _convert_to_flags(compiler_options) + process = ChildProcesses.create_child_process(exeflags=exeflags) + + ChildProcesses.call_without_fetch(process, process_preamble()) # so that we NEVER break the workspace with an interrupt 🤕 - @async Distributed.remotecall_eval(Main, [pid], - :(while true + @async ChildProcesses.call_without_fetch(process, quote + while true try wait() catch end - end)) + end + end) - pid + process end "Return the `Workspace` of `notebook`; will be created if none exists yet." @@ -259,15 +254,13 @@ function unmake_workspace(session_notebook::Union{SN,Workspace}; async=false) workspace = get_workspace(session_notebook) workspace.discarded = true - if workspace.pid != Distributed.myid() - filter!(p -> fetch(p.second).pid != workspace.pid, workspaces) + if !(workspace.process isa ChildProcesses.LocalSandbox) + filter!(p -> fetch(p.second).process != workspace.process, workspaces) t = @async begin interrupt_workspace(workspace; verbose=false) # run on proc 1 in case Pluto is being used inside a notebook process # Workaround for "only process 1 can add/remove workers" - Distributed.remotecall_eval(Main, 1, quote - $(Distributed_expr).rmprocs($(workspace.pid)) - end) + close(workspace.process) end async || wait(t) end @@ -285,13 +278,35 @@ function distributed_exception_result(ex::Base.IOError, workspace::Workspace) ) end +function distributed_exception_result(ex::ChildProcesses.ChildProcessException, workspace::Workspace) + ( + output_formatted=PlutoRunner.format_output(CapturedException(InterruptException(), [])), + errored=true, + interrupted=true, + process_exited=false, + runtime=nothing, + published_objects=Dict{String,Any}(), + has_pluto_hook_features=false, + ) +end +function distributed_exception_result(ex::ChildProcesses.ProcessExitedException, workspace::Workspace) + ( + output_formatted=PlutoRunner.format_output(CapturedException(ex, [])), + errored=true, + interrupted=true, + process_exited=true && !workspace.discarded, # don't report a process exit if the workspace was discarded on purpose + runtime=nothing, + published_objects=Dict{String,Any}(), + has_pluto_hook_features=false, + ) +end function distributed_exception_result(exs::CompositeException, workspace::Workspace) ex = exs.exceptions |> first - if ex isa Distributed.RemoteException && - ex.pid == workspace.pid && + if ex isa ChildProcesses.ChildProcessException && + ex.process == workspace.process && ex.captured.ex isa InterruptException ( @@ -303,7 +318,7 @@ function distributed_exception_result(exs::CompositeException, workspace::Worksp published_objects=Dict{String,Any}(), has_pluto_hook_features=false, ) - elseif ex isa Distributed.ProcessExitedException + elseif ex isa ChildProcesses.ProcessExitedException ( output_formatted=PlutoRunner.format_output(CapturedException(exs, [])), errored=true, @@ -346,26 +361,28 @@ function eval_format_fetch_in_workspace( # if multiple notebooks run on the same process, then we need to `cd` between the different notebook paths if session_notebook isa Tuple - if workspace.pid == Distributed.myid() + if workspace.process isa ChildProcesses.LocalSandbox cd_workspace(workspace, session_notebook[2].path) end use_nbpkg_environment(session_notebook, workspace) end - + # run the code 🏃‍♀️ # a try block (on this process) to catch an InterruptException take!(workspace.dowork_token) early_result = try # we use [pid] instead of pid to prevent fetching output - Distributed.remotecall_eval(Main, [workspace.pid], :(PlutoRunner.run_expression( - getfield(Main, $(QuoteNode(workspace.module_name))), - $(QuoteNode(expr)), - $cell_id, - $function_wrapped_info, - $forced_expr_id, - user_requested_run=$user_requested_run, - ))) + ChildProcesses.call_without_fetch(workspace.process, quote + PlutoRunner.run_expression( + getfield(Main, $(QuoteNode(workspace.module_name))), + $(QuoteNode(expr)), + $cell_id, + $function_wrapped_info, + $forced_expr_id, + user_requested_run=$user_requested_run, + ) + end) put!(workspace.dowork_token) nothing catch exs @@ -383,7 +400,9 @@ end function eval_in_workspace(session_notebook::Union{SN,Workspace}, expr) workspace = get_workspace(session_notebook) - Distributed.remotecall_eval(Main, [workspace.pid], :(Core.eval($(workspace.module_name), $(expr |> QuoteNode)))) + ChildProcesses.call_without_fetch(workspace.process, quote + Core.eval($(workspace.module_name), $(expr |> QuoteNode)) + end) nothing end @@ -399,13 +418,15 @@ function format_fetch_in_workspace( # instead of fetching the output value (which might not make sense in our context, since the user can define structs, types, functions, etc), we format the cell output on the worker, and fetch the formatted output. withtoken(workspace.dowork_token) do try - Distributed.remotecall_eval(Main, workspace.pid, :(PlutoRunner.formatted_result_of( - $cell_id, - $ends_with_semicolon, - $known_published_objects, - $showmore_id, - getfield(Main, $(QuoteNode(workspace.module_name))), - ))) + ChildProcesses.call(workspace.process, quote + PlutoRunner.formatted_result_of( + $cell_id, + $ends_with_semicolon, + $known_published_objects, + $showmore_id, + getfield(Main, $(QuoteNode(workspace.module_name))), + ) + end) catch ex distributed_exception_result(CompositeException([ex]), workspace) end @@ -420,7 +441,7 @@ function collect_soft_definitions(session_notebook::SN, modules::Set{Expr}) PlutoRunner.collect_soft_definitions($module_name, $modules) end - Distributed.remotecall_eval(Main, workspace.pid, ex) + ChildProcesses.call(workspace.process, ex) end @@ -428,18 +449,20 @@ function macroexpand_in_workspace(session_notebook::Union{SN,Workspace}, macroca workspace = get_workspace(session_notebook) module_name = module_name === nothing ? workspace.module_name : module_name - Distributed.remotecall_eval(Main, workspace.pid, quote + ChildProcesses.call(workspace.process, quote try (true, PlutoRunner.try_macroexpand($(module_name), $(cell_uuid), $(macrocall |> QuoteNode))) - catch error - # We have to be careful here, for example a thrown `MethodError()` will contain the called method and arguments. - # which normally would be very useful for debugging, but we can't serialize it! - # So we make sure we only serialize the exception we know about, and string-ify the others. - if (error isa LoadError && error.error isa UndefVarError) || error isa UndefVarError - (false, error) - else - (false, ErrorException(sprint(showerror, error))) - end + catch e + (false, e) + # catch error + # # We have to be careful here, for example a thrown `MethodError()` will contain the called method and arguments. + # # which normally would be very useful for debugging, but we can't serialize it! + # # So we make sure we only serialize the exception we know about, and string-ify the others. + # if (error isa LoadError && error.error isa UndefVarError) || error isa UndefVarError + # (false, error) + # else + # (false, ErrorException(sprint(showerror, error))) + # end end end) end @@ -448,13 +471,15 @@ end function eval_fetch_in_workspace(session_notebook::Union{SN,Workspace}, expr) workspace = get_workspace(session_notebook) - Distributed.remotecall_eval(Main, workspace.pid, :(Core.eval($(workspace.module_name), $(expr |> QuoteNode)))) + ChildProcesses.call(workspace.process, :(Core.eval($(workspace.module_name), $(expr |> QuoteNode)))) end function do_reimports(session_notebook::Union{SN,Workspace}, module_imports_to_move::Set{Expr}) workspace = get_workspace(session_notebook) workspace_name = workspace.module_name - Distributed.remotecall_eval(Main, [workspace.pid], :(PlutoRunner.do_reimports($(workspace_name), $module_imports_to_move))) + ChildProcesses.call_without_fetch(workspace.process, quote + PlutoRunner.do_reimports($(workspace_name), $module_imports_to_move) + end) end "Move variables to a new module. A given set of variables to be 'deleted' will not be moved to the new module, making them unavailable. " @@ -462,7 +487,9 @@ function move_vars(session_notebook::Union{SN,Workspace}, old_workspace_name::Sy workspace = get_workspace(session_notebook) new_workspace_name = something(new_workspace_name, workspace.module_name) - Distributed.remotecall_eval(Main, [workspace.pid], :(PlutoRunner.move_vars($(old_workspace_name |> QuoteNode), $(new_workspace_name |> QuoteNode), $to_delete, $methods_to_delete, $module_imports_to_move))) + ChildProcesses.call_without_fetch(workspace.process, quote + PlutoRunner.move_vars($(old_workspace_name |> QuoteNode), $(new_workspace_name |> QuoteNode), $to_delete, $methods_to_delete, $module_imports_to_move) + end) end move_vars(session_notebook::Union{SN,Workspace}, to_delete::Set{Symbol}, methods_to_delete::Set{Tuple{UUID,FunctionName}}, module_imports_to_move::Set{Expr}; kwargs...) = @@ -489,6 +516,8 @@ end function interrupt_workspace(session_notebook::Union{SN,Workspace}; verbose=true)::Bool workspace = get_workspace(session_notebook) + verbose=true + if poll(() -> isready(workspace.dowork_token), 2.0, 5/100) verbose && println("Cell finished, other cells cancelled!") return true @@ -500,7 +529,7 @@ function interrupt_workspace(session_notebook::Union{SN,Workspace}; verbose=true https://docs.microsoft.com/en-us/windows/wsl" return false end - if workspace.pid == Distributed.myid() + if workspace.process isa ChildProcesses.LocalSandbox verbose && @warn """Cells in this workspace can't be stopped, because it is not running in a separate workspace. Use `ENV["PLUTO_WORKSPACE_USE_DISTRIBUTED"]` to control whether future workspaces are generated in a separate process.""" return false end @@ -514,8 +543,8 @@ function interrupt_workspace(session_notebook::Union{SN,Workspace}; verbose=true # TODO: this will also kill "pending" evaluations, and any evaluations started within 100ms of the kill. A global "evaluation count" would fix this. # TODO: listen for the final words of the remote process on stdout/stderr: "Force throwing a SIGINT" try - verbose && @info "Sending interrupt to process $(workspace.pid)" - Distributed.interrupt(workspace.pid) + verbose && @info "Sending interrupt to process $(workspace)..." + kill(workspace.process, Base.SIGINT) if poll(() -> isready(workspace.dowork_token), 5.0, 5/100) verbose && println("Cell interrupted!") @@ -526,7 +555,7 @@ function interrupt_workspace(session_notebook::Union{SN,Workspace}; verbose=true while !isready(workspace.dowork_token) for _ in 1:5 verbose && print(" 🔥 ") - Distributed.interrupt(workspace.pid) + kill(workspace.process, Base.SIGINT) sleep(0.18) if isready(workspace.dowork_token) break @@ -539,8 +568,7 @@ function interrupt_workspace(session_notebook::Union{SN,Workspace}; verbose=true true catch ex if !(ex isa KeyError) - @warn "Interrupt failed for unknown reason" - showerror(ex, stacktrace(catch_backtrace())) + @warn "Interrupt failed for unknown reason" sprint(showerror, ex, stacktrace(catch_backtrace())) end false end diff --git a/src/webserver/REPLTools.jl b/src/webserver/REPLTools.jl index b4b5fd9b35..171c6b0898 100644 --- a/src/webserver/REPLTools.jl +++ b/src/webserver/REPLTools.jl @@ -1,9 +1,9 @@ import FuzzyCompletions: complete_path, completion_text, score -import Distributed import .PkgCompat: package_completions using Markdown import REPL + ### # RESPONSES FOR AUTOCOMPLETE & DOCS ### @@ -79,10 +79,10 @@ responses[:complete] = function response_complete(🙋::ClientRequest) if will_run_code(🙋.notebook) && isready(workspace.dowork_token) # we don't use eval_format_fetch_in_workspace because we don't want the output to be string-formatted. # This works in this particular case, because the return object, a `Completion`, exists in this scope too. - Distributed.remotecall_eval(Main, workspace.pid, :(PlutoRunner.completion_fetcher( + ChildProcesses.call(workspace.process, :(PlutoRunner.completion_fetcher( $query, $pos, getfield(Main, $(QuoteNode(workspace.module_name))), - ))) + ))) else # We can at least autocomplete general julia things: PlutoRunner.completion_fetcher(query, pos, Main) @@ -121,7 +121,7 @@ responses[:docs] = function response_docs(🙋::ClientRequest) workspace = WorkspaceManager.get_workspace((🙋.session, 🙋.notebook)) if will_run_code(🙋.notebook) && isready(workspace.dowork_token) - Distributed.remotecall_eval(Main, workspace.pid, :(PlutoRunner.doc_fetcher( + ChildProcesses.call(workspace.process, :(PlutoRunner.doc_fetcher( $query, getfield(Main, $(QuoteNode(workspace.module_name))), )))