Use apropriate cancellation type for oban jobs

:discard marks jobs as "discarded", i.e. jobs which permanently failed
due to e.g. exhausting all retries or explicitly being discared due to a
fatal error.
:cancel marks jobs as "cancelled" which does not imply failure.

While neither method counts as a job "exception" in the set of
telemetries we currently export via Prometheus, the different state
is visible in the (not-exported) metadata of oban job telemetry.
We can use handlers of those events to build bespoke statistics.

Ideally we'd like to distinguish in the receiver worker between
"invalid" and "already present or delete of unknown" documents,
but this is cumbersome to get get right with a list of
free-form, human-readable descriptions oof the violated constraints.
For now, just count both as an fatal error.
        # but that is cumbersome to get right with a list of string error descriptions
This commit is contained in:
Oneric 2025-03-14 16:48:48 +01:00
parent 6a6d4254d5
commit 2fee79e1f5
5 changed files with 16 additions and 16 deletions

View file

@ -18,13 +18,13 @@ def perform(%Job{args: %{"op" => "incoming_ap_doc", "params" => params}}) do
{:discard, :origin_containment_failed} {:discard, :origin_containment_failed}
{:error, {:reject, reason}} -> {:error, {:reject, reason}} ->
{:discard, reason} {:cancel, reason}
{:error, :already_present} -> {:error, :already_present} ->
{:discard, :already_present} {:cancel, :already_present}
{:error, :ignore} -> {:error, :ignore} ->
{:discard, :ignore} {:cancel, :ignore}
# invalid data or e.g. deleting an object we don't know about anyway # invalid data or e.g. deleting an object we don't know about anyway
{:error, {:validate, issue}} -> {:error, {:validate, issue}} ->

View file

@ -16,22 +16,22 @@ def perform(%Job{args: %{"op" => "fetch_remote", "id" => id} = args}) do
:ok :ok
{:error, :forbidden} -> {:error, :forbidden} ->
{:discard, :forbidden} {:cancel, :forbidden}
{:error, :not_found} -> {:error, :not_found} ->
{:discard, :not_found} {:cancel, :not_found}
{:error, :allowed_depth} -> {:error, :allowed_depth} ->
{:discard, :allowed_depth} {:cancel, :allowed_depth}
{:error, :invalid_uri_scheme} -> {:error, :invalid_uri_scheme} ->
{:discard, :invalid_uri_scheme} {:cancel, :invalid_uri_scheme}
{:error, :local_resource} -> {:error, :local_resource} ->
{:discard, :local_resource} {:cancel, :local_resource}
{:reject, _} -> {:reject, _} ->
{:discard, :reject} {:cancel, :reject}
{:error, :id_mismatch} -> {:error, :id_mismatch} ->
{:discard, :id_mismatch} {:discard, :id_mismatch}

View file

@ -132,7 +132,7 @@ test "successfully processes incoming AP docs with correct origin" do
assert {:ok, _activity} = ObanHelpers.perform(job) assert {:ok, _activity} = ObanHelpers.perform(job)
assert {:ok, job} = Federator.incoming_ap_doc(params) assert {:ok, job} = Federator.incoming_ap_doc(params)
assert {:discard, :already_present} = ObanHelpers.perform(job) assert {:cancel, :already_present} = ObanHelpers.perform(job)
end end
test "successfully normalises public scope descriptors" do test "successfully normalises public scope descriptors" do
@ -199,7 +199,7 @@ test "it does not crash if MRF rejects the post" do
|> Jason.decode!() |> Jason.decode!()
assert {:ok, job} = Federator.incoming_ap_doc(params) assert {:ok, job} = Federator.incoming_ap_doc(params)
assert {:discard, _} = ObanHelpers.perform(job) assert {:cancel, _} = ObanHelpers.perform(job)
end end
end end
end end

View file

@ -20,7 +20,7 @@ test "it ignores MRF reject" do
with_mock Pleroma.Web.ActivityPub.Transmogrifier, with_mock Pleroma.Web.ActivityPub.Transmogrifier,
handle_incoming: fn _ -> {:reject, "MRF"} end do handle_incoming: fn _ -> {:reject, "MRF"} end do
assert {:discard, "MRF"} = assert {:cancel, "MRF"} =
ReceiverWorker.perform(%Oban.Job{ ReceiverWorker.perform(%Oban.Job{
args: %{"op" => "incoming_ap_doc", "params" => params} args: %{"op" => "incoming_ap_doc", "params" => params}
}) })

View file

@ -39,19 +39,19 @@ defmodule Pleroma.Workers.RemoteFetcherWorkerTest do
end end
test "does not requeue a deleted object" do test "does not requeue a deleted object" do
assert {:discard, _} = assert {:cancel, _} =
RemoteFetcherWorker.perform(%Oban.Job{ RemoteFetcherWorker.perform(%Oban.Job{
args: %{"op" => "fetch_remote", "id" => @deleted_object_one} args: %{"op" => "fetch_remote", "id" => @deleted_object_one}
}) })
assert {:discard, _} = assert {:cancel, _} =
RemoteFetcherWorker.perform(%Oban.Job{ RemoteFetcherWorker.perform(%Oban.Job{
args: %{"op" => "fetch_remote", "id" => @deleted_object_two} args: %{"op" => "fetch_remote", "id" => @deleted_object_two}
}) })
end end
test "does not requeue an unauthorized object" do test "does not requeue an unauthorized object" do
assert {:discard, _} = assert {:cancel, _} =
RemoteFetcherWorker.perform(%Oban.Job{ RemoteFetcherWorker.perform(%Oban.Job{
args: %{"op" => "fetch_remote", "id" => @unauthorized_object} args: %{"op" => "fetch_remote", "id" => @unauthorized_object}
}) })
@ -60,7 +60,7 @@ test "does not requeue an unauthorized object" do
test "does not requeue an object that exceeded depth" do test "does not requeue an object that exceeded depth" do
clear_config([:instance, :federation_incoming_replies_max_depth], 0) clear_config([:instance, :federation_incoming_replies_max_depth], 0)
assert {:discard, _} = assert {:cancel, _} =
RemoteFetcherWorker.perform(%Oban.Job{ RemoteFetcherWorker.perform(%Oban.Job{
args: %{"op" => "fetch_remote", "id" => @depth_object, "depth" => 1} args: %{"op" => "fetch_remote", "id" => @depth_object, "depth" => 1}
}) })