Merge pull request 'telemetry: expose count of currently pending/scheduled jobs per queue' (#982) from Oneric/akkoma:telemetry-job-queue-pending into develop

Reviewed-on: https://akkoma.dev/AkkomaGang/akkoma/pulls/982
This commit is contained in:
Oneric 2025-10-04 22:44:50 +00:00
commit 24d828d9a6
4 changed files with 549 additions and 315 deletions

Binary file not shown.

Before

Width:  |  Height:  |  Size: 152 KiB

After

Width:  |  Height:  |  Size: 165 KiB

View file

@ -119,7 +119,8 @@ That's it, you've got a fancy dashboard with long-term, 24/7 metrics now!
Updating the dashboard can be done by just repeating the import process.
Here's an example taken from a healthy, small instance where
nobody was logged in for about the first half of the displayed time span:
nobody was logged in except for a few minutes
resulting in an (expected) spike in incoming requests:
![Full view of the reference dashboard as it looked at the time of writing](img/grafana_dashboard.webp)
!!! note
@ -259,7 +260,6 @@ This too requires administrator rights to access and can be found under `/akkoma
The exposed aggregate info is mostly redundant with job statistics already tracked in Prometheus,
but it additionally also:
- shows not-yet executed jobs in the backlog of queues
- shows full argument and meta details for each job
- allows interactively deleting or manually retrying jobs
*(keep this in mind when granting people administrator rights!)*

File diff suppressed because it is too large Load diff

View file

@ -1,5 +1,6 @@
defmodule Pleroma.Web.Telemetry do
use Supervisor
import Ecto.Query
import Telemetry.Metrics
alias Pleroma.Stats
alias Pleroma.Config
@ -15,7 +16,7 @@ def init(_arg) do
children =
[
{:telemetry_poller, measurements: periodic_measurements(), period: 10_000}
{:telemetry_poller, measurements: periodic_measurements(), period: 60_000}
] ++
prometheus_children()
@ -303,7 +304,9 @@ defp common_metrics(byte_unit \\ :byte) do
last_value("pleroma.domains.total"),
last_value("pleroma.local_statuses.total"),
last_value("pleroma.remote_users.total"),
counter("akkoma.ap.delivery.fail.final", tags: [:target, :reason])
counter("akkoma.ap.delivery.fail.final", tags: [:target, :reason]),
last_value("akkoma.job.queue.scheduled", tags: [:queue_name]),
last_value("akkoma.job.queue.available", tags: [:queue_name])
]
end
@ -315,7 +318,8 @@ def live_dashboard_metrics, do: common_metrics(:megabyte) ++ summary_metrics(:me
defp periodic_measurements do
[
{__MODULE__, :io_stats, []},
{__MODULE__, :instance_stats, []}
{__MODULE__, :instance_stats, []},
{__MODULE__, :oban_pending, []}
]
end
@ -333,4 +337,26 @@ def instance_stats do
:telemetry.execute([:pleroma, :local_statuses], %{total: stats.status_count}, %{})
:telemetry.execute([:pleroma, :remote_users], %{total: stats.remote_user_count}, %{})
end
# Emits `[:akkoma, :job, :queue]` telemetry events carrying the number of
# not-yet-executed Oban jobs (`scheduled` and `available` states) per
# configured queue. Invoked periodically by :telemetry_poller.
def oban_pending() do
  # One row per {queue, state} pair actually present in the jobs table,
  # restricted to states that have not run yet.
  query =
    from(j in Oban.Job,
      select: %{queue: j.queue, state: j.state, count: count()},
      where: j.state in ["scheduled", "available"],
      group_by: [j.queue, j.state]
    )

  conf = Oban.Config.new(Config.get!(Oban))
  qres = Oban.Repo.all(conf, query)

  # Seed every configured queue with zeroed counts so queues without any
  # pending jobs still emit a measurement (last_value metrics need this
  # to drop back to 0 once a backlog drains).
  acc = Enum.into(conf.queues, %{}, fn {x, _} -> {x, %{available: 0, scheduled: 0}} end)

  acc =
    Enum.reduce(qres, acc, fn %{queue: q, state: state, count: count}, acc ->
      # The DB may still hold jobs for queues that were removed or renamed
      # in the current config. For such rows String.to_existing_atom/1 can
      # raise ArgumentError and put_in/3 would raise KeyError, crashing the
      # poller on every cycle — skip them instead of crashing.
      # (`state` is safe: the query restricts it to "scheduled"/"available",
      # whose atoms already exist from the accumulator literal above.)
      case existing_queue_atom(q) do
        queue when is_map_key(acc, queue) ->
          put_in(acc, [queue, String.to_existing_atom(state)], count)

        _ ->
          acc
      end
    end)

  for {queue, info} <- acc do
    :telemetry.execute([:akkoma, :job, :queue], info, %{queue_name: queue})
  end
end

# Converts a queue name from the DB into an already-existing atom,
# or nil when no such atom exists (e.g. queue removed from the config).
defp existing_queue_atom(q) do
  String.to_existing_atom(q)
rescue
  ArgumentError -> nil
end
end