From ad52135bf5c1f8e7188210766207d83cebf9d113 Mon Sep 17 00:00:00 2001 From: Floatingghost Date: Tue, 11 Jun 2024 18:06:51 +0100 Subject: [PATCH] Convert rich media backfill to oban task --- config/config.exs | 7 ++- lib/pleroma/http.ex | 1 + lib/pleroma/web/rich_media/backfill.ex | 55 +++++++++---------- mix.exs | 7 +-- mix.lock | 2 +- test/pleroma/web/rich_media/card_test.exs | 7 +++ .../parser/ttl/aws_signed_url_test.exs | 2 + .../rich_media/parser/ttl/opengraph_test.exs | 2 + 8 files changed, 47 insertions(+), 36 deletions(-) diff --git a/config/config.exs b/config/config.exs index cea2b5c44..82c64f402 100644 --- a/config/config.exs +++ b/config/config.exs @@ -583,6 +583,7 @@ config :pleroma, Oban, search_indexing: 10, nodeinfo_fetcher: 1, database_prune: 1, + rich_media_backfill: 2, rich_media_expiration: 2 ], plugins: [ @@ -599,7 +600,8 @@ config :pleroma, :workers, retries: [ federator_incoming: 5, federator_outgoing: 5, - search_indexing: 2 + search_indexing: 2, + rich_media_backfill: 3 ], timeout: [ activity_expiration: :timer.seconds(5), @@ -621,7 +623,8 @@ config :pleroma, :workers, mute_expire: :timer.seconds(5), search_indexing: :timer.seconds(5), nodeinfo_fetcher: :timer.seconds(10), - database_prune: :timer.minutes(10) + database_prune: :timer.minutes(10), + rich_media_backfill: :timer.seconds(30) ] config :pleroma, Pleroma.Formatter, diff --git a/lib/pleroma/http.ex b/lib/pleroma/http.ex index 28d6e3a36..3ff27915c 100644 --- a/lib/pleroma/http.ex +++ b/lib/pleroma/http.ex @@ -74,6 +74,7 @@ defmodule Pleroma.HTTP do request = build_request(method, headers, options, url, body, params) client = Tesla.client([Tesla.Middleware.FollowRedirects, Tesla.Middleware.Telemetry]) + Logger.debug("Outbound: #{method} #{url}") request(client, request) rescue e -> diff --git a/lib/pleroma/web/rich_media/backfill.ex b/lib/pleroma/web/rich_media/backfill.ex index 4ec50e132..6b2373b01 100644 --- a/lib/pleroma/web/rich_media/backfill.ex +++ b/lib/pleroma/web/rich_media/backfill.ex @@ -3,6 +3,10 @@ # SPDX-License-Identifier: AGPL-3.0-only defmodule Pleroma.Web.RichMedia.Backfill do + use Pleroma.Workers.WorkerHelper, + queue: "rich_media_backfill", + unique: [period: 300, states: Oban.Job.states(), keys: [:op, :url_hash]] + alias Pleroma.Web.RichMedia.Card alias Pleroma.Web.RichMedia.Parser alias Pleroma.Web.RichMedia.Parser.TTL @@ -10,61 +14,58 @@ defmodule Pleroma.Web.RichMedia.Backfill do require Logger - @backfiller Pleroma.Config.get([__MODULE__, :provider], Pleroma.Web.RichMedia.Backfill.Task) @cachex Pleroma.Config.get([:cachex, :provider], Cachex) - @max_attempts 3 - @retry 5_000 def start(%{url: url} = args) when is_binary(url) do url_hash = Card.url_to_hash(url) args = args - |> Map.put(:attempt, 1) |> Map.put(:url_hash, url_hash) - @backfiller.run(args) + __MODULE__.enqueue("rich_media_backfill", args) end - def run(%{url: url, url_hash: url_hash, attempt: attempt} = args) - when attempt <= @max_attempts do + def perform(%Oban.Job{args: %{"op" => "rich_media_backfill", "url" => url} = args}) + when is_binary(url) do + run(args) + end + + def run(%{"url" => url, "url_hash" => url_hash} = args) do case Parser.parse(url) do {:ok, fields} -> {:ok, card} = Card.create(url, fields) maybe_schedule_expiration(url, fields) - if Map.has_key?(args, :activity_id) do + if Map.has_key?(args, "activity_id") do stream_update(args) end warm_cache(url_hash, card) + :ok {:error, {:invalid_metadata, fields}} -> Logger.debug("Rich media incomplete or invalid metadata for #{url}: #{inspect(fields)}") - negative_cache(url_hash) + negative_cache(url_hash, :timer.minutes(30)) {:error, :body_too_large} -> Logger.error("Rich media error for #{url}: :body_too_large") - negative_cache(url_hash) + negative_cache(url_hash, :timer.minutes(30)) {:error, {:content_type, type}} -> Logger.debug("Rich media error for #{url}: :content_type is #{type}") - negative_cache(url_hash) + negative_cache(url_hash, :timer.minutes(30)) e -> Logger.debug("Rich media error for #{url}: #{inspect(e)}") - - :timer.sleep(@retry * attempt) - - run(%{args | attempt: attempt + 1}) + {:error, e} end end - def run(%{url: url, url_hash: url_hash}) do - Logger.debug("Rich media failure for #{url}") - - negative_cache(url_hash, :timer.minutes(15)) + def run(e) do + Logger.error("Rich media failure - invalid args: #{inspect(e)}") + {:discard, :invalid} end defp maybe_schedule_expiration(url, fields) do @@ -80,22 +81,18 @@ defmodule Pleroma.Web.RichMedia.Backfill do end end - defp stream_update(%{activity_id: activity_id}) do + defp stream_update(%{"activity_id" => activity_id}) do + Logger.info("Rich media backfill: streaming update for activity #{activity_id}") + Pleroma.Activity.get_by_id(activity_id) |> Pleroma.Activity.normalize() |> Pleroma.Web.ActivityPub.ActivityPub.stream_out() end defp warm_cache(key, val), do: @cachex.put(:rich_media_cache, key, val) - defp negative_cache(key, ttl \\ nil), do: @cachex.put(:rich_media_cache, key, nil, ttl: ttl) -end -defmodule Pleroma.Web.RichMedia.Backfill.Task do - alias Pleroma.Web.RichMedia.Backfill - - def run(args) do - Task.Supervisor.start_child(Pleroma.TaskSupervisor, Backfill, :run, [args], - name: {:global, {:rich_media, args.url_hash}} - ) + def negative_cache(key, ttl \\ :timer.minutes(30)) do + @cachex.put(:rich_media_cache, key, nil, ttl: ttl) + {:discard, :error} end end diff --git a/mix.exs b/mix.exs index a789cd967..2ce2c5e89 100644 --- a/mix.exs +++ b/mix.exs @@ -78,7 +78,8 @@ defmodule Pleroma.Mixfile do :comeonin, :fast_sanitize, :os_mon, - :ssl + :ssl, + :recon ], included_applications: [:ex_syslogger] ] @@ -158,9 +159,7 @@ defmodule Pleroma.Mixfile do {:timex, "~> 3.7"}, {:ueberauth, "== 0.10.5"}, {:linkify, "~> 0.5.3"}, - {:http_signatures, - git: "https://akkoma.dev/AkkomaGang/http_signatures.git", - ref: "6640ce7d24c783ac2ef56e27d00d12e8dc85f396"}, + {:http_signatures, "~> 0.1.2"}, {:telemetry, "~> 1.2"}, {:telemetry_poller, "~> 1.0"}, {:telemetry_metrics, "~> 0.6"}, diff --git a/mix.lock b/mix.lock index ab24b8dbc..a4e2ddc8c 100644 --- a/mix.lock +++ b/mix.lock @@ -57,7 +57,7 @@ "hackney": {:hex, :hackney, "1.20.1", "8d97aec62ddddd757d128bfd1df6c5861093419f8f7a4223823537bad5d064e2", [:rebar3], [{:certifi, "~> 2.12.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.4.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "fe9094e5f1a2a2c0a7d10918fee36bfec0ec2a979994cff8cfe8058cd9af38e3"}, "hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"}, "html_entities": {:hex, :html_entities, "0.5.2", "9e47e70598da7de2a9ff6af8758399251db6dbb7eebe2b013f2bbd2515895c3c", [:mix], [], "hexpm", "c53ba390403485615623b9531e97696f076ed415e8d8058b1dbaa28181f4fdcc"}, - "http_signatures": {:git, "https://akkoma.dev/AkkomaGang/http_signatures.git", "6640ce7d24c783ac2ef56e27d00d12e8dc85f396", [ref: "6640ce7d24c783ac2ef56e27d00d12e8dc85f396"]}, + "http_signatures": {:hex, :http_signatures, "0.1.2", "ed1cc7043abcf5bb4f30d68fb7bad9d618ec1a45c4ff6c023664e78b67d9c406", [:mix], [], "hexpm", "f08aa9ac121829dae109d608d83c84b940ef2f183ae50f2dd1e9a8bc619d8be7"}, "httpoison": {:hex, :httpoison, "1.8.2", "9eb9c63ae289296a544842ef816a85d881d4a31f518a0fec089aaa744beae290", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "2bb350d26972e30c96e2ca74a1aaf8293d61d0742ff17f01e0279fef11599921"}, "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, "inet_cidr": {:hex, :inet_cidr, "1.0.8", "d26bb7bdbdf21ae401ead2092bf2bb4bf57fe44a62f5eaa5025280720ace8a40", [:mix], [], "hexpm", "d5b26da66603bb56c933c65214c72152f0de9a6ea53618b56d63302a68f6a90e"}, diff --git a/test/pleroma/web/rich_media/card_test.exs b/test/pleroma/web/rich_media/card_test.exs index 516ac9951..570e4034d 100644 --- a/test/pleroma/web/rich_media/card_test.exs +++ b/test/pleroma/web/rich_media/card_test.exs @@ -36,6 +36,9 @@ defmodule Pleroma.Web.RichMedia.CardTest do content_type: "text/markdown" }) + # wait for oban + Pleroma.Tests.ObanHelpers.perform_all() + assert %Card{url_hash: ^url_hash, fields: _} = Card.get_by_activity(activity) end @@ -50,6 +53,8 @@ defmodule Pleroma.Web.RichMedia.CardTest do # Force a backfill Card.get_by_activity(activity) + # wait for oban + Pleroma.Tests.ObanHelpers.perform_all() assert match?( %Card{url_hash: ^original_url_hash, fields: _}, @@ -62,6 +67,8 @@ defmodule Pleroma.Web.RichMedia.CardTest do # Force a backfill Card.get_by_activity(activity) + # wait for oban + Pleroma.Tests.ObanHelpers.perform_all() assert match?( %Card{url_hash: ^updated_url_hash, fields: _}, diff --git a/test/pleroma/web/rich_media/parser/ttl/aws_signed_url_test.exs b/test/pleroma/web/rich_media/parser/ttl/aws_signed_url_test.exs index cc28aa7f3..136e43775 100644 --- a/test/pleroma/web/rich_media/parser/ttl/aws_signed_url_test.exs +++ b/test/pleroma/web/rich_media/parser/ttl/aws_signed_url_test.exs @@ -73,6 +73,8 @@ defmodule Pleroma.Web.RichMedia.Parser.TTL.AwsSignedUrlTest do end) Card.get_or_backfill_by_url(url) + # wait for oban + Pleroma.Tests.ObanHelpers.perform_all() assert_enqueued(worker: Pleroma.Workers.RichMediaExpirationWorker, args: %{"url" => url}) diff --git a/test/pleroma/web/rich_media/parser/ttl/opengraph_test.exs b/test/pleroma/web/rich_media/parser/ttl/opengraph_test.exs index 770968d47..3360e8357 100644 --- a/test/pleroma/web/rich_media/parser/ttl/opengraph_test.exs +++ b/test/pleroma/web/rich_media/parser/ttl/opengraph_test.exs @@ -35,6 +35,8 @@ defmodule Pleroma.Web.RichMedia.Parser.TTL.OpengraphTest do end) Card.get_or_backfill_by_url(url) + # wait for oban + Pleroma.Tests.ObanHelpers.perform_all() assert_enqueued(worker: Pleroma.Workers.RichMediaExpirationWorker, args: %{"url" => url}) end