diff --git a/src/evaluation/WorkspaceManager.jl b/src/evaluation/WorkspaceManager.jl index c556e97e15..314dc99a01 100644 --- a/src/evaluation/WorkspaceManager.jl +++ b/src/evaluation/WorkspaceManager.jl @@ -8,7 +8,11 @@ import ..Pluto.ExpressionExplorer: FunctionName import ..PlutoRunner import Distributed -"Contains the Julia process (in the sense of `Distributed.addprocs`) to evaluate code in. Each notebook gets at most one `Workspace` at any time, but it can also have no `Workspace` (it cannot `eval` code in this case)." +""" +Contains the Julia process (in the sense of `Distributed.addprocs`) to evaluate code in. +Each notebook gets at most one `Workspace` at any time, but it can also have no `Workspace` +(it cannot `eval` code in this case). +""" Base.@kwdef mutable struct Workspace pid::Integer notebook_id::UUID @@ -22,29 +26,32 @@ Base.@kwdef mutable struct Workspace original_ACTIVE_PROJECT::Union{Nothing,String}=nothing end -"These expressions get evaluated whenever a new `Workspace` process is created." -process_preamble() = quote - ccall(:jl_exit_on_sigint, Cvoid, (Cint,), 0) - include($(project_relative_path(joinpath("src", "runner"), "Loader.jl"))) - ENV["GKSwstype"] = "nul" - ENV["JULIA_REVISE_WORKER_ONLY"] = "1" -end +const SN = Tuple{ServerSession, Notebook} + +const active_workspaces = Dict{UUID,Task}() -const workspaces = Dict{UUID,Task}() "Set of notebook IDs that we will never make a process for again." const discarded_workspaces = Set{UUID}() -const SN = Tuple{ServerSession,Notebook} +const Distributed_expr = quote + Base.loaded_modules[Base.PkgId(Base.UUID("8ba89e20-285c-5b6f-9357-94700520ee1b"), "Distributed")] +end + +"These expressions get evaluated whenever a new `Workspace` process is created." +function process_preamble() + quote + Base.exit_on_sigint(false) + include($(project_relative_path(joinpath("src", "runner"), "Loader.jl"))) + ENV["GKSwstype"] = "nul" + ENV["JULIA_REVISE_WORKER_ONLY"] = "1" + end +end -"""Create a workspace for the notebook, optionally in the main process.""" +"Create a workspace for the notebook, optionally in the main process." function make_workspace((session, notebook)::SN; is_offline_renderer::Bool=false)::Workspace is_offline_renderer || (notebook.process_status = ProcessStatus.starting) - use_distributed = if is_offline_renderer - false - else - session.options.evaluation.workspace_use_distributed - end + use_distributed = !is_offline_renderer && session.options.evaluation.workspace_use_distributed pid = if use_distributed @debug "Creating workspace process" notebook.path length(notebook.cells) @@ -52,7 +59,7 @@ function make_workspace((session, notebook)::SN; is_offline_renderer::Bool=false else pid = Distributed.myid() if !(isdefined(Main, :PlutoRunner) && Main.PlutoRunner isa Module) - # we make PlutoRunner available in Main, right now it's only defined inside this Pluto module. + # Make PlutoRunner available in Main, right now it's only defined inside this Pluto module. @eval Main begin PlutoRunner = $(PlutoRunner) end @@ -62,18 +69,18 @@ function make_workspace((session, notebook)::SN; is_offline_renderer::Bool=false Distributed.remotecall_eval(Main, [pid], session.options.evaluation.workspace_custom_startup_expr) - Distributed.remotecall_eval(Main, [pid], :(PlutoRunner.notebook_id[] = $(notebook.notebook_id))) + Distributed.remotecall_eval(Main, [pid], quote + PlutoRunner.notebook_id[] = $(notebook.notebook_id) + end) remote_log_channel = Core.eval(Main, quote $(Distributed).RemoteChannel(() -> eval(quote - channel = Channel{Any}(10) Main.PlutoRunner.setup_plutologger( - $($(notebook.notebook_id)), - channel; + $($(notebook.notebook_id)), + channel; make_global=$($(use_distributed)) ) - channel end), $pid) end) @@ -81,6 +88,7 @@ function make_workspace((session, notebook)::SN; is_offline_renderer::Bool=false run_channel = Core.eval(Main, quote $(Distributed).RemoteChannel(() -> eval(:(Main.PlutoRunner.run_channel)), $pid) end) + module_name = create_emptyworkspacemodule(pid) original_LOAD_PATH, original_ACTIVE_PROJECT = Distributed.remotecall_eval(Main, pid, :(Base.LOAD_PATH, Base.ACTIVE_PROJECT[])) @@ -108,14 +116,10 @@ end function use_nbpkg_environment((session, notebook)::SN, workspace=nothing) enabled = notebook.nbpkg_ctx !== nothing - if workspace.nbpkg_was_active == enabled - return - end + workspace.nbpkg_was_active == enabled && return workspace = workspace !== nothing ? workspace : get_workspace((session, notebook)) - if workspace.discarded - return - end + workspace.discarded && return workspace.nbpkg_was_active = enabled if workspace.pid != Distributed.myid() @@ -127,7 +131,7 @@ function use_nbpkg_environment((session, notebook)::SN, workspace=nothing) Base.ACTIVE_PROJECT[] = $(new_AP) end) else - # uhmmmmmm TODO + # TODO end end @@ -159,26 +163,24 @@ function start_relaying_logs((session, notebook)::SN, log_channel::Distributed.R fn = next_log["file"] match = findfirst("#==#", fn) - # We always show the log at the currently running cell, which is given by + # Show the log at the currently running cell, which is given by running_cell_id = next_log["cell_id"]::UUID # Some logs originate from outside of the running code, through function calls. Some code here to deal with that: begin source_cell_id = if match !== nothing - # the log originated from within the notebook - + # The log originated from within the notebook UUID(fn[match[end]+1:end]) else - # the log originated from a function call defined outside of the notebook - - # we will show the log at the currently running cell, at "line -1", i.e. without line info. + # The log originated from a function call defined outside of the notebook. + # Show the log at the currently running cell, at "line -1", i.e. without line info. next_log["line"] = -1 running_cell_id end if running_cell_id != source_cell_id - # the log originated from a function in another cell of the notebook - # we will show the log at the currently running cell, at "line -1", i.e. without line info. + # The log originated from a function in another cell of the notebook + # Show the log at the currently running cell, at "line -1", i.e. without line info. next_log["line"] = -1 end end @@ -226,7 +228,7 @@ end "Call `cd(\$path)` inside the workspace. This is done when creating a workspace, and whenever the notebook changes path." function cd_workspace(workspace, path::AbstractString) eval_in_workspace(workspace, quote - cd($(path |> dirname)) + cd(dirname($path)) end) end @@ -241,21 +243,18 @@ end function possible_bond_values(session_notebook::SN, n::Symbol; get_length::Bool=false) workspace = get_workspace(session_notebook) - pid = workspace.pid - Distributed.remotecall_eval(Main, pid, quote + Distributed.remotecall_eval(Main, workspace.pid, quote PlutoRunner.possible_bond_values($(QuoteNode(n)); get_length=$(get_length)) end) end function create_emptyworkspacemodule(pid::Integer)::Symbol - Distributed.remotecall_eval(Main, pid, :(PlutoRunner.increment_current_module())) + Distributed.remotecall_eval(Main, pid, quote + PlutoRunner.increment_current_module() + end) end -const Distributed_expr = :( - Base.loaded_modules[Base.PkgId(Base.UUID("8ba89e20-285c-5b6f-9357-94700520ee1b"), "Distributed")] -) - # NOTE: this function only start a worker process using given # compiler options, it does not resolve paths for notebooks # compiler configurations passed to it should be resolved before this @@ -269,12 +268,13 @@ function create_workspaceprocess(; compiler_options=CompilerOptions())::Integer Distributed.remotecall_eval(Main, [pid], process_preamble()) # so that we NEVER break the workspace with an interrupt 🤕 - @async Distributed.remotecall_eval(Main, [pid], - :(while true + @async Distributed.remotecall_eval(Main, [pid], quote + while true try wait() catch end - end)) + end + end) pid end @@ -291,14 +291,15 @@ function get_workspace(session_notebook::SN; allow_creation::Bool=true)::Union{N error("Cannot run code in this notebook: it has already shut down.") end - task = !allow_creation ? - get(workspaces, notebook.notebook_id, nothing) : - get!(workspaces, notebook.notebook_id) do - Task(() -> make_workspace(session_notebook)) + task = if !allow_creation + get(active_workspaces, notebook.notebook_id, nothing) + else + get!(active_workspaces, notebook.notebook_id) do + Task(() -> make_workspace(session_notebook)) + end end isnothing(task) && return nothing - istaskstarted(task) || schedule(task) fetch(task) end @@ -313,7 +314,7 @@ function unmake_workspace(session_notebook::SN; async::Bool=false, verbose::Bool allow_restart || push!(discarded_workspaces, notebook.notebook_id) if workspace.pid != Distributed.myid() - filter!(p -> fetch(p.second).pid != workspace.pid, workspaces) + filter!(p -> fetch(p.second).pid != workspace.pid, active_workspaces) t = @async begin interrupt_workspace(workspace; verbose=false) # run on proc 1 in case Pluto is being used inside a notebook process @@ -345,15 +346,12 @@ function distributed_exception_result(ex::Base.IOError, workspace::Workspace) ) end - - function distributed_exception_result(exs::CompositeException, workspace::Workspace) - ex = exs.exceptions |> first + ex = first(exs.exceptions) if ex isa Distributed.RemoteException && - ex.pid == workspace.pid && - ex.captured.ex isa InterruptException - + ex.pid == workspace.pid && + ex.captured.ex isa InterruptException ( output_formatted=PlutoRunner.format_output(CapturedException(InterruptException(), [])), errored=true, @@ -388,9 +386,12 @@ function distributed_exception_result(exs::CompositeException, workspace::Worksp end -"Evaluate expression inside the workspace - output is fetched and formatted, errors are caught and formatted. Returns formatted output and error flags. +""" +Evaluate expression inside the workspace - output is fetched and formatted, +errors are caught and formatted. Returns formatted output and error flags. -`expr` has to satisfy `ExpressionExplorer.is_toplevel_expr`." +`expr` has to satisfy `ExpressionExplorer.is_toplevel_expr`. +""" function eval_format_fetch_in_workspace( session_notebook::Union{SN,Workspace}, expr::Expr, @@ -415,40 +416,46 @@ function eval_format_fetch_in_workspace( use_nbpkg_environment(session_notebook, workspace) end - # run the code 🏃‍♀️ + # Run the code 🏃 - # a try block (on this process) to catch an InterruptException + # A try block (on this process) to catch an InterruptException take!(workspace.dowork_token) early_result = try - # we use [pid] instead of pid to prevent fetching output - Distributed.remotecall_eval(Main, [workspace.pid], :(PlutoRunner.run_expression( - getfield(Main, $(QuoteNode(workspace.module_name))), - $(QuoteNode(expr)), - $(workspace.notebook_id), - $cell_id, - $function_wrapped_info, - $forced_expr_id; - user_requested_run=$user_requested_run, - capture_stdout=$(capture_stdout && !is_on_this_process), - ))) + # Use [pid] instead of pid to prevent fetching output + Distributed.remotecall_eval(Main, [workspace.pid], quote + PlutoRunner.run_expression( + getfield(Main, $(QuoteNode(workspace.module_name))), + $(QuoteNode(expr)), + $(workspace.notebook_id), + $cell_id, + $function_wrapped_info, + $forced_expr_id; + user_requested_run=$user_requested_run, + capture_stdout=$(capture_stdout && !is_on_this_process), + ) + end) put!(workspace.dowork_token) nothing - catch exs - # We don't use a `finally` because the token needs to be back asap for the interrupting code to pick it up. + catch e + # Don't use a `finally` because the token needs to be back asap for the interrupting code to pick it up. put!(workspace.dowork_token) - distributed_exception_result(exs, workspace) + distributed_exception_result(e, workspace) end - early_result === nothing ? - format_fetch_in_workspace(workspace, cell_id, ends_with_semicolon, known_published_objects) : - early_result + if early_result === nothing + format_fetch_in_workspace(workspace, cell_id, ends_with_semicolon, known_published_objects) + else + early_result + end end "Evaluate expression inside the workspace - output is not fetched, errors are rethrown. For internal use." function eval_in_workspace(session_notebook::Union{SN,Workspace}, expr) workspace = get_workspace(session_notebook) - Distributed.remotecall_eval(Main, [workspace.pid], :(Core.eval($(workspace.module_name), $(expr |> QuoteNode)))) + Distributed.remotecall_eval(Main, [workspace.pid], quote + Core.eval($(workspace.module_name), $(QuoteNode(expr))) + end) nothing end @@ -461,42 +468,42 @@ function format_fetch_in_workspace( )::PlutoRunner.FormattedCellResult workspace = get_workspace(session_notebook) - # instead of fetching the output value (which might not make sense in our context, since the user can define structs, types, functions, etc), we format the cell output on the worker, and fetch the formatted output. + # Instead of fetching the output value (which might not make sense in our context, + # since the user can define structs, types, functions, etc), + # we format the cell output on the worker, and fetch the formatted output. withtoken(workspace.dowork_token) do try - Distributed.remotecall_eval(Main, workspace.pid, :(PlutoRunner.formatted_result_of( - $(workspace.notebook_id), - $cell_id, - $ends_with_semicolon, - $known_published_objects, - $showmore_id, - getfield(Main, $(QuoteNode(workspace.module_name))), - ))) - catch ex - distributed_exception_result(CompositeException([ex]), workspace) + Distributed.remotecall_eval(Main, workspace.pid, quote + PlutoRunner.formatted_result_of( + $(workspace.notebook_id), + $cell_id, + $ends_with_semicolon, + $known_published_objects, + $showmore_id, + getfield(Main, $(QuoteNode(workspace.module_name))), + ) + end) + catch e + distributed_exception_result(CompositeException([e]), workspace) end end end function collect_soft_definitions(session_notebook::SN, modules::Set{Expr}) workspace = get_workspace(session_notebook) - module_name = workspace.module_name - - ex = quote - PlutoRunner.collect_soft_definitions($module_name, $modules) - end - Distributed.remotecall_eval(Main, workspace.pid, ex) + Distributed.remotecall_eval(Main, workspace.pid, quote + PlutoRunner.collect_soft_definitions($(workspace.module_name), $modules) + end) end - function macroexpand_in_workspace(session_notebook::Union{SN,Workspace}, macrocall, cell_id, module_name=nothing)::Tuple{Bool,Any} workspace = get_workspace(session_notebook) module_name = module_name === nothing ? workspace.module_name : module_name Distributed.remotecall_eval(Main, workspace.pid, quote try - (true, PlutoRunner.try_macroexpand($(module_name), $(workspace.notebook_id), $(cell_id), $(macrocall |> QuoteNode))) + (true, PlutoRunner.try_macroexpand($module_name, $(workspace.notebook_id), $cell_id, $(QuoteNode(macrocall)))) catch error # We have to be careful here, for example a thrown `MethodError()` will contain the called method and arguments. # which normally would be very useful for debugging, but we can't serialize it! @@ -514,25 +521,52 @@ end function eval_fetch_in_workspace(session_notebook::Union{SN,Workspace}, expr) workspace = get_workspace(session_notebook) - Distributed.remotecall_eval(Main, workspace.pid, :(Core.eval($(workspace.module_name), $(expr |> QuoteNode)))) + Distributed.remotecall_eval(Main, workspace.pid, quote + Core.eval($(workspace.module_name), $(QuoteNode(expr))) + end) end function do_reimports(session_notebook::Union{SN,Workspace}, module_imports_to_move::Set{Expr}) workspace = get_workspace(session_notebook) - workspace_name = workspace.module_name - Distributed.remotecall_eval(Main, [workspace.pid], :(PlutoRunner.do_reimports($(workspace_name), $module_imports_to_move))) + + Distributed.remotecall_eval(Main, [workspace.pid], quote + PlutoRunner.do_reimports($(workspace.module_name), $module_imports_to_move) + end) end -"Move variables to a new module. A given set of variables to be 'deleted' will not be moved to the new module, making them unavailable. " -function move_vars(session_notebook::Union{SN,Workspace}, old_workspace_name::Symbol, new_workspace_name::Union{Nothing,Symbol}, to_delete::Set{Symbol}, methods_to_delete::Set{Tuple{UUID,FunctionName}}, module_imports_to_move::Set{Expr}, invalidated_cell_uuids::Set{UUID}; kwargs...) +""" +Move variables to a new module. Variables to be 'deleted' will not be moved to +the new module, making them unavailable. +""" +function move_vars( + session_notebook::Union{SN,Workspace}, + old_workspace_name::Symbol, + new_workspace_name::Union{Nothing,Symbol}, + to_delete::Set{Symbol}, + methods_to_delete::Set{Tuple{UUID,FunctionName}}, + module_imports_to_move::Set{Expr}, + invalidated_cell_uuids::Set{UUID}; + kwargs... + ) + workspace = get_workspace(session_notebook) new_workspace_name = something(new_workspace_name, workspace.module_name) - Distributed.remotecall_eval(Main, [workspace.pid], :(PlutoRunner.move_vars($(old_workspace_name |> QuoteNode), $(new_workspace_name |> QuoteNode), $to_delete, $methods_to_delete, $module_imports_to_move, $invalidated_cell_uuids))) + Distributed.remotecall_eval(Main, [workspace.pid], quote + PlutoRunner.move_vars( + $(QuoteNode(old_workspace_name)), + $(QuoteNode(new_workspace_name)), + $to_delete, + $methods_to_delete, + $module_imports_to_move, + $invalidated_cell_uuids, + ) + end) end -move_vars(session_notebook::Union{SN,Workspace}, to_delete::Set{Symbol}, methods_to_delete::Set{Tuple{UUID,FunctionName}}, module_imports_to_move::Set{Expr}, invalidated_cell_uuids::Set{UUID}; kwargs...) = +function move_vars(session_notebook::Union{SN,Workspace}, to_delete::Set{Symbol}, methods_to_delete::Set{Tuple{UUID,FunctionName}}, module_imports_to_move::Set{Expr}, invalidated_cell_uuids::Set{UUID}; kwargs...) move_vars(session_notebook, bump_workspace_module(session_notebook)..., to_delete, methods_to_delete, module_imports_to_move, invalidated_cell_uuids; kwargs...) +end # TODO: delete me @deprecate( @@ -605,10 +639,12 @@ function interrupt_workspace(session_notebook::Union{SN,Workspace}; verbose=true https://docs.microsoft.com/en-us/windows/wsl" return false end + if workspace.pid == Distributed.myid() verbose && @warn """Cells in this workspace can't be stopped, because it is not running in a separate workspace. Use `ENV["PLUTO_WORKSPACE_USE_DISTRIBUTED"]` to control whether future workspaces are generated in a separate process.""" return false end + if isready(workspace.dowork_token) verbose && @info "Tried to stop idle workspace - ignoring." return true @@ -642,10 +678,10 @@ function interrupt_workspace(session_notebook::Union{SN,Workspace}; verbose=true verbose && println() verbose && println("Cell interrupted!") true - catch ex - if !(ex isa KeyError) + catch e + if !(e isa KeyError) @warn "Interrupt failed for unknown reason" - showerror(ex, stacktrace(catch_backtrace())) + showerror(e, stacktrace(catch_backtrace())) end false end diff --git a/test/React.jl b/test/React.jl index eb1b5e0d7a..eff57fc685 100644 --- a/test/React.jl +++ b/test/React.jl @@ -26,7 +26,7 @@ import Distributed Cell("Distributed.myid()"), ]) - @test !haskey(WorkspaceManager.workspaces, notebook.notebook_id) + @test !haskey(WorkspaceManager.active_workspaces, notebook.notebook_id) update_run!(🍭, notebook, notebook.cells[1:2]) @test notebook.cells[1].output.body == notebook.cells[2].output.body diff --git a/test/webserver.jl b/test/webserver.jl index 9afbd3bdf1..0d94e1887c 100644 --- a/test/webserver.jl +++ b/test/webserver.jl @@ -91,7 +91,7 @@ end # right now, the notebook was only added to the session and assigned an ID. Let's wait for it to get a process: @test poll(60) do - haskey(WorkspaceManager.workspaces, notebook.notebook_id) + haskey(WorkspaceManager.active_workspaces, notebook.notebook_id) end sleep(1)