Add configurable timeline per oban job (#273)
Heavily inspired by https://git.pleroma.social/pleroma/pleroma/-/merge_requests/3777 Co-authored-by: FloatingGhost <hannah@coffee-and-dreams.uk> Reviewed-on: https://akkoma.dev/AkkomaGang/akkoma/pulls/273
This commit is contained in:
parent
7d4c4aa16e
commit
c1127e321b
9 changed files with 84 additions and 0 deletions
|
@ -584,6 +584,27 @@
|
||||||
federator_incoming: 5,
|
federator_incoming: 5,
|
||||||
federator_outgoing: 5,
|
federator_outgoing: 5,
|
||||||
search_indexing: 2
|
search_indexing: 2
|
||||||
|
],
|
||||||
|
timeout: [
|
||||||
|
activity_expiration: :timer.seconds(5),
|
||||||
|
token_expiration: :timer.seconds(5),
|
||||||
|
filter_expiration: :timer.seconds(5),
|
||||||
|
backup: :timer.seconds(900),
|
||||||
|
federator_incoming: :timer.seconds(10),
|
||||||
|
federator_outgoing: :timer.seconds(10),
|
||||||
|
ingestion_queue: :timer.seconds(5),
|
||||||
|
web_push: :timer.seconds(5),
|
||||||
|
mailer: :timer.seconds(5),
|
||||||
|
transmogrifier: :timer.seconds(5),
|
||||||
|
scheduled_activities: :timer.seconds(5),
|
||||||
|
poll_notifications: :timer.seconds(5),
|
||||||
|
background: :timer.seconds(5),
|
||||||
|
remote_fetcher: :timer.seconds(10),
|
||||||
|
attachments_cleanup: :timer.seconds(900),
|
||||||
|
new_users_digest: :timer.seconds(10),
|
||||||
|
mute_expire: :timer.seconds(5),
|
||||||
|
search_indexing: :timer.seconds(5),
|
||||||
|
nodeinfo_fetcher: :timer.seconds(10)
|
||||||
]
|
]
|
||||||
|
|
||||||
config :pleroma, Pleroma.Formatter,
|
config :pleroma, Pleroma.Formatter,
|
||||||
|
|
|
@ -1979,6 +1979,32 @@
|
||||||
federator_incoming: 5,
|
federator_incoming: 5,
|
||||||
federator_outgoing: 5
|
federator_outgoing: 5
|
||||||
]
|
]
|
||||||
|
},
|
||||||
|
%{
|
||||||
|
key: :timeout,
|
||||||
|
type: {:keyword, :integer},
|
||||||
|
description: "Timeout for jobs, per `Oban` queue, in ms",
|
||||||
|
suggestions: [
|
||||||
|
activity_expiration: :timer.seconds(5),
|
||||||
|
token_expiration: :timer.seconds(5),
|
||||||
|
filter_expiration: :timer.seconds(5),
|
||||||
|
backup: :timer.seconds(900),
|
||||||
|
federator_incoming: :timer.seconds(10),
|
||||||
|
federator_outgoing: :timer.seconds(10),
|
||||||
|
ingestion_queue: :timer.seconds(5),
|
||||||
|
web_push: :timer.seconds(5),
|
||||||
|
mailer: :timer.seconds(5),
|
||||||
|
transmogrifier: :timer.seconds(5),
|
||||||
|
scheduled_activities: :timer.seconds(5),
|
||||||
|
poll_notifications: :timer.seconds(5),
|
||||||
|
background: :timer.seconds(5),
|
||||||
|
remote_fetcher: :timer.seconds(10),
|
||||||
|
attachments_cleanup: :timer.seconds(900),
|
||||||
|
new_users_digest: :timer.seconds(10),
|
||||||
|
mute_expire: :timer.seconds(5),
|
||||||
|
search_indexing: :timer.seconds(5),
|
||||||
|
nodeinfo_fetcher: :timer.seconds(10)
|
||||||
|
]
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
|
|
|
@ -14,6 +14,11 @@ def process(backup, admin_user_id \\ nil) do
|
||||||
|> Oban.insert()
|
|> Oban.insert()
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def timeout(_job) do
|
||||||
|
Pleroma.Config.get([:workers, :timeout, :backup]) || :timer.minutes(1)
|
||||||
|
end
|
||||||
|
|
||||||
def schedule_deletion(backup) do
|
def schedule_deletion(backup) do
|
||||||
days = Pleroma.Config.get([Backup, :purge_after_days])
|
days = Pleroma.Config.get([Backup, :purge_after_days])
|
||||||
time = 60 * 60 * 24 * days
|
time = 60 * 60 * 24 * days
|
||||||
|
@ -30,6 +35,7 @@ def delete(backup) do
|
||||||
|> Oban.insert()
|
|> Oban.insert()
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@impl true
|
||||||
def perform(%Job{
|
def perform(%Job{
|
||||||
args: %{"op" => "process", "backup_id" => backup_id, "admin_user_id" => admin_user_id}
|
args: %{"op" => "process", "backup_id" => backup_id, "admin_user_id" => admin_user_id}
|
||||||
}) do
|
}) do
|
||||||
|
|
|
@ -27,6 +27,11 @@ def enqueue(args) do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def timeout(_job) do
|
||||||
|
Pleroma.Config.get([:workers, :timeout, :activity_expiration]) || :timer.minutes(1)
|
||||||
|
end
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def perform(%Oban.Job{args: %{"activity_id" => id}}) do
|
def perform(%Oban.Job{args: %{"activity_id" => id}}) do
|
||||||
with %Activity{} = activity <- find_activity(id),
|
with %Activity{} = activity <- find_activity(id),
|
||||||
|
|
|
@ -24,6 +24,11 @@ def enqueue(args) do
|
||||||
|> Oban.insert()
|
|> Oban.insert()
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def timeout(_job) do
|
||||||
|
Pleroma.Config.get([:workers, :timeout, :filter_expiration]) || :timer.minutes(1)
|
||||||
|
end
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def perform(%Job{args: %{"filter_id" => id}}) do
|
def perform(%Job{args: %{"filter_id" => id}}) do
|
||||||
Pleroma.Filter
|
Pleroma.Filter
|
||||||
|
|
|
@ -19,6 +19,11 @@ def enqueue(args) do
|
||||||
|> Oban.insert()
|
|> Oban.insert()
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def timeout(_job) do
|
||||||
|
Pleroma.Config.get([:workers, :timeout, :token_expiration]) || :timer.minutes(1)
|
||||||
|
end
|
||||||
|
|
||||||
@impl true
|
@impl true
|
||||||
def perform(%Oban.Job{args: %{"token_id" => id, "mod" => module}}) do
|
def perform(%Oban.Job{args: %{"token_id" => id, "mod" => module}}) do
|
||||||
module
|
module
|
||||||
|
|
|
@ -43,6 +43,12 @@ def enqueue(op, params, worker_args \\ []) do
|
||||||
|> apply(:new, [params, worker_args])
|
|> apply(:new, [params, worker_args])
|
||||||
|> Oban.insert()
|
|> Oban.insert()
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def timeout(_job) do
|
||||||
|
queue_atom = String.to_atom(unquote(queue))
|
||||||
|
Config.get([:workers, :timeout, queue_atom]) || :timer.minutes(1)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -56,4 +56,9 @@ test "error if actiivity was not found" do
|
||||||
assert {:error, :activity_not_found} =
|
assert {:error, :activity_not_found} =
|
||||||
perform_job(Pleroma.Workers.PurgeExpiredActivity, %{activity_id: "some_if"})
|
perform_job(Pleroma.Workers.PurgeExpiredActivity, %{activity_id: "some_if"})
|
||||||
end
|
end
|
||||||
|
|
||||||
|
test "has a timeout" do
|
||||||
|
clear_config([:workers, :timeout, :activity_expiration], 50)
|
||||||
|
assert Pleroma.Workers.PurgeExpiredActivity.timeout(%Oban.Job{}) == 50
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -49,4 +49,9 @@ test "error message for non-existent scheduled activity" do
|
||||||
ScheduledActivityWorker.perform(%Oban.Job{args: %{"activity_id" => 42}})
|
ScheduledActivityWorker.perform(%Oban.Job{args: %{"activity_id" => 42}})
|
||||||
end) =~ "Couldn't find scheduled activity: 42"
|
end) =~ "Couldn't find scheduled activity: 42"
|
||||||
end
|
end
|
||||||
|
|
||||||
|
test "has a timeout" do
|
||||||
|
clear_config([:workers, :timeout, :scheduled_activities], :timer.minutes(5))
|
||||||
|
assert ScheduledActivityWorker.timeout(nil) == :timer.minutes(5)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in a new issue