diff --git a/Project.toml b/Project.toml index 7c7f655297..1744e0881d 100644 --- a/Project.toml +++ b/Project.toml @@ -8,7 +8,6 @@ version = "0.19.27" Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f" Configurations = "5218b696-f38b-4ac9-8b61-a12ec717816d" Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" -Distributed = "8ba89e20-285c-5b6f-9357-94700520ee1b" FileWatching = "7b1f6079-737a-58dc-b8bc-7a2ca5c1b5ee" FuzzyCompletions = "fb4132e2-a121-4a70-b8a1-d5b831dcdcc2" HTTP = "cd3eb016-35fb-5094-929b-558a96fad6f3" @@ -17,6 +16,7 @@ InteractiveUtils = "b77e0a4c-d291-57a0-90e8-8db25a27a240" Logging = "56ddb016-857b-54e1-b83d-db4d58db5568" LoggingExtras = "e6f89c97-d47a-5376-807f-9c37f3926c36" MIMEs = "6c6e2e6c-3030-632d-7369-2d6c69616d65" +Malt = "36869731-bdee-424d-aa32-cab38c994e3b" Markdown = "d6f4376e-aef5-505a-96c1-9c027394607a" MsgPack = "99f44e22-a591-53d1-9472-aa23ef4bd671" Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f" @@ -38,6 +38,7 @@ HTTP = "^1.5.2" HypertextLiteral = "0.7, 0.8, 0.9" LoggingExtras = "0.4, 1" MIMEs = "0.1" +Malt = "1.0.3" MsgPack = "1.1" PrecompileSignatures = "3" PrecompileTools = "1" diff --git a/src/Configuration.jl b/src/Configuration.jl index 95236fa6b4..2b7eed6ae9 100644 --- a/src/Configuration.jl +++ b/src/Configuration.jl @@ -140,6 +140,7 @@ end const RUN_NOTEBOOK_ON_LOAD_DEFAULT = true const WORKSPACE_USE_DISTRIBUTED_DEFAULT = true +const WORKSPACE_USE_DISTRIBUTED_STDLIB_DEFAULT = nothing const LAZY_WORKSPACE_CREATION_DEFAULT = false const CAPTURE_STDOUT_DEFAULT = true const WORKSPACE_CUSTOM_STARTUP_EXPR_DEFAULT = nothing @@ -152,6 +153,7 @@ These options are not intended to be changed during normal use. - `run_notebook_on_load::Bool = $RUN_NOTEBOOK_ON_LOAD_DEFAULT` Whether to evaluate a notebook on load. - `workspace_use_distributed::Bool = $WORKSPACE_USE_DISTRIBUTED_DEFAULT` Whether to start notebooks in a separate process. +- `workspace_use_distributed_stdlib::Bool? = $WORKSPACE_USE_DISTRIBUTED_STDLIB_DEFAULT` Should we use the Distributed stdlib to run processes? Distributed will be replaced by Malt.jl, you can use this option to already get the old behaviour. `nothing` means: determine automatically (which is currently the same as `true`). - `lazy_workspace_creation::Bool = $LAZY_WORKSPACE_CREATION_DEFAULT` - `capture_stdout::Bool = $CAPTURE_STDOUT_DEFAULT` - `workspace_custom_startup_expr::Union{Nothing,Expr} = $WORKSPACE_CUSTOM_STARTUP_EXPR_DEFAULT` An expression to be evaluated in the workspace process before running notebook code. @@ -159,6 +161,7 @@ These options are not intended to be changed during normal use. @option mutable struct EvaluationOptions run_notebook_on_load::Bool = RUN_NOTEBOOK_ON_LOAD_DEFAULT workspace_use_distributed::Bool = WORKSPACE_USE_DISTRIBUTED_DEFAULT + workspace_use_distributed_stdlib::Union{Bool,Nothing} = WORKSPACE_USE_DISTRIBUTED_STDLIB_DEFAULT lazy_workspace_creation::Bool = LAZY_WORKSPACE_CREATION_DEFAULT capture_stdout::Bool = CAPTURE_STDOUT_DEFAULT workspace_custom_startup_expr::Union{Nothing,Expr} = WORKSPACE_CUSTOM_STARTUP_EXPR_DEFAULT @@ -292,6 +295,7 @@ function from_flat_kwargs(; run_notebook_on_load::Bool = RUN_NOTEBOOK_ON_LOAD_DEFAULT, workspace_use_distributed::Bool = WORKSPACE_USE_DISTRIBUTED_DEFAULT, + workspace_use_distributed_stdlib::Union{Bool,Nothing} = WORKSPACE_USE_DISTRIBUTED_STDLIB_DEFAULT, lazy_workspace_creation::Bool = LAZY_WORKSPACE_CREATION_DEFAULT, capture_stdout::Bool = CAPTURE_STDOUT_DEFAULT, workspace_custom_startup_expr::Union{Nothing,Expr} = WORKSPACE_CUSTOM_STARTUP_EXPR_DEFAULT, @@ -340,6 +344,7 @@ function from_flat_kwargs(; evaluation = EvaluationOptions(; run_notebook_on_load, workspace_use_distributed, + workspace_use_distributed_stdlib, lazy_workspace_creation, capture_stdout, workspace_custom_startup_expr, diff --git a/src/evaluation/WorkspaceManager.jl b/src/evaluation/WorkspaceManager.jl index 55fb090655..0d952a7d55 100644 --- a/src/evaluation/WorkspaceManager.jl +++ b/src/evaluation/WorkspaceManager.jl @@ -7,18 +7,19 @@ import ..Pluto.PkgCompat import ..Configuration: CompilerOptions, _merge_notebook_compiler_options, _convert_to_flags import ..Pluto.ExpressionExplorer: FunctionName import ..PlutoRunner -import Distributed +import Malt +import Malt.Distributed """ -Contains the Julia process (in the sense of `Distributed.addprocs`) to evaluate code in. +Contains the Julia process to evaluate code in. Each notebook gets at most one `Workspace` at any time, but it can also have no `Workspace` (it cannot `eval` code in this case). """ Base.@kwdef mutable struct Workspace - pid::Integer + worker::Malt.AbstractWorker notebook_id::UUID discarded::Bool=false - remote_log_channel::Distributed.RemoteChannel + remote_log_channel::Union{Distributed.RemoteChannel,AbstractChannel} module_name::Symbol dowork_token::Token=Token() nbpkg_was_active::Bool=false @@ -29,25 +30,19 @@ end const SN = Tuple{ServerSession, Notebook} +"These expressions get evaluated whenever a new `Workspace` process is created." +process_preamble() = quote + Base.exit_on_sigint(false) + include($(project_relative_path(joinpath("src", "runner"), "Loader.jl"))) + ENV["GKSwstype"] = "nul" + ENV["JULIA_REVISE_WORKER_ONLY"] = "1" +end + const active_workspaces = Dict{UUID,Task}() "Set of notebook IDs that we will never make a process for again." const discarded_workspaces = Set{UUID}() -const Distributed_expr = quote - Base.loaded_modules[Base.PkgId(Base.UUID("8ba89e20-285c-5b6f-9357-94700520ee1b"), "Distributed")] -end - -"These expressions get evaluated whenever a new `Workspace` process is created." -function process_preamble() - quote - Base.exit_on_sigint(false) - include($(project_relative_path(joinpath("src", "runner"), "Loader.jl"))) - ENV["GKSwstype"] = "nul" - ENV["JULIA_REVISE_WORKER_ONLY"] = "1" - end -end - "Create a workspace for the notebook, optionally in the main process." function make_workspace((session, notebook)::SN; is_offline_renderer::Bool=false)::Workspace workspace_business = is_offline_renderer ? Status.Business(name=:gobble) : Status.report_business_started!(notebook.status_tree, :workspace) @@ -55,26 +50,22 @@ function make_workspace((session, notebook)::SN; is_offline_renderer::Bool=false Status.report_business_planned!(workspace_business, :init_process) is_offline_renderer || (notebook.process_status = ProcessStatus.starting) - - use_distributed = !is_offline_renderer && session.options.evaluation.workspace_use_distributed - - pid = if use_distributed - @debug "Creating workspace process" notebook.path length(notebook.cells) - create_workspaceprocess(; - compiler_options=_merge_notebook_compiler_options(notebook, session.options.compiler), - status=create_status, - ) + + WorkerType = if is_offline_renderer || !session.options.evaluation.workspace_use_distributed + Malt.InProcessWorker + elseif something( + session.options.evaluation.workspace_use_distributed_stdlib, + true + # VERSION < v"1.8.0-0" + ) + Malt.DistributedStdlibWorker else - pid = Distributed.myid() - if !(isdefined(Main, :PlutoRunner) && Main.PlutoRunner isa Module) - # Make PlutoRunner available in Main, right now it's only defined inside this Pluto module. - @eval Main begin - PlutoRunner = $(PlutoRunner) - end - end - pid + Malt.Worker end + @debug "Creating workspace process" notebook.path length(notebook.cells) + worker = create_workspaceprocess(WorkerType; compiler_options=_merge_notebook_compiler_options(notebook, session.options.compiler)) + Status.report_business_finished!(workspace_business, :create_process) init_status = Status.report_business_started!(workspace_business, :init_process) Status.report_business_started!(init_status, Symbol(1)) @@ -82,33 +73,29 @@ function make_workspace((session, notebook)::SN; is_offline_renderer::Bool=false Status.report_business_planned!(init_status, Symbol(3)) Status.report_business_planned!(init_status, Symbol(4)) - Distributed.remotecall_eval(Main, [pid], session.options.evaluation.workspace_custom_startup_expr) + Malt.remote_eval_wait(worker, session.options.evaluation.workspace_custom_startup_expr) - Distributed.remotecall_eval(Main, [pid], quote + Malt.remote_eval_wait(worker, quote PlutoRunner.notebook_id[] = $(notebook.notebook_id) end) - remote_log_channel = Core.eval(Main, quote - $(Distributed).RemoteChannel(() -> eval(quote - channel = Channel{Any}(10) - Main.PlutoRunner.setup_plutologger( - $($(notebook.notebook_id)), - channel, - ) + remote_log_channel = Malt.worker_channel(worker, quote + channel = Channel{Any}(10) + Main.PlutoRunner.setup_plutologger( + $(notebook.notebook_id), channel - end), $pid) + ) + channel end) - run_channel = Core.eval(Main, quote - $(Distributed).RemoteChannel(() -> eval(:(Main.PlutoRunner.run_channel)), $pid) - end) + run_channel = Malt.worker_channel(worker, :(Main.PlutoRunner.run_channel)) - module_name = create_emptyworkspacemodule(pid) + module_name = create_emptyworkspacemodule(worker) - original_LOAD_PATH, original_ACTIVE_PROJECT = Distributed.remotecall_eval(Main, pid, :(Base.LOAD_PATH, Base.ACTIVE_PROJECT[])) + original_LOAD_PATH, original_ACTIVE_PROJECT = Malt.remote_eval_fetch(worker, :(Base.LOAD_PATH, Base.ACTIVE_PROJECT[])) workspace = Workspace(; - pid, + worker, notebook_id=notebook.notebook_id, remote_log_channel, module_name, @@ -155,20 +142,20 @@ function use_nbpkg_environment((session, notebook)::SN, workspace=nothing) workspace.discarded && return workspace.nbpkg_was_active = enabled - if workspace.pid != Distributed.myid() - new_LP = enabled ? ["@", "@stdlib"] : workspace.original_LOAD_PATH - new_AP = enabled ? PkgCompat.env_dir(notebook.nbpkg_ctx) : workspace.original_ACTIVE_PROJECT - - Distributed.remotecall_eval(Main, [workspace.pid], quote - copy!(LOAD_PATH, $(new_LP)) - Base.ACTIVE_PROJECT[] = $(new_AP) - end) - else - # TODO + if workspace.worker isa Malt.InProcessWorker + # Not supported + return end + new_LP = enabled ? ["@", "@stdlib"] : workspace.original_LOAD_PATH + new_AP = enabled ? PkgCompat.env_dir(notebook.nbpkg_ctx) : workspace.original_ACTIVE_PROJECT + + Malt.remote_eval_wait(workspace.worker, quote + copy!(LOAD_PATH, $(new_LP)) + Base.ACTIVE_PROJECT[] = $(new_AP) + end) end -function start_relaying_self_updates((session, notebook)::SN, run_channel::Distributed.RemoteChannel) +function start_relaying_self_updates((session, notebook)::SN, run_channel) while true try next_run_uuid = take!(run_channel) @@ -184,7 +171,7 @@ function start_relaying_self_updates((session, notebook)::SN, run_channel::Distr end end -function start_relaying_logs((session, notebook)::SN, log_channel::Distributed.RemoteChannel) +function start_relaying_logs((session, notebook)::SN, log_channel) update_throttled, flush_throttled = Pluto.throttled(0.1) do Pluto.send_notebook_changes!(Pluto.ClientRequest(; session, notebook)) end @@ -255,7 +242,7 @@ end function bump_workspace_module(session_notebook::SN) workspace = get_workspace(session_notebook) old_name = workspace.module_name - new_name = workspace.module_name = create_emptyworkspacemodule(workspace.pid) + new_name = workspace.module_name = create_emptyworkspacemodule(workspace.worker) old_name, new_name end @@ -263,21 +250,21 @@ end function get_bond_names(session_notebook::SN, cell_id) workspace = get_workspace(session_notebook) - Distributed.remotecall_eval(Main, workspace.pid, quote - PlutoRunner.get_bond_names($cell_id) + Malt.remote_eval_fetch(workspace.worker, quote + PlutoRunner.get_bond_names($cell_id) end) end function possible_bond_values(session_notebook::SN, n::Symbol; get_length::Bool=false) workspace = get_workspace(session_notebook) - Distributed.remotecall_eval(Main, workspace.pid, quote + Malt.remote_eval_fetch(workspace.worker, quote PlutoRunner.possible_bond_values($(QuoteNode(n)); get_length=$(get_length)) end) end -function create_emptyworkspacemodule(pid::Integer)::Symbol - Distributed.remotecall_eval(Main, pid, quote +function create_emptyworkspacemodule(worker::Malt.AbstractWorker)::Symbol + Malt.remote_eval_fetch(worker, quote PlutoRunner.increment_current_module() end) end @@ -285,33 +272,41 @@ end # NOTE: this function only start a worker process using given # compiler options, it does not resolve paths for notebooks # compiler configurations passed to it should be resolved before this -function create_workspaceprocess(; compiler_options=CompilerOptions(), status::Status.Business=Business())::Integer - - Status.report_business_started!(status, Symbol(1)) - Status.report_business_planned!(status, Symbol(2)) - # run on proc 1 in case Pluto is being used inside a notebook process - # Workaround for "only process 1 can add/remove workers" - pid = Distributed.remotecall_eval(Main, 1, quote - $(Distributed_expr).addprocs(1; exeflags=$(_convert_to_flags(compiler_options))) |> first - end) - - Status.report_business_finished!(status, Symbol(1)) - Status.report_business_started!(status, Symbol(2)) - - Distributed.remotecall_eval(Main, [pid], process_preamble()) +function create_workspaceprocess(WorkerType; compiler_options=CompilerOptions(), status::Status.Business=Status.Business())::Malt.AbstractWorker - # so that we NEVER break the workspace with an interrupt 🤕 - @async Distributed.remotecall_eval(Main, [pid], quote - while true - try - wait() - catch end + if WorkerType === Malt.InProcessWorker + worker = WorkerType() + + if !(isdefined(Main, :PlutoRunner) && Main.PlutoRunner isa Module) + # we make PlutoRunner available in Main, right now it's only defined inside this Pluto module. + Malt.remote_eval_wait(Main, worker, quote + PlutoRunner = $(PlutoRunner) + end) end - end) + else + + Status.report_business_started!(status, Symbol(1)) + Status.report_business_planned!(status, Symbol(2)) + + worker = WorkerType(; exeflags=_convert_to_flags(compiler_options)) + + Status.report_business_finished!(status, Symbol(1)) + Status.report_business_started!(status, Symbol(2)) + + Malt.remote_eval_wait(worker, process_preamble()) + + # so that we NEVER break the workspace with an interrupt 🤕 + Malt.remote_eval(worker, quote + while true + try + wait() + catch end + end + end) + end Status.report_business_finished!(status) - - pid + worker end """ @@ -348,28 +343,16 @@ function unmake_workspace(session_notebook::SN; async::Bool=false, verbose::Bool workspace.discarded = true allow_restart || push!(discarded_workspaces, notebook.notebook_id) - if workspace.pid != Distributed.myid() - filter!(p -> fetch(p.second).pid != workspace.pid, active_workspaces) - t = @async begin - interrupt_workspace(workspace; verbose=false) - # run on proc 1 in case Pluto is being used inside a notebook process - # Workaround for "only process 1 can add/remove workers" - Distributed.remotecall_eval(Main, 1, quote - $(Distributed_expr).rmprocs($(workspace.pid)) - end) - end - async || wait(t) - else - if !isready(workspace.dowork_token) - @error "Cannot unmake a workspace running inside the same process: the notebook is still running." - elseif verbose - @warn "Cannot unmake a workspace running inside the same process: the notebook might still be running. If you are sure that your code is not running the notebook async, then you can use the `verbose=false` keyword argument to disable this message." - end + filter!(p -> fetch(p.second).worker != workspace.worker, active_workspaces) + t = @async begin + interrupt_workspace(workspace; verbose=false) + Malt.stop(workspace.worker) end + async || wait(t) nothing end -function distributed_exception_result(ex::Base.IOError, workspace::Workspace) +function workspace_exception_result(ex::Base.IOError, workspace::Workspace) ( output_formatted=PlutoRunner.format_output(CapturedException(ex, [])), errored=true, @@ -381,12 +364,11 @@ function distributed_exception_result(ex::Base.IOError, workspace::Workspace) ) end -function distributed_exception_result(exs::CompositeException, workspace::Workspace) +function workspace_exception_result(exs::CompositeException, workspace::Workspace) ex = first(exs.exceptions) - if ex isa Distributed.RemoteException && - ex.pid == workspace.pid && - ex.captured.ex isa InterruptException + if ex isa InterruptException || (ex isa Malt.RemoteException && occursin("InterruptException", ex.message)) + @info "Found an interrupt!" ex ( output_formatted=PlutoRunner.format_output(CapturedException(InterruptException(), [])), errored=true, @@ -396,7 +378,7 @@ function distributed_exception_result(exs::CompositeException, workspace::Worksp published_objects=Dict{String,Any}(), has_pluto_hook_features=false, ) - elseif ex isa Distributed.ProcessExitedException + elseif ex isa Malt.TerminatedWorkerException ( output_formatted=PlutoRunner.format_output(CapturedException(exs, [])), errored=true, @@ -440,14 +422,10 @@ function eval_format_fetch_in_workspace( )::PlutoRunner.FormattedCellResult workspace = get_workspace(session_notebook) - - is_on_this_process = workspace.pid == Distributed.myid() + is_on_this_process = workspace.worker isa Malt.InProcessWorker # if multiple notebooks run on the same process, then we need to `cd` between the different notebook paths if session_notebook isa Tuple - if is_on_this_process - cd_workspace(workspace, session_notebook[2].path) - end use_nbpkg_environment(session_notebook, workspace) end @@ -456,8 +434,7 @@ function eval_format_fetch_in_workspace( # A try block (on this process) to catch an InterruptException take!(workspace.dowork_token) early_result = try - # Use [pid] instead of pid to prevent fetching output - Distributed.remotecall_eval(Main, [workspace.pid], quote + Malt.remote_eval_wait(workspace.worker, quote PlutoRunner.run_expression( getfield(Main, $(QuoteNode(workspace.module_name))), $(QuoteNode(expr)), @@ -474,7 +451,7 @@ function eval_format_fetch_in_workspace( catch e # Don't use a `finally` because the token needs to be back asap for the interrupting code to pick it up. put!(workspace.dowork_token) - distributed_exception_result(e, workspace) + workspace_exception_result(e, workspace) end if early_result === nothing @@ -488,7 +465,7 @@ end function eval_in_workspace(session_notebook::Union{SN,Workspace}, expr) workspace = get_workspace(session_notebook) - Distributed.remotecall_eval(Main, [workspace.pid], quote + Malt.remote_eval_wait(workspace.worker, quote Core.eval($(workspace.module_name), $(QuoteNode(expr))) end) nothing @@ -509,7 +486,7 @@ function format_fetch_in_workspace( # we format the cell output on the worker, and fetch the formatted output. withtoken(workspace.dowork_token) do try - Distributed.remotecall_eval(Main, workspace.pid, quote + Malt.remote_eval_fetch(workspace.worker, quote PlutoRunner.formatted_result_of( $(workspace.notebook_id), $cell_id, @@ -521,7 +498,7 @@ function format_fetch_in_workspace( ) end) catch e - distributed_exception_result(CompositeException([e]), workspace) + workspace_exception_result(CompositeException([e]), workspace) end end end @@ -529,7 +506,7 @@ end function collect_soft_definitions(session_notebook::SN, modules::Set{Expr}) workspace = get_workspace(session_notebook) - Distributed.remotecall_eval(Main, workspace.pid, quote + Malt.remote_eval_fetch(workspace.worker, quote PlutoRunner.collect_soft_definitions($(workspace.module_name), $modules) end) end @@ -538,7 +515,7 @@ function macroexpand_in_workspace(session_notebook::SN, macrocall, cell_id, modu workspace = get_workspace(session_notebook) module_name = module_name === Symbol("") ? workspace.module_name : module_name - Distributed.remotecall_eval(Main, workspace.pid, quote + Malt.remote_eval_fetch(workspace.worker, quote try (true, PlutoRunner.try_macroexpand($(module_name), $(workspace.notebook_id), $(cell_id), $(macrocall |> QuoteNode); capture_stdout=$(capture_stdout))) catch error @@ -558,7 +535,7 @@ end function eval_fetch_in_workspace(session_notebook::Union{SN,Workspace}, expr) workspace = get_workspace(session_notebook) - Distributed.remotecall_eval(Main, workspace.pid, quote + Malt.remote_eval_fetch(workspace.worker, quote Core.eval($(workspace.module_name), $(QuoteNode(expr))) end) end @@ -566,14 +543,14 @@ end function do_reimports(session_notebook::Union{SN,Workspace}, module_imports_to_move::Set{Expr}) workspace = get_workspace(session_notebook) - Distributed.remotecall_eval(Main, [workspace.pid], quote + Malt.remote_eval_wait(workspace.worker, quote PlutoRunner.do_reimports($(workspace.module_name), $module_imports_to_move) end) end """ -Move variables to a new module. Variables to be 'deleted' will not be moved to -the new module, making them unavailable. +Move variables to a new module. A given set of variables to be 'deleted' will +not be moved to the new module, making them unavailable. """ function move_vars( session_notebook::Union{SN,Workspace}, @@ -590,7 +567,7 @@ function move_vars( workspace = get_workspace(session_notebook) new_workspace_name = something(new_workspace_name, workspace.module_name) - Distributed.remotecall_eval(Main, [workspace.pid], quote + Malt.remote_eval_wait(workspace.worker, quote PlutoRunner.move_vars( $(QuoteNode(old_workspace_name)), $(QuoteNode(new_workspace_name)), @@ -679,11 +656,6 @@ function interrupt_workspace(session_notebook::Union{SN,Workspace}; verbose=true return false end - if workspace.pid == Distributed.myid() - verbose && @warn """Cells in this workspace can't be stopped, because it is not running in a separate workspace. Use `ENV["PLUTO_WORKSPACE_USE_DISTRIBUTED"]` to control whether future workspaces are generated in a separate process.""" - return false - end - if isready(workspace.dowork_token) verbose && @info "Tried to stop idle workspace - ignoring." return true @@ -694,8 +666,8 @@ function interrupt_workspace(session_notebook::Union{SN,Workspace}; verbose=true # TODO: this will also kill "pending" evaluations, and any evaluations started within 100ms of the kill. A global "evaluation count" would fix this. # TODO: listen for the final words of the remote process on stdout/stderr: "Force throwing a SIGINT" try - verbose && @info "Sending interrupt to process $(workspace.pid)" - Distributed.interrupt(workspace.pid) + verbose && @info "Sending interrupt to process $(workspace.worker)" + Malt.interrupt(workspace.worker) if poll(() -> isready(workspace.dowork_token), 5.0, 5/100) verbose && println("Cell interrupted!") @@ -706,7 +678,7 @@ function interrupt_workspace(session_notebook::Union{SN,Workspace}; verbose=true while !isready(workspace.dowork_token) for _ in 1:5 verbose && print(" 🔥 ") - Distributed.interrupt(workspace.pid) + Malt.interrupt(workspace.worker) sleep(0.18) if isready(workspace.dowork_token) break diff --git a/src/runner/PlutoRunner.jl b/src/runner/PlutoRunner.jl index 030de8cd48..dce8300e6e 100644 --- a/src/runner/PlutoRunner.jl +++ b/src/runner/PlutoRunner.jl @@ -1,17 +1,21 @@ # Will be evaluated _inside_ the workspace process. -# Pluto does most things on process 1 (the server), and it uses little workspace processes to evaluate notebook code in. -# These baby processes don't import Pluto, they only import this module. Functions from this module are called by WorkspaceManager.jl via Distributed +# Pluto does most things on the server, but it uses worker processes to evaluate notebook code in. +# These processes don't import Pluto, they only import this module. +# Functions from this module are called by WorkspaceManager.jl via Malt. -# So when reading this file, pretend that you are living in process 2, and you are communicating with Pluto's server, who lives in process 1. +# When reading this file, pretend that you are living in a worker process, +# and you are communicating with Pluto's server, who lives in the main process. # The package environment that this file is loaded with is the NotebookProcessProject.toml file in this directory. # SOME EXTRA NOTES # 1. The entire PlutoRunner should be a single file. -# 2. We restrict the communication between this PlutoRunner and the Pluto server to only use *Base Julia types*, like `String`, `Dict`, `NamedTuple`, etc. +# 2. Restrict the communication between this PlutoRunner and the Pluto server to only use *Base Julia types*, like `String`, `Dict`, `NamedTuple`, etc. -# These restriction are there to allow flexibility in the way that this file is loaded on a runner process, which is something that we might want to change in the future, like when we make the transition to our own Distributed. +# These restriction are there to allow flexibility in the way that this file is +# loaded on a runner process, which is something that we might want to change +# in the future. module PlutoRunner @@ -21,7 +25,6 @@ import InteractiveUtils using Markdown import Markdown: html, htmlinline, LaTeX, withtag, htmlesc -import Distributed import Base64 import FuzzyCompletions: Completion, BslashCompletion, ModuleCompletion, PropertyCompletion, FieldCompletion, PathCompletion, DictCompletion, completions, completion_text, score import Base: show, istextmime @@ -32,7 +35,7 @@ import REPL export @bind -# This is not a struct to make it easier to pass these objects between distributed processes. +# This is not a struct to make it easier to pass these objects between processes. const MimedOutput = Tuple{Union{String,Vector{UInt8},Dict{Symbol,Any}},MIME} const ObjectID = typeof(objectid("hello computer")) @@ -878,7 +881,7 @@ const table_column_display_limit_increase = 30 const tree_display_extra_items = Dict{UUID,Dict{ObjectDimPair,Int64}}() -# This is not a struct to make it easier to pass these objects between distributed processes. +# This is not a struct to make it easier to pass these objects between processes. const FormattedCellResult = NamedTuple{(:output_formatted, :errored, :interrupted, :process_exited, :runtime, :published_objects, :has_pluto_hook_features),Tuple{PlutoRunner.MimedOutput,Bool,Bool,Bool,Union{UInt64,Nothing},Dict{String,Any},Bool}} function formatted_result_of( @@ -2192,15 +2195,15 @@ function possible_bond_values(s::Symbol; get_length::Bool=false) try length(possible_values) catch - length(make_distributed_serializable(possible_values)) + length(make_serializable(possible_values)) end : - make_distributed_serializable(possible_values) + make_serializable(possible_values) end end -make_distributed_serializable(x::Any) = x -make_distributed_serializable(x::Union{AbstractVector,AbstractSet,Base.Generator}) = collect(x) -make_distributed_serializable(x::Union{Vector,Set,OrdinalRange}) = x +make_serializable(x::Any) = x +make_serializable(x::Union{AbstractVector,AbstractSet,Base.Generator}) = collect(x) +make_serializable(x::Union{Vector,Set,OrdinalRange}) = x """ diff --git a/src/webserver/REPLTools.jl b/src/webserver/REPLTools.jl index 74f432b19d..7da202f6ca 100644 --- a/src/webserver/REPLTools.jl +++ b/src/webserver/REPLTools.jl @@ -1,5 +1,5 @@ import FuzzyCompletions: complete_path, completion_text, score -import Distributed +import Malt import .PkgCompat: package_completions using Markdown import REPL @@ -83,10 +83,13 @@ responses[:complete] = function response_complete(🙋::ClientRequest) if will_run_code(🙋.notebook) && workspace isa WorkspaceManager.Workspace && isready(workspace.dowork_token) # we don't use eval_format_fetch_in_workspace because we don't want the output to be string-formatted. # This works in this particular case, because the return object, a `Completion`, exists in this scope too. - Distributed.remotecall_eval(Main, workspace.pid, :(PlutoRunner.completion_fetcher( - $query, $pos, - getfield(Main, $(QuoteNode(workspace.module_name))), - ))) + Malt.remote_eval_fetch(workspace.worker, quote + PlutoRunner.completion_fetcher( + $query, + $pos, + getfield(Main, $(QuoteNode(workspace.module_name))), + ) + end) else # We can at least autocomplete general julia things: PlutoRunner.completion_fetcher(query, pos, Main) @@ -132,10 +135,12 @@ responses[:docs] = function response_docs(🙋::ClientRequest) (repr(MIME("text/html"), doc_md), :👍) else if will_run_code(🙋.notebook) && workspace isa WorkspaceManager.Workspace && isready(workspace.dowork_token) - Distributed.remotecall_eval(Main, workspace.pid, :(PlutoRunner.doc_fetcher( - $query, - getfield(Main, $(QuoteNode(workspace.module_name))), - ))) + Malt.remote_eval_fetch(workspace.worker, quote + PlutoRunner.doc_fetcher( + $query, + getfield(Main, $(QuoteNode(workspace.module_name))), + ) + end) else (nothing, :⌛) end diff --git a/test/Bonds.jl b/test/Bonds.jl index 3fe9e559c4..bf1b028e09 100644 --- a/test/Bonds.jl +++ b/test/Bonds.jl @@ -1,7 +1,7 @@ using Test import Pluto import Pluto: update_run!, update_save_run!, WorkspaceManager, ClientSession, ServerSession, Notebook, Cell -import Distributed +import Malt @testset "Bonds" begin @@ -358,9 +358,8 @@ import Distributed # test that the notebook file is runnable: - test_proc = Distributed.addprocs(1)[1] - - Distributed.remotecall_eval(Main, test_proc, quote + test_proc = Malt.Worker() + Malt.remote_eval_wait(test_proc, quote import Pkg try Pkg.UPDATED_REGISTRY_THIS_SESSION[] = true @@ -368,11 +367,11 @@ import Distributed Pkg.activate(mktempdir()) Pkg.add("AbstractPlutoDingetjes") end) - @test Distributed.remotecall_eval(Main, test_proc, quote + @test Malt.remote_eval_fetch(test_proc, quote include($(notebook.path)) true end) - Distributed.rmprocs(test_proc) + Malt.stop(test_proc) end @testset "Dependent Bound Variables" begin diff --git a/test/Dynamic.jl b/test/Dynamic.jl index 56496f0d2d..1d8675c740 100644 --- a/test/Dynamic.jl +++ b/test/Dynamic.jl @@ -193,7 +193,7 @@ end @testset "PlutoRunner API" begin 🍭 = ServerSession() - 🍭.options.evaluation.workspace_use_distributed = true + # 🍭.options.evaluation.workspace_use_distributed = true cid = uuid1() diff --git a/test/React.jl b/test/React.jl index 0921a4ce0d..bb249032c9 100644 --- a/test/React.jl +++ b/test/React.jl @@ -1,14 +1,15 @@ using Test import Pluto: Configuration, Notebook, ServerSession, ClientSession, update_run!, Cell, WorkspaceManager import Pluto.Configuration: Options, EvaluationOptions -import Distributed @testset "Reactivity" begin 🍭 = ServerSession() 🍭.options.evaluation.workspace_use_distributed = false - @testset "Basic $(parallel ? "distributed" : "single-process")" for parallel in [false, true] - 🍭.options.evaluation.workspace_use_distributed = parallel + @testset "Basic $workertype" for workertype in [:Malt, :Distributed, :InProcess] + 🍭.options.evaluation.workspace_use_distributed = workertype !== :InProcess + 🍭.options.evaluation.workspace_use_distributed_stdlib = workertype === :Distributed + notebook = Notebook([ Cell("x = 1"), @@ -22,7 +23,13 @@ import Distributed end"""), Cell("g(6) + g(6,6)"), - Cell("import Distributed"), + Cell(""" + begin + pushfirst!(LOAD_PATH, "@stdlib") + import Distributed + popfirst!(LOAD_PATH) + end + """), Cell("Distributed.myid()"), ]) @@ -70,10 +77,14 @@ import Distributed @test notebook.cells[6].output.body == "3" update_run!(🍭, notebook, notebook.cells[7:8]) - @test if parallel - notebook.cells[8].output.body != string(Distributed.myid()) + if workertype === :Distributed + @test notebook.cells[8].output.body ∉ ("1", string(Distributed.myid())) + elseif workertype === :Malt + @test notebook.cells[8].output.body == "1" + elseif workertype === :InProcess + @test notebook.cells[8].output.body == string(Distributed.myid()) else - notebook.cells[8].output.body == string(Distributed.myid()) + error() end WorkspaceManager.unmake_workspace((🍭, notebook); verbose=false) diff --git a/test/ReloadFromFile.jl b/test/ReloadFromFile.jl index 7cc6b9aac4..ec602fee3c 100644 --- a/test/ReloadFromFile.jl +++ b/test/ReloadFromFile.jl @@ -1,7 +1,6 @@ using Test import Pluto: Configuration, Notebook, ServerSession, ClientSession, update_run!, Cell, WorkspaceManager, SessionActions, save_notebook import Pluto.Configuration: Options, EvaluationOptions -import Distributed using Pluto.WorkspaceManager: poll import Pkg diff --git a/test/WorkspaceManager.jl b/test/WorkspaceManager.jl index 92975be90a..fc7138e4a9 100644 --- a/test/WorkspaceManager.jl +++ b/test/WorkspaceManager.jl @@ -2,7 +2,7 @@ using Test using Pluto.Configuration: CompilerOptions using Pluto.WorkspaceManager: _merge_notebook_compiler_options import Pluto: update_save_run!, update_run!, WorkspaceManager, ClientSession, ServerSession, Notebook, Cell, project_relative_path -import Distributed +import Malt @testset "Workspace manager" begin # basic functionality is already tested by the reactivity tests @@ -54,7 +54,8 @@ import Distributed Sys.iswindows() || @testset "Pluto inside Pluto" begin 🍭 = ServerSession() - 🍭.options.evaluation.workspace_use_distributed = true + 🍭.options.evaluation.capture_stdout = false + 🍭.options.evaluation.workspace_use_distributed_stdlib = false notebook = Notebook([ Cell("""begin @@ -65,7 +66,10 @@ import Distributed import Pluto end"""), Cell(""" - s = Pluto.ServerSession() + begin + s = Pluto.ServerSession() + s.options.evaluation.workspace_use_distributed_stdlib = false + end """), Cell(""" nb = Pluto.SessionActions.open(s, Pluto.project_relative_path("sample", "Tower of Hanoi.jl"); run_async=false, as_sample=true) @@ -86,17 +90,10 @@ import Distributed update_run!(🍭, notebook, notebook.cells[5]) @test notebook.cells[5] |> noerror - - desired_nprocs = Distributed.nprocs() - 1 setcode!(notebook.cells[5], "Pluto.SessionActions.shutdown(s, nb)") update_run!(🍭, notebook, notebook.cells[5]) @test noerror(notebook.cells[5]) - while Distributed.nprocs() != desired_nprocs - sleep(.1) - end - sleep(.1) - WorkspaceManager.unmake_workspace((🍭, notebook)) end end diff --git a/test/cell_disabling.jl b/test/cell_disabling.jl index f5f9575dd3..3c8e8f498b 100644 --- a/test/cell_disabling.jl +++ b/test/cell_disabling.jl @@ -1,6 +1,6 @@ using Test using Pluto -using Pluto: update_run!, ServerSession, ClientSession, Cell, Notebook, set_disabled, is_disabled +using Pluto: update_run!, ServerSession, ClientSession, Cell, Notebook, set_disabled, is_disabled, WorkspaceManager @@ -236,6 +236,7 @@ using Pluto: update_run!, ServerSession, ClientSession, Cell, Notebook, set_disa update_run!(🍭, notebook, c([12])) @test c(14).output.body == "3" + WorkspaceManager.unmake_workspace((🍭, notebook)) end @@ -342,4 +343,5 @@ end update_run!(🍭, notebook, notebook.cells) @test get_disabled_cells(notebook) == [] + WorkspaceManager.unmake_workspace((🍭, notebook)) end diff --git a/test/helpers.jl b/test/helpers.jl index c61217a08b..3b745c25fc 100644 --- a/test/helpers.jl +++ b/test/helpers.jl @@ -16,8 +16,9 @@ import Pluto.ExpressionExplorer: SymbolsState, compute_symbolreferences, Functio using Sockets using Test using HTTP -import Distributed import Pkg +import Malt +import Malt.Distributed function Base.show(io::IO, s::SymbolsState) print(io, "SymbolsState([") @@ -243,8 +244,8 @@ has_embedded_pkgfiles(nb::Pluto.Notebook) = Log an error message if there are any running processes created by Distrubted, that were not shut down. """ function verify_no_running_processes() - if length(Distributed.procs()) != 1 - @error "Not all notebook processes were closed during tests!" Distributed.procs() + if length(Distributed.procs()) != 1 || !isempty(Malt.__iNtErNaL_get_running_procs()) + @error "Not all notebook processes were closed during tests!" Distributed.procs() Malt.__iNtErNaL_get_running_procs() end end diff --git a/test/packages/Basic.jl b/test/packages/Basic.jl index bc1b204a0b..a255865c77 100644 --- a/test/packages/Basic.jl +++ b/test/packages/Basic.jl @@ -5,7 +5,7 @@ using Pluto.Configuration: CompilerOptions import Pluto: update_save_run!, update_run!, WorkspaceManager, ClientSession, ServerSession, Notebook, Cell, project_relative_path, SessionActions, load_notebook import Pluto.PkgUtils import Pluto.PkgCompat -import Distributed +import Malt @testset "Built-in Pkg" begin @@ -13,8 +13,9 @@ import Distributed # We have our own registry for these test! Take a look at https://github.com/JuliaPluto/PlutoPkgTestRegistry#readme for more info about the test packages and their dependencies. Pkg.Registry.add(pluto_test_registry_spec) - @testset "Basic" begin + @testset "Basic $(use_distributed_stdlib ? "Distributed" : "Malt")" for use_distributed_stdlib in (false, true) 🍭 = ServerSession() + 🍭.options.evaluation.workspace_use_distributed_stdlib = use_distributed_stdlib # See https://github.com/JuliaPluto/PlutoPkgTestRegistry @@ -395,8 +396,7 @@ import Distributed end @testset "DrWatson cell" begin - 🍭 = ServerSession() - 🍭.options.evaluation.workspace_use_distributed = false + 🍭 = ServerSession() notebook = Notebook([ Cell("using Plots"), @@ -449,11 +449,11 @@ import Distributed @static if VERSION < v"1.10.0-0" # see https://github.com/fonsp/Pluto.jl/pull/2626#issuecomment-1671244510 @testset "File format -- Forwards compat" begin # Using Distributed, we will create a new Julia process in which we install Pluto 0.14.7 (before PlutoPkg). We run the new notebook file on the old Pluto. - p = Distributed.addprocs(1) |> first + test_worker = Malt.Worker() @test post_pkg_notebook isa String - Distributed.remotecall_eval(Main, p, quote + Malt.remote_eval_wait(Main, test_worker, quote path = tempname() write(path, $(post_pkg_notebook)) import Pkg @@ -462,35 +462,38 @@ import Distributed Pkg.UPDATED_REGISTRY_THIS_SESSION[] = true end - Pkg.activate(mktempdir()) + Pkg.activate(;temp=true) Pkg.add(Pkg.PackageSpec(;name="Pluto",version=v"0.14.7")) + # Distributed is required for old Pluto to work! + Pkg.add("Distributed") + import Pluto + @info Pluto.PLUTO_VERSION @assert Pluto.PLUTO_VERSION == v"0.14.7" + end) + @test Malt.remote_eval_fetch(Main, test_worker, quote s = Pluto.ServerSession() - s.options.evaluation.workspace_use_distributed = false - nb = Pluto.SessionActions.open(s, path; run_async=false) - - nothing + nb.cells[2].errored == false end) # Cells that use Example will error because the package is not installed. - # @test Distributed.remotecall_eval(Main, p, quote + # @test Malt.remote_eval_fetch(Main, test_worker, quote # nb.cells[1].errored == false # end) - @test Distributed.remotecall_eval(Main, p, quote + @test Malt.remote_eval_fetch(Main, test_worker, quote nb.cells[2].errored == false end) - # @test Distributed.remotecall_eval(Main, p, quote + # @test Malt.remote_eval_fetch(Main, test_worker, quote # nb.cells[3].errored == false # end) - # @test Distributed.remotecall_eval(Main, p, quote + # @test Malt.remote_eval_fetch(Main, test_worker, quote # nb.cells[3].output.body == "25" # end) - Distributed.rmprocs([p]) + Malt.stop(test_worker) end end @@ -768,4 +771,3 @@ end # LibGit2.checkout!(repo, "aef26d37e1d0e8f8387c011ccb7c4a38398a18f6") - diff --git a/test/runtests.jl b/test/runtests.jl index 3b3ce21a51..88901d4ae3 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -10,7 +10,7 @@ if get(ENV, "PLUTO_TEST_ONLY_COMPILETIMES", nothing) == "true" end @timeit_include("Events.jl") verify_no_running_processes() -@timeit_include("WorkspaceManager.jl") +@timeit_include("Configuration.jl") verify_no_running_processes() @timeit_include("packages/Basic.jl") verify_no_running_processes() @@ -30,7 +30,7 @@ verify_no_running_processes() verify_no_running_processes() @timeit_include("Notebook.jl") verify_no_running_processes() -@timeit_include("Configuration.jl") +@timeit_include("WorkspaceManager.jl") verify_no_running_processes() # tests that don't start new processes: