
With the current strategy the individual and cumulative backoff looks like
this (the "+" part denotes the maximum extra random delay):

  attempt  backoff_single       cumulative
     1        16+30                16+30
     2        47+60                63+90
     3       243+90   ≈ 4min      321+180
     4      1024+120  ≈17min     1360+300   ≈23+5min
     5      3125+150  ≈52min     4500+450   ≈75+8min
     6      7776+180  ≈ 2.1h    12291+630   ≈ 3.4h
     7     16807+210  ≈ 4.6h    29113+840   ≈ 8h
     8     32768+240  ≈ 9.1h    61896+1080  ≈17h
     9     59049+270  ≈16.4h   120960+1350  ≈33h
    10    100000+300  ≈27.7h   220975+1650  ≈61h

We default to 5 retries, meaning the last backoff runs with attempt=4.
Outgoing activities might therefore already be permanently dropped after a
remote downtime of only 23 minutes, which doesn't seem too implausible.
Furthermore, it seems excessive to retry this quickly this often at the
beginning. At the same time, we'd like at least one quick'ish retry to deal
with transient issues and to maintain reasonable federation responsiveness.
If an admin wants to tolerate a one-day downtime of remotes, the retry
count needs to be almost doubled.

The new backoff strategy implemented in this commit instead switches to an
exponential curve after a few initial attempts:

  attempt  backoff_single       cumulative
     1        16+30                16+30
     2       143+60               159+90
     3      2202+90   ≈37min     2361+180   ≈40min
     4      8160+120  ≈ 2.3h    10521+300   ≈ 3h
     5     77393+150  ≈21.5h    87914+450   ≈24h

Initial retries are still fast, but the same number of retries now allows
for a remote downtime of at least 40 minutes. Customising the retry count
to 5 allows for whole-day downtimes.
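The tables above can be reproduced with a standalone sketch. Note the exact parameters (pow = 5 for the old strategy; pow = 7 for the first three attempts and base = 9.5 afterwards for the new one) are inferred from the figures above, since the call sites of the helpers live outside this file:

```elixir
# Standalone sketch (not part of the codebase) tabulating the deterministic
# part of both strategies; the random jitter adds up to
# 2 * base_backoff * attempt extra seconds on top of each value.
base_backoff = 15

# Old strategy: pure polynomial, attempt^5 (pow inferred from the table)
old = fn attempt -> trunc(:math.pow(attempt, 5) + base_backoff) end

# New strategy: polynomial attempt^7 for the first three attempts,
# then base-9.5 exponential (both parameters inferred from the table)
new = fn attempt ->
  if attempt > 3 do
    trunc(:math.pow(9.5, attempt) + base_backoff)
  else
    trunc(:math.pow(attempt, 7) + base_backoff)
  end
end

for attempt <- 1..5 do
  max_jitter = 2 * base_backoff * attempt
  IO.puts("#{attempt}: old #{old.(attempt)}+#{max_jitter}s  new #{new.(attempt)}+#{max_jitter}s")
end
```

This matches the new-strategy column exactly (143, 2202, 8160, 77393). The old-strategy column in the message appears to drop the constant +15 offset from attempt 3 onwards (it shows 243 where the formula yields 258), though its cumulative column still includes it (16 + 47 + 258 = 321).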
# Pleroma: A lightweight social networking server
# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
# SPDX-License-Identifier: AGPL-3.0-only

defmodule Pleroma.Workers.WorkerHelper do
  alias Pleroma.Config
  alias Pleroma.Workers.WorkerHelper

  def worker_args(queue) do
    case Config.get([:workers, :retries, queue]) do
      nil -> []
      max_attempts -> [max_attempts: max_attempts]
    end
  end

  def sidekiq_backoff(attempt, pow \\ 4, base_backoff \\ 15) do
    backoff =
      :math.pow(attempt, pow) +
        base_backoff +
        :rand.uniform(2 * base_backoff) * attempt

    trunc(backoff)
  end

  def exponential_backoff(attempt, base, base_backoff \\ 15) do
    backoff =
      :math.pow(base, attempt) +
        base_backoff +
        :rand.uniform(2 * base_backoff) * attempt

    trunc(backoff)
  end

  defmacro __using__(opts) do
    caller_module = __CALLER__.module
    queue = Keyword.fetch!(opts, :queue)
    # by default just stop unintended duplicates - this can and should be overridden
    # if you want to have a more complex uniqueness constraint
    uniqueness = Keyword.get(opts, :unique, period: 1)

    quote do
      # Note: `max_attempts` is intended to be overridden in `new/2` call
      use Oban.Worker,
        queue: unquote(queue),
        max_attempts: 1,
        unique: unquote(uniqueness)

      alias Oban.Job

      defp do_enqueue(op, params, worker_args \\ []) do
        params = Map.merge(%{"op" => op}, params)
        queue_atom = String.to_atom(unquote(queue))
        worker_args = worker_args ++ WorkerHelper.worker_args(queue_atom)

        unquote(caller_module)
        |> apply(:new, [params, worker_args])
        |> Oban.insert()
      end

      def enqueue(op, params, worker_args \\ []),
        do: do_enqueue(op, params, worker_args)

      @impl Oban.Worker
      def timeout(_job) do
        queue_atom = String.to_atom(unquote(queue))
        Config.get([:workers, :timeout, queue_atom], :timer.minutes(1))
      end

      defoverridable enqueue: 3
    end
  end
end
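For context, a worker adopts this helper via the `__using__` macro and gets `enqueue/3` and `timeout/1` injected. The module name, queue name, and config values below are illustrative assumptions, not taken from the codebase:

```elixir
# Hypothetical worker module using the helper (names are illustrative):
defmodule Pleroma.Workers.ExampleWorker do
  use Pleroma.Workers.WorkerHelper, queue: "example"

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"op" => "do_thing"}}) do
    # ... actual work goes here ...
    :ok
  end
end

# Enqueueing merges the "op" into the job args; `max_attempts` for the
# queue is looked up via worker_args/1 and falls back to the per-worker
# default (1) when no retry count is configured:
#
#   Pleroma.Workers.ExampleWorker.enqueue("do_thing", %{"id" => 1})
#
# Assumed config shape read by Config.get([:workers, :retries, queue])
# and Config.get([:workers, :timeout, queue]):
#
#   config :pleroma, :workers,
#     retries: [example: 5],
#     timeout: [example: :timer.minutes(2)]
```

Since `enqueue/3` is marked `defoverridable`, a worker can wrap it (e.g. to pass custom `scheduled_at` or uniqueness options) and still delegate to the private `do_enqueue/3`.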