Skip to content

Commit

Permalink
Merge pull request #13 from ProducerMatt/new-query-system
Browse files Browse the repository at this point in the history
New query + respond system
  • Loading branch information
ProducerMatt authored Apr 30, 2024
2 parents 44c4379 + 64267a8 commit 7bda109
Show file tree
Hide file tree
Showing 11 changed files with 224 additions and 148 deletions.
135 changes: 80 additions & 55 deletions lib/plugin.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,15 @@ defmodule Plugin do
require InteractionForm
@first_response_timeout 500

@callback process_msg(SiteConfig.t(), Msg.t()) :: nil | Response.t()

# TODO: replace with predicate? and handle prefix cleaning seperately
@doc "Given a config and message, indicate if we should respond, and if so what is the relevant part of the message?"
@callback at_module?(SiteConfig.t(), Msg.t()) :: boolean() | {:cleaned, text :: String.t()}
@doc """
Should this module respond to the given config and message? If so, we will execute this plugin's "respond" callback with the returned value.
These methods on all plugins are run syncronously with every incoming message in the caller thread.
"""
@callback query(SiteConfig.t(), Msg.t()) :: nil | {:respond, arg :: any()}
@doc """
Decide on a response for the given arg.
"""
@callback respond(arg :: any()) :: nil | Response.t()

@typedoc """
Describe uses for a plugin in a input-output manner, no prefix included.
Expand All @@ -45,24 +49,20 @@ defmodule Plugin do
@callback usage() :: usage_tuples()
@callback description() :: TxtBlock.t()

defguard is_bot_invoked(cfg, msg)
when cfg.bot_is_loud or msg.at_bot? or msg.dm? or msg.prefix != false

defmacro __using__(_opts \\ []) do
quote do
@behaviour unquote(__MODULE__)

@impl Plugin
def at_module?(cfg, msg) do
# Should we process the message?
if msg.at_bot? or msg.dm? or msg.prefix do
{:cleaned, msg.body}
else
false
end
end

defoverridable at_module?: 2
end
end

@spec default_predicate(map(), map(), success_tuple) :: success_tuple
when success_tuple: {:respond, arg :: any()}
def default_predicate(cfg, msg, success_tuple) when is_bot_invoked(cfg, msg), do: success_tuple
def default_predicate(cfg, msg, _) when not is_bot_invoked(cfg, msg), do: nil

@doc "returns loaded modules using the Plugin behavior."
@spec! ls() :: MapSet.t(module())
def ls() do
Expand All @@ -89,23 +89,15 @@ defmodule Plugin do
MapSet.intersection(enabled, ls())
end

def default_plugin_mfa(plug, [cfg, msg]) do
{plug, :process_msg, [cfg, msg]}
end

@type! job_result ::
{:job_error, :timeout}
| {:job_error, tuple()}
| {:job_ok, nil}
| {:job_ok, %Response{}}
@type! plugin_job_result :: {atom(), job_result()}
| {:job_ok, any()}
@type! plugin_job_result :: {module(), job_result()}

@doc "Attempt some task, safely catch errors, and format the error report for the originating service"
@spec! get_response(S.module_function_args() | atom(), SiteConfig.t(), S.Msg.t()) ::
@spec! get_response(S.module_function_args(), SiteConfig.t(), S.Msg.t()) ::
job_result()
def get_response(plugin, cfg, msg) when is_atom(plugin),
do: get_response(default_plugin_mfa(plugin, [cfg, msg]), cfg, msg)

def get_response({m, f, a}, cfg, msg) do
# if an error occurs in process_msg, catch it and return as data
try do
Expand Down Expand Up @@ -143,45 +135,75 @@ defmodule Plugin do
end
end

@spec! query_plugins(list(S.module_function_args() | module()), SiteConfig.t(), S.Msg.t()) ::
@spec! query_plugins(
nonempty_list(module() | S.module_function_args())
| MapSet.t(module() | S.module_function_args()),
SiteConfig.t(),
S.Msg.t()
) ::
nil | {response :: Response.t(), interaction_id :: S.interaction_id()}
def query_plugins(call_list, cfg, msg) do
tasks =
Enum.map(call_list, fn
mfa = {this_plug, _func, _args} ->
{this_plug,
Task.Supervisor.async_nolink(
S.quick_task_via(),
__MODULE__,
:get_response,
[mfa, cfg, msg]
)}
{this_plug, func, args} ->
{
this_plug,
Task.Supervisor.async_nolink(
S.quick_task_via(),
__MODULE__,
:get_response,
[{this_plug, func, args}, cfg, msg]
)
}

this_plug when is_atom(this_plug) ->
{this_plug,
Task.Supervisor.async_nolink(
S.quick_task_via(),
__MODULE__,
:get_response,
[this_plug, cfg, msg]
)}
{
this_plug,
case get_response({this_plug, :query, [cfg, msg]}, cfg, msg) do
{:job_ok, {:respond, arg}} ->
Task.Supervisor.async_nolink(
S.quick_task_via(),
__MODULE__,
:get_response,
[{this_plug, :respond, [arg]}, cfg, msg]
)

other ->
other
end
}
end)

# make a map of task references to the plugins they were called for
task_ids =
Enum.reduce(tasks, %{}, fn
{plug, %{ref: ref}}, acc ->
Map.put(acc, ref, plug)
{
task_id_map,
launched_tasks,
declined_queries
} =
Enum.reduce(tasks, {%{}, [], []}, fn
{plug, t = %Task{ref: ref}}, {task_map, launched_tasks, declined_queries} ->
{
Map.put(task_map, ref, plug),
[t | launched_tasks],
declined_queries
}

{plug, res}, {task_map, launched_tasks, declined_queries} ->
{
task_map,
launched_tasks,
[{plug, res} | declined_queries]
}
end)

# to yield with Task.yield_many(), the plugins and tasks must part
task_results =
Enum.map(tasks, &Kernel.elem(&1, 1))
launched_tasks
|> Task.yield_many(timeout: @first_response_timeout)
|> Enum.map(fn {task, result} ->
# they are reunited :-)
{
task_ids[task.ref],
Map.fetch!(task_id_map, task.ref),
result || Task.shutdown(task, :brutal_kill)
}
end)
Expand Down Expand Up @@ -218,12 +240,14 @@ defmodule Plugin do
end
end)

%{r: chosen_response, tb: traceback} = resolve_responses(task_results)
%{r: chosen_response, tb: traceback} = resolve_responses(task_results ++ declined_queries)

case chosen_response do
# no plugins want to respond
nil ->
nil

# we have a response to immediately provide
%Response{callback: nil} ->
{:ok, iid} =
S.InteractionForm.new(
Expand All @@ -238,9 +262,11 @@ defmodule Plugin do

{chosen_response, iid}

# we can't get a response until we run this callback
# TODO: let callback decide to not respond, and fall back to the next highest priority response
%Response{callback: {mod, fun, args}} ->
followup =
apply(mod, fun, [cfg | args])
apply(mod, fun, args)

new_tb = [
traceback,
Expand Down Expand Up @@ -270,8 +296,8 @@ defmodule Plugin do
nil | {response :: Response.t(), interaction_id :: S.interaction_id()}
def get_top_response(cfg, msg) do
case S.Interact.channel_locked?(msg.channel_id) do
{{m, f, a}, _plugin, _iid} ->
{response, iid} = query_plugins([{m, f, [cfg, msg | a]}], cfg, msg)
{{m, f, args_without_msg}, _plugin, _iid} ->
{response, iid} = query_plugins([{m, f, [msg | args_without_msg]}], cfg, msg)

explained_response =
Map.update!(response, :why, fn tb ->
Expand All @@ -291,7 +317,6 @@ defmodule Plugin do

false ->
__MODULE__.ls(SiteConfig.fetch!(cfg, :plugs))
|> MapSet.to_list()
|> query_plugins(cfg, msg)
end
end
Expand Down
23 changes: 11 additions & 12 deletions lib/plugin/sentience.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@ defmodule Plugin.Sentience do
end

@impl Plugin
@spec! process_msg(SiteConfig.t(), S.Msg.t()) :: nil | S.Response.t()
def process_msg(cfg, msg) do
if at_module?(cfg, msg) do
S.Response.new(
confidence: 1,
text: S.confused_response(),
origin_msg_id: msg.id,
why: ["I didn't have any better ideas."]
)
else
nil
end
def query(cfg, msg), do: Plugin.default_predicate(cfg, msg, {:respond, msg.id})

@impl Plugin
@spec! respond(msg_id :: S.msg_id()) :: nil | S.Response.t()
def respond(msg_id) do
S.Response.new(
confidence: 1,
text: S.confused_response(),
origin_msg_id: msg_id,
why: ["I didn't have any better ideas."]
)
end
end
34 changes: 18 additions & 16 deletions lib/plugin/test.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,19 +25,21 @@ defmodule Plugin.Test do
"A set of functions for testing Stampede functionality."
end

@spec! process_msg(SiteConfig.t(), S.Msg.t()) :: nil | S.Response.t()
@impl Plugin
def process_msg(cfg, msg) do
case at_module?(cfg, msg) do
{:cleaned, "ping"} ->
def query(cfg, msg), do: Plugin.default_predicate(cfg, msg, {:respond, msg})

@impl Plugin
def respond(msg) do
case msg.body do
"ping" ->
S.Response.new(
confidence: 10,
text: "pong!",
origin_msg_id: msg.id,
why: ["They pinged so I ponged!"]
)

{:cleaned, "callback"} ->
"callback" ->
num = :rand.uniform(10)

S.Response.new(
Expand All @@ -49,7 +51,7 @@ defmodule Plugin.Test do
)

# test channel locks
{:cleaned, "a"} ->
"a" ->
S.Response.new(
confidence: 10,
text: "locked in on #{msg.author_id} awaiting b",
Expand All @@ -59,32 +61,32 @@ defmodule Plugin.Test do
channel_lock: {:lock, msg.channel_id, {__MODULE__, :lock_callback, [:b]}}
)

{:cleaned, "timeout"} ->
"timeout" ->
:timer.seconds(11) |> Process.sleep()
raise "This job should be killed before here"

{:cleaned, "raise"} ->
"raise" ->
raise SillyError

{:cleaned, "throw"} ->
"throw" ->
throw(SillyThrow)

_ ->
nil
end
end

def callback_example(_cfg, num, msg_id) when is_number(num) do
def callback_example(num, msg_id) when is_number(num) do
S.Response.new(
confidence: 10,
origin_msg_id: msg_id,
text: "Called back with number #{num}"
)
end

def lock_callback(cfg, msg, :b) do
case at_module?(cfg, msg) do
{:cleaned, "b"} ->
def lock_callback(msg, :b) do
case msg.body do
"b" ->
S.Response.new(
confidence: 10,
text: "b response. awaiting c",
Expand All @@ -106,9 +108,9 @@ defmodule Plugin.Test do
end
end

def lock_callback(cfg, msg, :c) do
case at_module?(cfg, msg) do
{:cleaned, "c"} ->
def lock_callback(msg, :c) do
case msg.body do
"c" ->
S.Response.new(
confidence: 10,
text: "c response. interaction done!",
Expand Down
Loading

0 comments on commit 7bda109

Please sign in to comment.