Merge pull request 'Backoff on HTTP requests when 429 is recieved' (#762) from backoff-http into develop
Reviewed-on: https://akkoma.dev/AkkomaGang/akkoma/pulls/762
This commit is contained in:
commit
76ded10a70
6 changed files with 233 additions and 4 deletions
|
@ -37,6 +37,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
|
||||||
- Issue leading to Mastodon bot accounts being rejected
|
- Issue leading to Mastodon bot accounts being rejected
|
||||||
- Scope misdetection of remote posts resulting from not recognising
|
- Scope misdetection of remote posts resulting from not recognising
|
||||||
JSON-LD-compacted forms of public scope; affected e.g. federation with bovine
|
JSON-LD-compacted forms of public scope; affected e.g. federation with bovine
|
||||||
|
- Ratelimits encountered when fetching objects are now respected; 429 responses will cause a backoff when we get one.
|
||||||
|
|
||||||
## Removed
|
## Removed
|
||||||
- ActivityPub Client-To-Server write API endpoints have been disabled;
|
- ActivityPub Client-To-Server write API endpoints have been disabled;
|
||||||
|
|
|
@ -179,7 +179,8 @@ defp cachex_children do
|
||||||
build_cachex("translations", default_ttl: :timer.hours(24 * 30), limit: 2500),
|
build_cachex("translations", default_ttl: :timer.hours(24 * 30), limit: 2500),
|
||||||
build_cachex("instances", default_ttl: :timer.hours(24), ttl_interval: 1000, limit: 2500),
|
build_cachex("instances", default_ttl: :timer.hours(24), ttl_interval: 1000, limit: 2500),
|
||||||
build_cachex("request_signatures", default_ttl: :timer.hours(24 * 30), limit: 3000),
|
build_cachex("request_signatures", default_ttl: :timer.hours(24 * 30), limit: 3000),
|
||||||
build_cachex("rel_me", default_ttl: :timer.hours(24 * 30), limit: 300)
|
build_cachex("rel_me", default_ttl: :timer.hours(24 * 30), limit: 300),
|
||||||
|
build_cachex("http_backoff", default_ttl: :timer.hours(24 * 30), limit: 10000)
|
||||||
]
|
]
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
121
lib/pleroma/http/backoff.ex
Normal file
121
lib/pleroma/http/backoff.ex
Normal file
|
@ -0,0 +1,121 @@
|
||||||
|
defmodule Pleroma.HTTP.Backoff do
|
||||||
|
alias Pleroma.HTTP
|
||||||
|
require Logger
|
||||||
|
|
||||||
|
@cachex Pleroma.Config.get([:cachex, :provider], Cachex)
|
||||||
|
@backoff_cache :http_backoff_cache
|
||||||
|
|
||||||
|
# attempt to parse a timestamp from a header
|
||||||
|
# returns nil if it can't parse the timestamp
|
||||||
|
@spec timestamp_or_nil(binary) :: DateTime.t() | nil
|
||||||
|
defp timestamp_or_nil(header) do
|
||||||
|
case DateTime.from_iso8601(header) do
|
||||||
|
{:ok, stamp, _} ->
|
||||||
|
stamp
|
||||||
|
|
||||||
|
_ ->
|
||||||
|
nil
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# attempt to parse the x-ratelimit-reset header from the headers
|
||||||
|
@spec x_ratelimit_reset(headers :: list) :: DateTime.t() | nil
|
||||||
|
defp x_ratelimit_reset(headers) do
|
||||||
|
with {_header, value} <- List.keyfind(headers, "x-ratelimit-reset", 0),
|
||||||
|
true <- is_binary(value) do
|
||||||
|
timestamp_or_nil(value)
|
||||||
|
else
|
||||||
|
_ ->
|
||||||
|
nil
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# attempt to parse the Retry-After header from the headers
|
||||||
|
# this can be either a timestamp _or_ a number of seconds to wait!
|
||||||
|
# we'll return a datetime if we can parse it, or nil if we can't
|
||||||
|
@spec retry_after(headers :: list) :: DateTime.t() | nil
|
||||||
|
defp retry_after(headers) do
|
||||||
|
with {_header, value} <- List.keyfind(headers, "retry-after", 0),
|
||||||
|
true <- is_binary(value) do
|
||||||
|
# first, see if it's an integer
|
||||||
|
case Integer.parse(value) do
|
||||||
|
{seconds, ""} ->
|
||||||
|
Logger.debug("Parsed Retry-After header: #{seconds} seconds")
|
||||||
|
DateTime.utc_now() |> Timex.shift(seconds: seconds)
|
||||||
|
|
||||||
|
_ ->
|
||||||
|
# if it's not an integer, try to parse it as a timestamp
|
||||||
|
timestamp_or_nil(value)
|
||||||
|
end
|
||||||
|
else
|
||||||
|
_ ->
|
||||||
|
nil
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
# given a set of headers, will attempt to find the next backoff timestamp
|
||||||
|
# if it can't find one, it will default to 5 minutes from now
|
||||||
|
@spec next_backoff_timestamp(%{headers: list}) :: DateTime.t()
|
||||||
|
defp next_backoff_timestamp(%{headers: headers}) when is_list(headers) do
|
||||||
|
default_5_minute_backoff =
|
||||||
|
DateTime.utc_now()
|
||||||
|
|> Timex.shift(seconds: 5 * 60)
|
||||||
|
|
||||||
|
backoff =
|
||||||
|
[&x_ratelimit_reset/1, &retry_after/1]
|
||||||
|
|> Enum.map(& &1.(headers))
|
||||||
|
|> Enum.find(&(&1 != nil))
|
||||||
|
|
||||||
|
if is_nil(backoff) do
|
||||||
|
Logger.debug("No backoff headers found, defaulting to 5 minutes from now")
|
||||||
|
default_5_minute_backoff
|
||||||
|
else
|
||||||
|
Logger.debug("Found backoff header, will back off until: #{backoff}")
|
||||||
|
backoff
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp next_backoff_timestamp(_), do: DateTime.utc_now() |> Timex.shift(seconds: 5 * 60)
|
||||||
|
|
||||||
|
# utility function to check the HTTP response for potential backoff headers
|
||||||
|
# will check if we get a 429 or 503 response, and if we do, will back off for a bit
|
||||||
|
@spec check_backoff({:ok | :error, HTTP.Env.t()}, binary()) ::
|
||||||
|
{:ok | :error, HTTP.Env.t()} | {:error, :ratelimit}
|
||||||
|
defp check_backoff({:ok, env}, host) do
|
||||||
|
case env.status do
|
||||||
|
status when status in [429, 503] ->
|
||||||
|
Logger.error("Rate limited on #{host}! Backing off...")
|
||||||
|
timestamp = next_backoff_timestamp(env)
|
||||||
|
ttl = Timex.diff(timestamp, DateTime.utc_now(), :seconds)
|
||||||
|
# we will cache the host for 5 minutes
|
||||||
|
@cachex.put(@backoff_cache, host, true, ttl: ttl)
|
||||||
|
{:error, :ratelimit}
|
||||||
|
|
||||||
|
_ ->
|
||||||
|
{:ok, env}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp check_backoff(env, _), do: env
|
||||||
|
|
||||||
|
@doc """
|
||||||
|
this acts as a single throughput for all GET requests
|
||||||
|
we will check if the host is in the cache, and if it is, we will automatically fail the request
|
||||||
|
this ensures that we don't hammer the server with requests, and instead wait for the backoff to expire
|
||||||
|
this is a very simple implementation, and can be improved upon!
|
||||||
|
"""
|
||||||
|
@spec get(binary, list, list) :: {:ok | :error, HTTP.Env.t()} | {:error, :ratelimit}
|
||||||
|
def get(url, headers \\ [], options \\ []) do
|
||||||
|
%{host: host} = URI.parse(url)
|
||||||
|
|
||||||
|
case @cachex.get(@backoff_cache, host) do
|
||||||
|
{:ok, nil} ->
|
||||||
|
url
|
||||||
|
|> HTTP.get(headers, options)
|
||||||
|
|> check_backoff(host)
|
||||||
|
|
||||||
|
_ ->
|
||||||
|
{:error, :ratelimit}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
|
@ -354,7 +354,7 @@ def get_object(id) do
|
||||||
|
|
||||||
with {:ok, %{body: body, status: code, headers: headers, url: final_url}}
|
with {:ok, %{body: body, status: code, headers: headers, url: final_url}}
|
||||||
when code in 200..299 <-
|
when code in 200..299 <-
|
||||||
HTTP.get(id, headers),
|
HTTP.Backoff.get(id, headers),
|
||||||
remote_host <-
|
remote_host <-
|
||||||
URI.parse(final_url).host,
|
URI.parse(final_url).host,
|
||||||
{:cross_domain_redirect, false} <-
|
{:cross_domain_redirect, false} <-
|
||||||
|
|
|
@ -160,7 +160,8 @@ def find_lrdd_template(domain) do
|
||||||
# WebFinger is restricted to HTTPS - https://tools.ietf.org/html/rfc7033#section-9.1
|
# WebFinger is restricted to HTTPS - https://tools.ietf.org/html/rfc7033#section-9.1
|
||||||
meta_url = "https://#{domain}/.well-known/host-meta"
|
meta_url = "https://#{domain}/.well-known/host-meta"
|
||||||
|
|
||||||
with {:ok, %{status: status, body: body}} when status in 200..299 <- HTTP.get(meta_url) do
|
with {:ok, %{status: status, body: body}} when status in 200..299 <-
|
||||||
|
HTTP.Backoff.get(meta_url) do
|
||||||
get_template_from_xml(body)
|
get_template_from_xml(body)
|
||||||
else
|
else
|
||||||
error ->
|
error ->
|
||||||
|
@ -197,7 +198,7 @@ def finger(account) do
|
||||||
|
|
||||||
with address when is_binary(address) <- get_address_from_domain(domain, encoded_account),
|
with address when is_binary(address) <- get_address_from_domain(domain, encoded_account),
|
||||||
{:ok, %{status: status, body: body, headers: headers}} when status in 200..299 <-
|
{:ok, %{status: status, body: body, headers: headers}} when status in 200..299 <-
|
||||||
HTTP.get(
|
HTTP.Backoff.get(
|
||||||
address,
|
address,
|
||||||
[{"accept", "application/xrd+xml,application/jrd+json"}]
|
[{"accept", "application/xrd+xml,application/jrd+json"}]
|
||||||
) do
|
) do
|
||||||
|
|
105
test/pleroma/http/backoff_test.exs
Normal file
105
test/pleroma/http/backoff_test.exs
Normal file
|
@ -0,0 +1,105 @@
|
||||||
|
defmodule Pleroma.HTTP.BackoffTest do
|
||||||
|
@backoff_cache :http_backoff_cache
|
||||||
|
use Pleroma.DataCase, async: false
|
||||||
|
alias Pleroma.HTTP.Backoff
|
||||||
|
|
||||||
|
defp within_tolerance?(ttl, expected) do
|
||||||
|
ttl > expected - 10 and ttl < expected + 10
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "get/3" do
|
||||||
|
test "should return {:ok, env} when not rate limited" do
|
||||||
|
Tesla.Mock.mock_global(fn
|
||||||
|
%Tesla.Env{url: "https://akkoma.dev/api/v1/instance"} ->
|
||||||
|
{:ok, %Tesla.Env{status: 200, body: "ok"}}
|
||||||
|
end)
|
||||||
|
|
||||||
|
assert {:ok, env} = Backoff.get("https://akkoma.dev/api/v1/instance")
|
||||||
|
assert env.status == 200
|
||||||
|
end
|
||||||
|
|
||||||
|
test "should return {:error, env} when rate limited" do
|
||||||
|
# Shove a value into the cache to simulate a rate limit
|
||||||
|
Cachex.put(@backoff_cache, "akkoma.dev", true)
|
||||||
|
assert {:error, :ratelimit} = Backoff.get("https://akkoma.dev/api/v1/instance")
|
||||||
|
end
|
||||||
|
|
||||||
|
test "should insert a value into the cache when rate limited" do
|
||||||
|
Tesla.Mock.mock_global(fn
|
||||||
|
%Tesla.Env{url: "https://ratelimited.dev/api/v1/instance"} ->
|
||||||
|
{:ok, %Tesla.Env{status: 429, body: "Rate limited"}}
|
||||||
|
end)
|
||||||
|
|
||||||
|
assert {:error, :ratelimit} = Backoff.get("https://ratelimited.dev/api/v1/instance")
|
||||||
|
assert {:ok, true} = Cachex.get(@backoff_cache, "ratelimited.dev")
|
||||||
|
end
|
||||||
|
|
||||||
|
test "should insert a value into the cache when rate limited with a 503 response" do
|
||||||
|
Tesla.Mock.mock_global(fn
|
||||||
|
%Tesla.Env{url: "https://ratelimited.dev/api/v1/instance"} ->
|
||||||
|
{:ok, %Tesla.Env{status: 503, body: "Rate limited"}}
|
||||||
|
end)
|
||||||
|
|
||||||
|
assert {:error, :ratelimit} = Backoff.get("https://ratelimited.dev/api/v1/instance")
|
||||||
|
assert {:ok, true} = Cachex.get(@backoff_cache, "ratelimited.dev")
|
||||||
|
end
|
||||||
|
|
||||||
|
test "should parse the value of x-ratelimit-reset, if present" do
|
||||||
|
ten_minutes_from_now =
|
||||||
|
DateTime.utc_now() |> Timex.shift(minutes: 10) |> DateTime.to_iso8601()
|
||||||
|
|
||||||
|
Tesla.Mock.mock_global(fn
|
||||||
|
%Tesla.Env{url: "https://ratelimited.dev/api/v1/instance"} ->
|
||||||
|
{:ok,
|
||||||
|
%Tesla.Env{
|
||||||
|
status: 429,
|
||||||
|
body: "Rate limited",
|
||||||
|
headers: [{"x-ratelimit-reset", ten_minutes_from_now}]
|
||||||
|
}}
|
||||||
|
end)
|
||||||
|
|
||||||
|
assert {:error, :ratelimit} = Backoff.get("https://ratelimited.dev/api/v1/instance")
|
||||||
|
assert {:ok, true} = Cachex.get(@backoff_cache, "ratelimited.dev")
|
||||||
|
{:ok, ttl} = Cachex.ttl(@backoff_cache, "ratelimited.dev")
|
||||||
|
assert within_tolerance?(ttl, 600)
|
||||||
|
end
|
||||||
|
|
||||||
|
test "should parse the value of retry-after when it's a timestamp" do
|
||||||
|
ten_minutes_from_now =
|
||||||
|
DateTime.utc_now() |> Timex.shift(minutes: 10) |> DateTime.to_iso8601()
|
||||||
|
|
||||||
|
Tesla.Mock.mock_global(fn
|
||||||
|
%Tesla.Env{url: "https://ratelimited.dev/api/v1/instance"} ->
|
||||||
|
{:ok,
|
||||||
|
%Tesla.Env{
|
||||||
|
status: 429,
|
||||||
|
body: "Rate limited",
|
||||||
|
headers: [{"retry-after", ten_minutes_from_now}]
|
||||||
|
}}
|
||||||
|
end)
|
||||||
|
|
||||||
|
assert {:error, :ratelimit} = Backoff.get("https://ratelimited.dev/api/v1/instance")
|
||||||
|
assert {:ok, true} = Cachex.get(@backoff_cache, "ratelimited.dev")
|
||||||
|
{:ok, ttl} = Cachex.ttl(@backoff_cache, "ratelimited.dev")
|
||||||
|
assert within_tolerance?(ttl, 600)
|
||||||
|
end
|
||||||
|
|
||||||
|
test "should parse the value of retry-after when it's a number of seconds" do
|
||||||
|
Tesla.Mock.mock_global(fn
|
||||||
|
%Tesla.Env{url: "https://ratelimited.dev/api/v1/instance"} ->
|
||||||
|
{:ok,
|
||||||
|
%Tesla.Env{
|
||||||
|
status: 429,
|
||||||
|
body: "Rate limited",
|
||||||
|
headers: [{"retry-after", "600"}]
|
||||||
|
}}
|
||||||
|
end)
|
||||||
|
|
||||||
|
assert {:error, :ratelimit} = Backoff.get("https://ratelimited.dev/api/v1/instance")
|
||||||
|
assert {:ok, true} = Cachex.get(@backoff_cache, "ratelimited.dev")
|
||||||
|
# assert that the value is 10 minutes from now
|
||||||
|
{:ok, ttl} = Cachex.ttl(@backoff_cache, "ratelimited.dev")
|
||||||
|
assert within_tolerance?(ttl, 600)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
Loading…
Reference in a new issue