Merge pull request 'Expose stats about finally failed AP deliveries in prometheus' (#882) from Oneric/akkoma:telemetry-failed-deliveries into develop
Reviewed-on: https://akkoma.dev/AkkomaGang/akkoma/pulls/882
Commit: 13940a558a
6 changed files with 97 additions and 17 deletions
lib/pleroma/web/telemetry.ex

@@ -3,6 +3,7 @@ defmodule Pleroma.Web.Telemetry do
   import Telemetry.Metrics
   alias Pleroma.Stats
   alias Pleroma.Config
+  require Logger
 
   def start_link(arg) do
     Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
@@ -10,6 +11,8 @@ def start_link(arg) do
   @impl true
   def init(_arg) do
+    subscribe_to_events()
+
     children =
       [
         {:telemetry_poller, measurements: periodic_measurements(), period: 10_000}
@@ -32,6 +35,82 @@ defp prometheus_children do
     end
   end
 
+  defp subscribe_to_events() do
+    # note: all handlers for an event are executed blockingly
+    # before the event can conclude, so they mustn't do anything heavy
+    :telemetry.attach(
+      "akkoma-oban-jobs-stop",
+      [:oban, :job, :stop],
+      &__MODULE__.handle_lib_event/4,
+      []
+    )
+
+    :telemetry.attach(
+      "akkoma-oban-jobs-fail",
+      [:oban, :job, :exception],
+      &__MODULE__.handle_lib_event/4,
+      []
+    )
+  end
+
+  def handle_lib_event(
+        [:oban, :job, event],
+        _measurements,
+        %{state: :discard, worker: "Pleroma.Workers.PublisherWorker"} = meta,
+        _config
+      )
+      when event in [:stop, :exception] do
+    {full_target, simple_target, reason} =
+      case meta.args["op"] do
+        "publish" ->
+          {meta.args["activity_id"], nil, "inbox_collection"}
+
+        "publish_one" ->
+          full_target = get_in(meta.args, ["params", "inbox"])
+          %{host: simple_target} = URI.parse(full_target || "")
+          error = collect_apdelivery_error(event, meta)
+          {full_target, simple_target, error}
+      end
+
+    Logger.error("Failed AP delivery: #{inspect(reason)} for #{inspect(full_target)}")
+
+    :telemetry.execute([:akkoma, :ap, :delivery, :fail], %{final: 1}, %{
+      reason: reason,
+      target: simple_target
+    })
+  end
+
+  def handle_lib_event([:oban, :job, _], _, _, _), do: :ok
+
+  defp collect_apdelivery_error(:exception, %{result: nil, reason: reason}) do
+    case reason do
+      %Oban.CrashError{} -> "crash"
+      %Oban.TimeoutError{} -> "timeout"
+      %type{} -> "exception_#{type}"
+    end
+  end
+
+  defp collect_apdelivery_error(:exception, %{result: {:error, error}}) do
+    process_fail_reason(error)
+  end
+
+  defp collect_apdelivery_error(:stop, %{result: {_, reason}}) do
+    process_fail_reason(reason)
+  end
+
+  defp collect_apdelivery_error(_, _) do
+    "cause_unknown"
+  end
+
+  defp process_fail_reason(error) do
+    case error do
+      error when is_binary(error) -> error
+      error when is_atom(error) -> "#{error}"
+      %{status: code} when is_number(code) -> "http_#{code}"
+      _ -> "error_unknown"
+    end
+  end
+
   # A separate set of metrics for distributions because phoenix dashboard does NOT handle them well
   defp distribution_metrics do
     [
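Since handle_lib_event/4 is a public function, the reason/target mapping above can be exercised directly with a hand-built, Oban-shaped metadata map; the inbox URL and error value below are illustrative, not taken from the change:

meta = %{
  state: :discard,
  worker: "Pleroma.Workers.PublisherWorker",
  args: %{"op" => "publish_one", "params" => %{"inbox" => "https://example.social/inbox"}},
  result: {:error, %{status: 503}}
}

# Logs the failed delivery and emits [:akkoma, :ap, :delivery, :fail]
# with reason "http_503" and target "example.social".
Pleroma.Web.Telemetry.handle_lib_event([:oban, :job, :stop], %{}, meta, [])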
@@ -215,7 +294,8 @@ defp common_metrics(byte_unit \\ :byte) do
       last_value("pleroma.local_users.total"),
       last_value("pleroma.domains.total"),
       last_value("pleroma.local_statuses.total"),
-      last_value("pleroma.remote_users.total")
+      last_value("pleroma.remote_users.total"),
+      counter("akkoma.ap.delivery.fail.final", tags: [:target, :reason])
     ]
   end
lib/pleroma/workers/receiver_worker.ex

@@ -35,13 +35,13 @@ def perform(%Job{args: %{"op" => "incoming_ap_doc", "params" => params}}) do
         {:discard, :origin_containment_failed}
 
       {:error, {:reject, reason}} ->
-        {:discard, reason}
+        {:cancel, reason}
 
       {:error, :already_present} ->
-        {:discard, :already_present}
+        {:cancel, :already_present}
 
       {:error, :ignore} ->
-        {:discard, :ignore}
+        {:cancel, :ignore}
 
       # invalid data or e.g. deleting an object we don't know about anyway
       {:error, {:validate, issue}} ->
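The {:discard, ...} to {:cancel, ...} switches in this worker (and in the fetcher below) rely on Oban's return-value semantics: {:cancel, reason} also stops the job without further retries, but records it as cancelled rather than discarded, so the :discard state stays reserved for genuine failures, which is the state the new telemetry handler keys on for PublisherWorker jobs. A minimal sketch of that pattern follows; the worker name, queue, and error atoms are assumptions, not part of the change:

defmodule MyApp.Workers.ExampleWorker do
  # Illustrative only: expected outcomes end the job as cancelled, while
  # unexpected errors keep retrying and, once attempts are exhausted, land
  # in the discarded state.
  use Oban.Worker, queue: :background

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"id" => id}}) do
    case fetch(id) do
      {:ok, _object} -> :ok
      {:error, :not_found} -> {:cancel, :not_found}
      {:error, reason} -> {:error, reason}
    end
  end

  defp fetch(_id), do: {:ok, %{}}
end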
lib/pleroma/workers/remote_fetcher_worker.ex

@@ -16,22 +16,22 @@ def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do
         :ok
 
       {:error, :forbidden} ->
-        {:discard, :forbidden}
+        {:cancel, :forbidden}
 
       {:error, :not_found} ->
-        {:discard, :not_found}
+        {:cancel, :not_found}
 
       {:error, :allowed_depth} ->
-        {:discard, :allowed_depth}
+        {:cancel, :allowed_depth}
 
       {:error, :invalid_uri_scheme} ->
-        {:discard, :invalid_uri_scheme}
+        {:cancel, :invalid_uri_scheme}
 
       {:error, :local_resource} ->
-        {:discard, :local_resource}
+        {:cancel, :local_resource}
 
       {:reject, _} ->
-        {:discard, :reject}
+        {:cancel, :reject}
 
       {:error, :id_mismatch} ->
         {:discard, :id_mismatch}
test/pleroma/web/federator_test.exs

@@ -132,7 +132,7 @@ test "successfully processes incoming AP docs with correct origin" do
       assert {:ok, _activity} = ObanHelpers.perform(job)
 
       assert {:ok, job} = Federator.incoming_ap_doc(params)
-      assert {:discard, :already_present} = ObanHelpers.perform(job)
+      assert {:cancel, :already_present} = ObanHelpers.perform(job)
     end
 
     test "successfully normalises public scope descriptors" do
@@ -199,7 +199,7 @@ test "it does not crash if MRF rejects the post" do
         |> Jason.decode!()
 
       assert {:ok, job} = Federator.incoming_ap_doc(params)
-      assert {:discard, _} = ObanHelpers.perform(job)
+      assert {:cancel, _} = ObanHelpers.perform(job)
     end
   end
 end
test/pleroma/workers/receiver_worker_test.exs

@@ -20,7 +20,7 @@ test "it ignores MRF reject" do
 
     with_mock Pleroma.Web.ActivityPub.Transmogrifier,
       handle_incoming: fn _ -> {:reject, "MRF"} end do
-      assert {:discard, "MRF"} =
+      assert {:cancel, "MRF"} =
               ReceiverWorker.perform(%Oban.Job{
                 args: %{"op" => "incoming_ap_doc", "params" => params}
               })
test/pleroma/workers/remote_fetcher_worker_test.exs

@@ -39,19 +39,19 @@ defmodule Pleroma.Workers.RemoteFetcherWorkerTest do
     end
 
     test "does not requeue a deleted object" do
-      assert {:discard, _} =
+      assert {:cancel, _} =
               RemoteFetcherWorker.perform(%Oban.Job{
                 args: %{"op" => "fetch_remote", "id" => @deleted_object_one}
               })
 
-      assert {:discard, _} =
+      assert {:cancel, _} =
               RemoteFetcherWorker.perform(%Oban.Job{
                 args: %{"op" => "fetch_remote", "id" => @deleted_object_two}
               })
     end
 
     test "does not requeue an unauthorized object" do
-      assert {:discard, _} =
+      assert {:cancel, _} =
               RemoteFetcherWorker.perform(%Oban.Job{
                 args: %{"op" => "fetch_remote", "id" => @unauthorized_object}
               })
@@ -60,7 +60,7 @@ test "does not requeue an unauthorized object" do
     test "does not requeue an object that exceeded depth" do
       clear_config([:instance, :federation_incoming_replies_max_depth], 0)
 
-      assert {:discard, _} =
+      assert {:cancel, _} =
               RemoteFetcherWorker.perform(%Oban.Job{
                 args: %{"op" => "fetch_remote", "id" => @depth_object, "depth" => 1}
               })
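For completeness, the emission side could also be covered with a test along these lines; the module name, handler id, and sample metadata are assumptions, not part of the change:

defmodule Pleroma.Web.TelemetryEventTest do
  use ExUnit.Case, async: true

  test "delivery failure event carries reason and target metadata" do
    parent = self()

    # Forward the event to the test process so it can be asserted on.
    :telemetry.attach(
      "test-ap-delivery-fail",
      [:akkoma, :ap, :delivery, :fail],
      fn _event, measurements, metadata, _config ->
        send(parent, {:delivery_fail, measurements, metadata})
      end,
      nil
    )

    :telemetry.execute([:akkoma, :ap, :delivery, :fail], %{final: 1}, %{
      reason: "http_503",
      target: "example.social"
    })

    assert_received {:delivery_fail, %{final: 1}, %{reason: "http_503", target: "example.social"}}

    :telemetry.detach("test-ap-delivery-fail")
  end
end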