diff --git a/lib/pleroma/web/telemetry.ex b/lib/pleroma/web/telemetry.ex
index 8d43e366f..1251a23c3 100644
--- a/lib/pleroma/web/telemetry.ex
+++ b/lib/pleroma/web/telemetry.ex
@@ -3,6 +3,7 @@ defmodule Pleroma.Web.Telemetry do
   import Telemetry.Metrics
   alias Pleroma.Stats
   alias Pleroma.Config
+  require Logger
 
   def start_link(arg) do
     Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
@@ -10,6 +11,8 @@ def start_link(arg) do
 
   @impl true
   def init(_arg) do
+    subscribe_to_events()
+
     children =
       [
         {:telemetry_poller, measurements: periodic_measurements(), period: 10_000}
@@ -32,6 +35,97 @@ defp prometheus_children do
     end
   end
 
+  # Attaches telemetry handlers for Oban job lifecycle events so that
+  # permanently failed ActivityPub deliveries can be logged and counted.
+  defp subscribe_to_events() do
+    # note: all handlers for an event are executed blockingly
+    # before the event can conclude, so they mustn't do anything heavy.
+    # A handler that raises is detached by :telemetry, so they must not crash either.
+    :telemetry.attach(
+      "akkoma-oban-jobs-stop",
+      [:oban, :job, :stop],
+      &__MODULE__.handle_lib_event/4,
+      []
+    )
+
+    :telemetry.attach(
+      "akkoma-oban-jobs-fail",
+      [:oban, :job, :exception],
+      &__MODULE__.handle_lib_event/4,
+      []
+    )
+  end
+
+  @doc false
+  # Telemetry handler (public only because :telemetry wants a remote capture).
+  # Logs discarded PublisherWorker jobs and emits [:akkoma, :ap, :delivery, :fail];
+  # every other Oban job event is ignored.
+  def handle_lib_event(
+        [:oban, :job, event],
+        _measurements,
+        %{state: :discard, worker: "Pleroma.Workers.PublisherWorker"} = meta,
+        _config
+      )
+      when event in [:stop, :exception] do
+    {full_target, simple_target, reason} =
+      case meta.args["op"] do
+        "publish" ->
+          {meta.args["activity_id"], nil, "inbox_collection"}
+
+        "publish_one" ->
+          full_target = get_in(meta.args, ["params", "inbox"])
+          %{host: simple_target} = URI.parse(full_target || "")
+          error = collect_apdelivery_error(event, meta)
+          {full_target, simple_target, error}
+
+        # malformed or unrecognised job args must not crash (and thereby detach) the handler
+        _ ->
+          {nil, nil, "unknown_op"}
+      end
+
+    Logger.error("Failed AP delivery: #{inspect(reason)} for #{inspect(full_target)}")
+
+    :telemetry.execute([:akkoma, :ap, :delivery, :fail], %{final: 1}, %{
+      reason: reason,
+      target: simple_target
+    })
+  end
+
+  def handle_lib_event([:oban, :job, _], _, _, _), do: :ok
+
+  # Maps the failure information in the job metadata to a short reason string for the metric tag.
+  defp collect_apdelivery_error(:exception, %{result: nil, reason: reason}) do
+    case reason do
+      %Oban.CrashError{} -> "crash"
+      %Oban.TimeoutError{} -> "timeout"
+      %type{} -> "exception_#{type}"
+      # non-struct reasons must not crash the handler
+      _ -> "exception_unknown"
+    end
+  end
+
+  defp collect_apdelivery_error(:exception, %{result: {:error, error}}) do
+    process_fail_reason(error)
+  end
+
+  defp collect_apdelivery_error(:stop, %{result: {_, reason}}) do
+    process_fail_reason(reason)
+  end
+
+  defp collect_apdelivery_error(_, _) do
+    "cause_unknown"
+  end
+
+  # Normalises an error term (string, atom or map with a numeric :status) into a reason string.
+  defp process_fail_reason(error) do
+    case error do
+      error when is_binary(error) -> error
+      error when is_atom(error) -> "#{error}"
+      %{status: code} when is_number(code) -> "http_#{code}"
+      _ -> "error_unknown"
+    end
+  end
+
   # A seperate set of metrics for distributions because phoenix dashboard does NOT handle them well
   defp distribution_metrics do
     [
@@ -215,7 +309,8 @@ defp common_metrics(byte_unit \\ :byte) do
       last_value("pleroma.local_users.total"),
       last_value("pleroma.domains.total"),
       last_value("pleroma.local_statuses.total"),
-      last_value("pleroma.remote_users.total")
+      last_value("pleroma.remote_users.total"),
+      counter("akkoma.ap.delivery.fail.final", tags: [:target, :reason])
     ]
   end
 
diff --git a/lib/pleroma/workers/receiver_worker.ex b/lib/pleroma/workers/receiver_worker.ex
index 5629fcd90..a8ea86eae 100644
--- a/lib/pleroma/workers/receiver_worker.ex
+++ b/lib/pleroma/workers/receiver_worker.ex
@@ -35,13 +35,13 @@ def perform(%Job{args: %{"op" => "incoming_ap_doc", "params" => params}}) do
         {:discard, :origin_containment_failed}
 
       {:error, {:reject, reason}} ->
-        {:discard, reason}
+        {:cancel, reason}
 
       {:error, :already_present} ->
-        {:discard, :already_present}
+        {:cancel, :already_present}
 
       {:error, :ignore} ->
-        {:discard, :ignore}
+        {:cancel, :ignore}
 
       # invalid data or e.g. deleting an object we don't know about anyway
       {:error, {:validate, issue}} ->
diff --git a/lib/pleroma/workers/remote_fetcher_worker.ex b/lib/pleroma/workers/remote_fetcher_worker.ex
index 1bc0e5d0c..3d1b381c7 100644
--- a/lib/pleroma/workers/remote_fetcher_worker.ex
+++ b/lib/pleroma/workers/remote_fetcher_worker.ex
@@ -16,22 +16,22 @@ def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do
         :ok
 
       {:error, :forbidden} ->
-        {:discard, :forbidden}
+        {:cancel, :forbidden}
 
       {:error, :not_found} ->
-        {:discard, :not_found}
+        {:cancel, :not_found}
 
       {:error, :allowed_depth} ->
-        {:discard, :allowed_depth}
+        {:cancel, :allowed_depth}
 
       {:error, :invalid_uri_scheme} ->
-        {:discard, :invalid_uri_scheme}
+        {:cancel, :invalid_uri_scheme}
 
       {:error, :local_resource} ->
-        {:discard, :local_resource}
+        {:cancel, :local_resource}
 
       {:reject, _} ->
-        {:discard, :reject}
+        {:cancel, :reject}
 
       {:error, :id_mismatch} ->
         {:discard, :id_mismatch}
diff --git a/test/pleroma/web/federator_test.exs b/test/pleroma/web/federator_test.exs
index 30da35669..7a23d4117 100644
--- a/test/pleroma/web/federator_test.exs
+++ b/test/pleroma/web/federator_test.exs
@@ -132,7 +132,7 @@ test "successfully processes incoming AP docs with correct origin" do
       assert {:ok, _activity} = ObanHelpers.perform(job)
 
       assert {:ok, job} = Federator.incoming_ap_doc(params)
-      assert {:discard, :already_present} = ObanHelpers.perform(job)
+      assert {:cancel, :already_present} = ObanHelpers.perform(job)
     end
 
     test "successfully normalises public scope descriptors" do
@@ -199,7 +199,7 @@ test "it does not crash if MRF rejects the post" do
         |> Jason.decode!()
 
       assert {:ok, job} = Federator.incoming_ap_doc(params)
-      assert {:discard, _} = ObanHelpers.perform(job)
+      assert {:cancel, _} = ObanHelpers.perform(job)
     end
   end
 end
diff --git a/test/pleroma/workers/receiver_worker_test.exs b/test/pleroma/workers/receiver_worker_test.exs
index 9fd0e1fa8..6c4b14751 100644
--- a/test/pleroma/workers/receiver_worker_test.exs
+++ b/test/pleroma/workers/receiver_worker_test.exs
@@ -20,7 +20,7 @@ test "it ignores MRF reject" do
 
     with_mock Pleroma.Web.ActivityPub.Transmogrifier,
       handle_incoming: fn _ -> {:reject, "MRF"} end do
-      assert {:discard, "MRF"} =
+      assert {:cancel, "MRF"} =
                ReceiverWorker.perform(%Oban.Job{
                  args: %{"op" => "incoming_ap_doc", "params" => params}
                })
diff --git a/test/pleroma/workers/remote_fetcher_worker_test.exs b/test/pleroma/workers/remote_fetcher_worker_test.exs
index c30e773d4..2104baab2 100644
--- a/test/pleroma/workers/remote_fetcher_worker_test.exs
+++ b/test/pleroma/workers/remote_fetcher_worker_test.exs
@@ -39,19 +39,19 @@ defmodule Pleroma.Workers.RemoteFetcherWorkerTest do
   end
 
   test "does not requeue a deleted object" do
-    assert {:discard, _} =
+    assert {:cancel, _} =
              RemoteFetcherWorker.perform(%Oban.Job{
                args: %{"op" => "fetch_remote", "id" => @deleted_object_one}
              })
 
-    assert {:discard, _} =
+    assert {:cancel, _} =
              RemoteFetcherWorker.perform(%Oban.Job{
                args: %{"op" => "fetch_remote", "id" => @deleted_object_two}
              })
   end
 
   test "does not requeue an unauthorized object" do
-    assert {:discard, _} =
+    assert {:cancel, _} =
              RemoteFetcherWorker.perform(%Oban.Job{
                args: %{"op" => "fetch_remote", "id" => @unauthorized_object}
              })
@@ -60,7 +60,7 @@ test "does not requeue an unauthorized object" do
   test "does not requeue an object that exceeded depth" do
     clear_config([:instance, :federation_incoming_replies_max_depth], 0)
 
-    assert {:discard, _} =
+    assert {:cancel, _} =
              RemoteFetcherWorker.perform(%Oban.Job{
                args: %{"op" => "fetch_remote", "id" => @depth_object, "depth" => 1}
              })