receiver_worker: prevent duplicate jobs

E.g. \*oma federates (most) follower-only posts multiple times
to each personal inbox. This commonly leads to race conditions
with jobs of several copies running at the same time and getting
past the initial "already known" check but then later all but
one will crash with an exception from the unique db index.

Since the only special thing we do with copies anyway is to discard them,
just don't create such duplicate jobs in the first place.
For the same reason and since failed jobs don't count towards
duplicates, this should have virtually no effect on federation.
This commit is contained in:
Oneric 2025-03-17 23:44:46 +01:00
parent 74182abb5b
commit 195042bdc9
2 changed files with 40 additions and 1 deletions

View file

@ -7,7 +7,24 @@ defmodule Pleroma.Workers.ReceiverWorker do
alias Pleroma.Web.Federator
use Pleroma.Workers.WorkerHelper, queue: "federator_incoming"
use Pleroma.Workers.WorkerHelper,
queue: "federator_incoming",
unique: [
keys: [:op, :id],
# all states except :discarded
states: [:scheduled, :available, :executing, :retryable, :completed, :cancelled]
]
def enqueue(op, %{"id" => _} = params, worker_args), do: do_enqueue(op, params, worker_args)
def enqueue(op, %{"params" => %{"id" => id}} = params, worker_args) when is_binary(id) do
do_enqueue(op, Map.put(params, "id", id), worker_args)
end
def enqueue(op, params, worker_args) do
# should be rare if it happens at all (transient activity)
do_enqueue(op, Map.put(params, "id", Ecto.UUID.generate()), worker_args)
end
@impl Oban.Worker
def perform(%Job{args: %{"op" => "incoming_ap_doc", "params" => params}}) do

View file

@ -0,0 +1,22 @@
defmodule Pleroma.Repo.Migrations.ReceiverWorkerIdKey do
use Ecto.Migration
def up() do
# since we currently still support PostgreSQL 12 and 13, do NOT use the args['id'] snytax!
"""
UPDATE public.oban_jobs
SET args = jsonb_set(
args,
'{id}',
to_jsonb(COALESCE(args#>>'{params,id}', id::text))
)
WHERE worker = 'Pleroma.Workers.ReceiverWorker';
"""
|> Pleroma.Repo.query!([], timeout: :infinity)
end
def down() do
# no action needed
:ok
end
end