diff --git a/lib/pleroma/workers/receiver_worker.ex b/lib/pleroma/workers/receiver_worker.ex index 13493ec8b..5629fcd90 100644 --- a/lib/pleroma/workers/receiver_worker.ex +++ b/lib/pleroma/workers/receiver_worker.ex @@ -7,7 +7,24 @@ defmodule Pleroma.Workers.ReceiverWorker do alias Pleroma.Web.Federator - use Pleroma.Workers.WorkerHelper, queue: "federator_incoming" + use Pleroma.Workers.WorkerHelper, + queue: "federator_incoming", + unique: [ + keys: [:op, :id], + # all states except :discarded + states: [:scheduled, :available, :executing, :retryable, :completed, :cancelled] + ] + + def enqueue(op, %{"id" => _} = params, worker_args), do: do_enqueue(op, params, worker_args) + + def enqueue(op, %{"params" => %{"id" => id}} = params, worker_args) when is_binary(id) do + do_enqueue(op, Map.put(params, "id", id), worker_args) + end + + def enqueue(op, params, worker_args) do + # should be rare if it happens at all (transient activity) + do_enqueue(op, Map.put(params, "id", Ecto.UUID.generate()), worker_args) + end @impl Oban.Worker def perform(%Job{args: %{"op" => "incoming_ap_doc", "params" => params}}) do diff --git a/priv/repo/migrations/20250318000000_receiver_worker_id_key.exs b/priv/repo/migrations/20250318000000_receiver_worker_id_key.exs new file mode 100644 index 000000000..dbd2dd0f0 --- /dev/null +++ b/priv/repo/migrations/20250318000000_receiver_worker_id_key.exs @@ -0,0 +1,22 @@ +defmodule Pleroma.Repo.Migrations.ReceiverWorkerIdKey do + use Ecto.Migration + + def up() do + # since we currently still support PostgreSQL 12 and 13, do NOT use the args['id'] snytax! + """ + UPDATE public.oban_jobs + SET args = jsonb_set( + args, + '{id}', + to_jsonb(COALESCE(args#>>'{params,id}', id::text)) + ) + WHERE worker = 'Pleroma.Workers.ReceiverWorker'; + """ + |> Pleroma.Repo.query!([], timeout: :infinity) + end + + def down() do + # no action needed + :ok + end +end