From 2fee79e1f5602530bb6ea9e024052b8346caafb7 Mon Sep 17 00:00:00 2001 From: Oneric Date: Fri, 14 Mar 2025 16:48:48 +0100 Subject: [PATCH 1/2] Use apropriate cancellation type for oban jobs :discard marks jobs as "discarded", i.e. jobs which permanently failed due to e.g. exhausting all retries or explicitly being discared due to a fatal error. :cancel marks jobs as "cancelled" which does not imply failure. While neither method counts as a job "exception" in the set of telemetries we currently export via Prometheus, the different state is visible in the (not-exported) metadata of oban job telemetry. We can use handlers of those events to build bespoke statistics. Ideally we'd like to distinguish in the receiver worker between "invalid" and "already present or delete of unknown" documents, but this is cumbersome to get get right with a list of free-form, human-readable descriptions oof the violated constraints. For now, just count both as an fatal error. # but that is cumbersome to get right with a list of string error descriptions --- lib/pleroma/workers/receiver_worker.ex | 6 +++--- lib/pleroma/workers/remote_fetcher_worker.ex | 12 ++++++------ test/pleroma/web/federator_test.exs | 4 ++-- test/pleroma/workers/receiver_worker_test.exs | 2 +- test/pleroma/workers/remote_fetcher_worker_test.exs | 8 ++++---- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/lib/pleroma/workers/receiver_worker.ex b/lib/pleroma/workers/receiver_worker.ex index 13493ec8b..edb176358 100644 --- a/lib/pleroma/workers/receiver_worker.ex +++ b/lib/pleroma/workers/receiver_worker.ex @@ -18,13 +18,13 @@ def perform(%Job{args: %{"op" => "incoming_ap_doc", "params" => params}}) do {:discard, :origin_containment_failed} {:error, {:reject, reason}} -> - {:discard, reason} + {:cancel, reason} {:error, :already_present} -> - {:discard, :already_present} + {:cancel, :already_present} {:error, :ignore} -> - {:discard, :ignore} + {:cancel, :ignore} # invalid data or e.g. deleting an object we don't know about anyway {:error, {:validate, issue}} -> diff --git a/lib/pleroma/workers/remote_fetcher_worker.ex b/lib/pleroma/workers/remote_fetcher_worker.ex index 1bc0e5d0c..3d1b381c7 100644 --- a/lib/pleroma/workers/remote_fetcher_worker.ex +++ b/lib/pleroma/workers/remote_fetcher_worker.ex @@ -16,22 +16,22 @@ def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do :ok {:error, :forbidden} -> - {:discard, :forbidden} + {:cancel, :forbidden} {:error, :not_found} -> - {:discard, :not_found} + {:cancel, :not_found} {:error, :allowed_depth} -> - {:discard, :allowed_depth} + {:cancel, :allowed_depth} {:error, :invalid_uri_scheme} -> - {:discard, :invalid_uri_scheme} + {:cancel, :invalid_uri_scheme} {:error, :local_resource} -> - {:discard, :local_resource} + {:cancel, :local_resource} {:reject, _} -> - {:discard, :reject} + {:cancel, :reject} {:error, :id_mismatch} -> {:discard, :id_mismatch} diff --git a/test/pleroma/web/federator_test.exs b/test/pleroma/web/federator_test.exs index 30da35669..7a23d4117 100644 --- a/test/pleroma/web/federator_test.exs +++ b/test/pleroma/web/federator_test.exs @@ -132,7 +132,7 @@ test "successfully processes incoming AP docs with correct origin" do assert {:ok, _activity} = ObanHelpers.perform(job) assert {:ok, job} = Federator.incoming_ap_doc(params) - assert {:discard, :already_present} = ObanHelpers.perform(job) + assert {:cancel, :already_present} = ObanHelpers.perform(job) end test "successfully normalises public scope descriptors" do @@ -199,7 +199,7 @@ test "it does not crash if MRF rejects the post" do |> Jason.decode!() assert {:ok, job} = Federator.incoming_ap_doc(params) - assert {:discard, _} = ObanHelpers.perform(job) + assert {:cancel, _} = ObanHelpers.perform(job) end end end diff --git a/test/pleroma/workers/receiver_worker_test.exs b/test/pleroma/workers/receiver_worker_test.exs index 9fd0e1fa8..6c4b14751 100644 --- a/test/pleroma/workers/receiver_worker_test.exs +++ b/test/pleroma/workers/receiver_worker_test.exs @@ -20,7 +20,7 @@ test "it ignores MRF reject" do with_mock Pleroma.Web.ActivityPub.Transmogrifier, handle_incoming: fn _ -> {:reject, "MRF"} end do - assert {:discard, "MRF"} = + assert {:cancel, "MRF"} = ReceiverWorker.perform(%Oban.Job{ args: %{"op" => "incoming_ap_doc", "params" => params} }) diff --git a/test/pleroma/workers/remote_fetcher_worker_test.exs b/test/pleroma/workers/remote_fetcher_worker_test.exs index c30e773d4..2104baab2 100644 --- a/test/pleroma/workers/remote_fetcher_worker_test.exs +++ b/test/pleroma/workers/remote_fetcher_worker_test.exs @@ -39,19 +39,19 @@ defmodule Pleroma.Workers.RemoteFetcherWorkerTest do end test "does not requeue a deleted object" do - assert {:discard, _} = + assert {:cancel, _} = RemoteFetcherWorker.perform(%Oban.Job{ args: %{"op" => "fetch_remote", "id" => @deleted_object_one} }) - assert {:discard, _} = + assert {:cancel, _} = RemoteFetcherWorker.perform(%Oban.Job{ args: %{"op" => "fetch_remote", "id" => @deleted_object_two} }) end test "does not requeue an unauthorized object" do - assert {:discard, _} = + assert {:cancel, _} = RemoteFetcherWorker.perform(%Oban.Job{ args: %{"op" => "fetch_remote", "id" => @unauthorized_object} }) @@ -60,7 +60,7 @@ test "does not requeue an unauthorized object" do test "does not requeue an object that exceeded depth" do clear_config([:instance, :federation_incoming_replies_max_depth], 0) - assert {:discard, _} = + assert {:cancel, _} = RemoteFetcherWorker.perform(%Oban.Job{ args: %{"op" => "fetch_remote", "id" => @depth_object, "depth" => 1} }) From 1f6f5edf85005cf9f8777b07acf3c14f488896fe Mon Sep 17 00:00:00 2001 From: Oneric Date: Sat, 1 Mar 2025 20:37:06 +0100 Subject: [PATCH 2/2] telemetry: expose stats about failed deliveries And also log about it which we so far didn't do --- lib/pleroma/web/telemetry.ex | 82 +++++++++++++++++++++++++++++++++++- 1 file changed, 81 insertions(+), 1 deletion(-) diff --git a/lib/pleroma/web/telemetry.ex b/lib/pleroma/web/telemetry.ex index 8d43e366f..1251a23c3 100644 --- a/lib/pleroma/web/telemetry.ex +++ b/lib/pleroma/web/telemetry.ex @@ -3,6 +3,7 @@ defmodule Pleroma.Web.Telemetry do import Telemetry.Metrics alias Pleroma.Stats alias Pleroma.Config + require Logger def start_link(arg) do Supervisor.start_link(__MODULE__, arg, name: __MODULE__) @@ -10,6 +11,8 @@ def start_link(arg) do @impl true def init(_arg) do + subscribe_to_events() + children = [ {:telemetry_poller, measurements: periodic_measurements(), period: 10_000} @@ -32,6 +35,82 @@ defp prometheus_children do end end + defp subscribe_to_events() do + # note: all handlers for a event are executed blockingly + # before the event can conclude, so they mustn't do anything heavy + :telemetry.attach( + "akkoma-oban-jobs-stop", + [:oban, :job, :stop], + &__MODULE__.handle_lib_event/4, + [] + ) + + :telemetry.attach( + "akkoma-oban-jobs-fail", + [:oban, :job, :exception], + &__MODULE__.handle_lib_event/4, + [] + ) + end + + def handle_lib_event( + [:oban, :job, event], + _measurements, + %{state: :discard, worker: "Pleroma.Workers.PublisherWorker"} = meta, + _config + ) + when event in [:stop, :exception] do + {full_target, simple_target, reason} = + case meta.args["op"] do + "publish" -> + {meta.args["activity_id"], nil, "inbox_collection"} + + "publish_one" -> + full_target = get_in(meta.args, ["params", "inbox"]) + %{host: simple_target} = URI.parse(full_target || "") + error = collect_apdelivery_error(event, meta) + {full_target, simple_target, error} + end + + Logger.error("Failed AP delivery: #{inspect(reason)} for #{inspect(full_target)}") + + :telemetry.execute([:akkoma, :ap, :delivery, :fail], %{final: 1}, %{ + reason: reason, + target: simple_target + }) + end + + def handle_lib_event([:oban, :job, _], _, _, _), do: :ok + + defp collect_apdelivery_error(:exception, %{result: nil, reason: reason}) do + case reason do + %Oban.CrashError{} -> "crash" + %Oban.TimeoutError{} -> "timeout" + %type{} -> "exception_#{type}" + end + end + + defp collect_apdelivery_error(:exception, %{result: {:error, error}}) do + process_fail_reason(error) + end + + defp collect_apdelivery_error(:stop, %{result: {_, reason}}) do + process_fail_reason(reason) + end + + defp collect_apdelivery_error(_, _) do + "cause_unknown" + end + + defp process_fail_reason(error) do + case error do + error when is_binary(error) -> error + error when is_atom(error) -> "#{error}" + %{status: code} when is_number(code) -> "http_#{code}" + _ -> "error_unknown" + end + end + # A seperate set of metrics for distributions because phoenix dashboard does NOT handle them well defp distribution_metrics do [ @@ -215,7 +294,8 @@ defp common_metrics(byte_unit \\ :byte) do last_value("pleroma.local_users.total"), last_value("pleroma.domains.total"), last_value("pleroma.local_statuses.total"), - last_value("pleroma.remote_users.total") + last_value("pleroma.remote_users.total"), + counter("akkoma.ap.delivery.fail.final", tags: [:target, :reason]) ] end