Delete useless, custom JobQueueMonitor

While its data was included in healthcheck responses,
it was not used to determine the healthy status
and for informational purposes Prometheus metrics,
ObanWeb dashboard or the Phoenix live dashboard are all better fits.
In particular, the data shown in healtcheck responses had no temporal
information, but there’s quite a difference between X failures scattered
across many days of uptime and X failures within a couple minutes.
This commit is contained in:
Oneric 2025-10-10 00:00:00 +00:00
parent 6dc546eda7
commit f4e188af0a
5 changed files with 0 additions and 175 deletions

View file

@ -68,7 +68,6 @@ def start(_type, _args) do
http_children() ++
[
Pleroma.Stats,
Pleroma.JobQueueMonitor,
{Majic.Pool, [name: Pleroma.MajicPool, pool_size: Config.get([:majic_pool, :size], 2)]},
{Oban, Config.get(Oban)},
Pleroma.Web.Endpoint,

View file

@ -14,7 +14,6 @@ defmodule Pleroma.Healthcheck do
active: 0,
idle: 0,
memory_used: 0,
job_queue_stats: nil,
healthy: true
@type t :: %__MODULE__{
@ -22,7 +21,6 @@ defmodule Pleroma.Healthcheck do
active: non_neg_integer(),
idle: non_neg_integer(),
memory_used: number(),
job_queue_stats: map(),
healthy: boolean()
}
@ -32,7 +30,6 @@ def system_info do
memory_used: Float.round(:recon_alloc.memory(:allocated) / 1024 / 1024, 2)
}
|> assign_db_info()
|> assign_job_queue_stats()
|> check_health()
end
@ -58,11 +55,6 @@ defp assign_db_info(healthcheck) do
Map.merge(healthcheck, db_info)
end
defp assign_job_queue_stats(healthcheck) do
stats = Pleroma.JobQueueMonitor.stats()
Map.put(healthcheck, :job_queue_stats, stats)
end
@spec check_health(Healthcheck.t()) :: Healthcheck.t()
def check_health(%{pool_size: pool_size, active: active} = check)
when active >= pool_size do

View file

@ -1,95 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.JobQueueMonitor do
use GenServer
@initial_state %{workers: %{}, queues: %{}, processed_jobs: 0}
@queue %{processed_jobs: 0, success: 0, failure: 0}
@operation %{processed_jobs: 0, success: 0, failure: 0}
def start_link(_) do
GenServer.start_link(__MODULE__, @initial_state, name: __MODULE__)
end
@impl true
def init(state) do
:telemetry.attach(
"oban-monitor-failure",
[:oban, :job, :exception],
&Pleroma.JobQueueMonitor.handle_event/4,
nil
)
:telemetry.attach(
"oban-monitor-success",
[:oban, :job, :stop],
&Pleroma.JobQueueMonitor.handle_event/4,
nil
)
{:ok, state}
end
def stats do
GenServer.call(__MODULE__, :stats)
end
def handle_event([:oban, :job, event], %{duration: duration}, meta, _) do
GenServer.cast(
__MODULE__,
{:process_event, mapping_status(event), duration, meta}
)
end
@impl true
def handle_call(:stats, _from, state) do
{:reply, state, state}
end
@impl true
def handle_cast({:process_event, status, duration, meta}, state) do
state =
state
|> Map.update!(:workers, fn workers ->
workers
|> Map.put_new(meta.worker, %{})
|> Map.update!(meta.worker, &update_worker(&1, status, meta, duration))
end)
|> Map.update!(:queues, fn workers ->
workers
|> Map.put_new(meta.queue, @queue)
|> Map.update!(meta.queue, &update_queue(&1, status, meta, duration))
end)
|> Map.update!(:processed_jobs, &(&1 + 1))
{:noreply, state}
end
defp update_worker(worker, status, meta, duration) do
worker
|> Map.put_new(meta.args["op"], @operation)
|> Map.update!(meta.args["op"], &update_op(&1, status, meta, duration))
end
defp update_op(op, :enqueue, _meta, _duration) do
op
|> Map.update!(:enqueued, &(&1 + 1))
end
defp update_op(op, status, _meta, _duration) do
op
|> Map.update!(:processed_jobs, &(&1 + 1))
|> Map.update!(status, &(&1 + 1))
end
defp update_queue(queue, status, _meta, _duration) do
queue
|> Map.update!(:processed_jobs, &(&1 + 1))
|> Map.update!(status, &(&1 + 1))
end
defp mapping_status(:stop), do: :success
defp mapping_status(:exception), do: :failure
end

View file

@ -15,7 +15,6 @@ test "system_info/0" do
:active,
:healthy,
:idle,
:job_queue_stats,
:memory_used,
:pool_size
])

View file

@ -1,70 +0,0 @@
# Pleroma: A lightweight social networking server
# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.JobQueueMonitorTest do
use ExUnit.Case, async: true
alias Pleroma.JobQueueMonitor
@success {:process_event, :success, 1337,
%{
args: %{"op" => "refresh_subscriptions"},
attempt: 1,
id: 339,
max_attempts: 5,
queue: "federator_outgoing",
worker: "Pleroma.Workers.SubscriberWorker"
}}
@failure {:process_event, :failure, 22_521_134,
%{
args: %{"op" => "force_password_reset", "user_id" => "9nJG6n6Nbu7tj9GJX6"},
attempt: 1,
error: %RuntimeError{message: "oops"},
id: 345,
kind: :exception,
max_attempts: 1,
queue: "background",
stack: [
{Pleroma.Workers.BackgroundWorker, :perform, 2,
[file: ~c"lib/pleroma/workers/background_worker.ex", line: 31]},
{Oban.Queue.Executor, :safe_call, 1,
[file: ~c"lib/oban/queue/executor.ex", line: 42]},
{:timer, :tc, 3, [file: ~c"timer.erl", line: 197]},
{Oban.Queue.Executor, :call, 2, [file: ~c"lib/oban/queue/executor.ex", line: 23]},
{Task.Supervised, :invoke_mfa, 2, [file: ~c"lib/task/supervised.ex", line: 90]},
{:proc_lib, :init_p_do_apply, 3, [file: ~c"proc_lib.erl", line: 249]}
],
worker: "Pleroma.Workers.BackgroundWorker"
}}
test "stats/0" do
assert %{processed_jobs: _, queues: _, workers: _} = JobQueueMonitor.stats()
end
test "handle_cast/2" do
state = %{workers: %{}, queues: %{}, processed_jobs: 0}
assert {:noreply, state} = JobQueueMonitor.handle_cast(@success, state)
assert {:noreply, state} = JobQueueMonitor.handle_cast(@failure, state)
assert {:noreply, state} = JobQueueMonitor.handle_cast(@success, state)
assert {:noreply, state} = JobQueueMonitor.handle_cast(@failure, state)
assert state == %{
processed_jobs: 4,
queues: %{
"background" => %{failure: 2, processed_jobs: 2, success: 0},
"federator_outgoing" => %{failure: 0, processed_jobs: 2, success: 2}
},
workers: %{
"Pleroma.Workers.BackgroundWorker" => %{
"force_password_reset" => %{failure: 2, processed_jobs: 2, success: 0}
},
"Pleroma.Workers.SubscriberWorker" => %{
"refresh_subscriptions" => %{failure: 0, processed_jobs: 2, success: 2}
}
}
}
end
end