From 30054b03f432d005a3322428566d5f26ea7e1196 Mon Sep 17 00:00:00 2001 From: Stas Date: Thu, 10 Aug 2023 17:34:07 +0200 Subject: [PATCH] feat: metric endpoint per tenant (#147) --- VERSION | 2 +- lib/supavisor.ex | 1 + lib/supavisor/application.ex | 1 + lib/supavisor/monitoring/prom_ex.ex | 27 ++++++++++++- lib/supavisor/syn_handler.ex | 1 + lib/supavisor/tenants_metrics.ex | 40 +++++++++++++++++++ .../controllers/metrics_controller.ex | 22 ++++++++++ lib/supavisor_web/router.ex | 1 + 8 files changed, 92 insertions(+), 3 deletions(-) create mode 100644 lib/supavisor/tenants_metrics.ex diff --git a/VERSION b/VERSION index b3269842..8bc53d52 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.2.27 +0.2.28 diff --git a/lib/supavisor.ex b/lib/supavisor.ex index ab7e2d04..bdabc020 100644 --- a/lib/supavisor.ex +++ b/lib/supavisor.ex @@ -107,6 +107,7 @@ defmodule Supavisor do @spec del_all_cache(String.t(), String.t()) :: map() def del_all_cache(tenant, user) do %{secrets: Cachex.del(Supavisor.Cache, {:secrets, tenant, user})} + %{metrics: Cachex.del(Supavisor.Cache, {:metrics, tenant})} end @spec get_local_pool(String.t(), String.t()) :: pid() | nil diff --git a/lib/supavisor/application.ex b/lib/supavisor/application.ex index d834821a..53627ea9 100644 --- a/lib/supavisor/application.ex +++ b/lib/supavisor/application.ex @@ -73,6 +73,7 @@ defmodule Supavisor.Application do }, Supavisor.Vault, {Cachex, name: Supavisor.Cache}, + Supavisor.TenantsMetrics, # Start the Endpoint (http/https) SupavisorWeb.Endpoint ] diff --git a/lib/supavisor/monitoring/prom_ex.ex b/lib/supavisor/monitoring/prom_ex.ex index 30c56c47..28ecde0c 100644 --- a/lib/supavisor/monitoring/prom_ex.ex +++ b/lib/supavisor/monitoring/prom_ex.ex @@ -69,8 +69,6 @@ defmodule Supavisor.Monitoring.PromEx do Application.fetch_env!(:supavisor, :metrics_tags) |> Enum.map_join(",", fn {k, v} -> "#{k}=\"#{v}\"" end) - Logger.debug("Default prom tags: #{def_tags}") - metrics = PromEx.get_metrics(__MODULE__) |> String.split("\n") @@ -82,6 +80,31 @@ defmodule Supavisor.Monitoring.PromEx do metrics end + @spec do_cache_tenants_metrics() :: :ok + def do_cache_tenants_metrics() do + metrics = get_metrics() |> String.split("\n") + + Registry.select(Supavisor.Registry.TenantSups, [{{:"$1", :_, :_}, [], [:"$1"]}]) + |> Enum.uniq() + |> Enum.reduce(metrics, fn tenant, acc -> + {matched, rest} = Enum.split_with(acc, &String.contains?(&1, "tenant=\"#{tenant}\"")) + + if matched != [] do + Cachex.put(Supavisor.Cache, {:metrics, tenant}, Enum.join(matched, "\n")) + end + + rest + end) + end + + @spec get_tenant_metrics(String.t()) :: String.t() + def get_tenant_metrics(tenant) do + case Cachex.get(Supavisor.Cache, {:metrics, tenant}) do + {_, metrics} when is_binary(metrics) -> metrics + _ -> "" + end + end + @spec parse_and_add_tags(String.t(), String.t()) :: String.t() defp parse_and_add_tags(line, def_tags) do case Regex.run(~r/(?!\#)^(\w+)(?:{(.*?)})?\s*(.+)$/, line) do diff --git a/lib/supavisor/syn_handler.ex b/lib/supavisor/syn_handler.ex index 61ee5d33..ed5a0ff4 100644 --- a/lib/supavisor/syn_handler.ex +++ b/lib/supavisor/syn_handler.ex @@ -16,6 +16,7 @@ defmodule Supavisor.SynHandler do # remove all Prometheus metrics for the specified tenant PromEx.remove_metrics(tenant, user_alias) + Supavisor.del_all_cache(tenant, user_alias) end def resolve_registry_conflict( diff --git a/lib/supavisor/tenants_metrics.ex b/lib/supavisor/tenants_metrics.ex new file mode 100644 index 00000000..48de4702 --- /dev/null +++ b/lib/supavisor/tenants_metrics.ex @@ -0,0 +1,40 @@ +defmodule Supavisor.TenantsMetrics do + @moduledoc false + use GenServer, restart: :transient + require Logger + + alias Supavisor.Monitoring.PromEx + + @check_timeout 10_000 + + def start_link(args) do + GenServer.start_link(__MODULE__, args, name: __MODULE__) + end + + ## Callbacks + + @impl true + def init(_args) do + send(self(), :check_metrics) + {:ok, %{check_ref: make_ref()}} + end + + @impl true + def handle_info(:check_metrics, state) do + Process.cancel_timer(state.check_ref) + + PromEx.do_cache_tenants_metrics() + + {:noreply, %{state | check_ref: check_metrics()}} + end + + ## Internal functions + + defp check_metrics() do + Process.send_after( + self(), + :check_metrics, + @check_timeout + ) + end +end diff --git a/lib/supavisor_web/controllers/metrics_controller.ex b/lib/supavisor_web/controllers/metrics_controller.ex index a049413f..47b0c9ba 100644 --- a/lib/supavisor_web/controllers/metrics_controller.ex +++ b/lib/supavisor_web/controllers/metrics_controller.ex @@ -17,6 +17,16 @@ defmodule SupavisorWeb.MetricsController do |> send_resp(200, cluster_metrics) end + def tenant(conn, %{"external_id" => ext_id}) do + cluster_metrics = fetch_cluster_metrics(ext_id) + + code = if cluster_metrics == "", do: 404, else: 200 + + conn + |> put_resp_content_type("text/plain") + |> send_resp(code, cluster_metrics) + end + @spec fetch_cluster_metrics() :: String.t() defp fetch_cluster_metrics() do Node.list() @@ -29,6 +39,18 @@ defmodule SupavisorWeb.MetricsController do {node, :rpc.call(node, PromEx, :get_metrics, [], 10_000)} end + @spec fetch_cluster_metrics(String.t()) :: String.t() + defp fetch_cluster_metrics(tenant) do + Node.list() + |> Task.async_stream(&fetch_node_metrics(&1, tenant), timeout: :infinity) + |> Enum.reduce(PromEx.get_tenant_metrics(tenant), &merge_node_metrics/2) + end + + @spec fetch_node_metrics(atom(), String.t()) :: {atom(), term()} + defp fetch_node_metrics(node, tenant) do + {node, :rpc.call(node, PromEx, :get_tenant_metrics, [tenant], 10_000)} + end + defp merge_node_metrics({_, {node, {:badrpc, reason}}}, acc) do Logger.error("Cannot fetch metrics from the node #{inspect(node)} because #{inspect(reason)}") acc diff --git a/lib/supavisor_web/router.ex b/lib/supavisor_web/router.ex index 6f1934e2..df8eded4 100644 --- a/lib/supavisor_web/router.ex +++ b/lib/supavisor_web/router.ex @@ -52,6 +52,7 @@ defmodule SupavisorWeb.Router do pipe_through(:metrics) get("/", MetricsController, :index) + get("/:external_id", MetricsController, :tenant) end # Other scopes may use custom stacks.