 7695010268
			
		
	
	
		7695010268
		
	
	
	
	
		
			
			This adds an option to the prune_objects mix task.
The original way deleted all non-local public posts older than a certain time frame.
Here we add a different query which you can call using the option --keep-threads.
We query from the activities table all context id's where
    1. the newest activity with this context is still old
    2. none of the activities with this context is is local
    3. none of the activities with this context is bookmarked
and delete all objects with these contexts.
The idea is that posts with local activities (posts, replies, likes, repeats...) may be interesting to keep.
Besides that, a post lives in a certain context (the thread), so we keep the whole thread as well.
Caveats:
* ~~Quotes have a different context. Therefore, when someone quotes a post, it's possible the quoted post will still be deleted.~~ fixed in https://akkoma.dev/AkkomaGang/akkoma/pulls/379
* Although undocumented (in docs/docs/administration/CLI_tasks/database.md/#prune-old-remote-posts-from-the-database), the 'normal' delete action still kept old remote non-public posts. I added an option to keep this behaviour, but this also means that you now have to explicitly provide that option. **This could be considered a breaking change!**
* ~~Note that this removes from the objects table, but not from the activities.~~ See https://akkoma.dev/AkkomaGang/akkoma/pulls/427 for that.
Some statistics from explain analyse:
(cost=1402845.92..1933782.00 rows=3810907 width=62) (actual time=2562455.486..2562455.495 rows=0 loops=1)
 Planning Time: 505.327 ms
 Trigger for constraint chat_message_references_object_id_fkey: time=651939.797 calls=921740
 Trigger for constraint deliveries_object_id_fkey: time=52036.009 calls=921740
 Trigger for constraint hashtags_objects_object_id_fkey: time=20665.778 calls=921740
 Execution Time: 3287933.902 ms
***
**TODO**
1. [x] **Question:** Is it OK to keep it like this in regard to quote posts? If not (ie post quoted by local users should also be kept), should we give quotes the same context as the post they are quoting? (If we don't want to give them the same context, I'll have to see how/if I can do it without being too costly)
    * See https://akkoma.dev/AkkomaGang/akkoma/pulls/379
2. [x] **Question:** the "original" query only deletes public posts (this is undocumented, but you can check the code). This new one doesn't care for scope. From the docs I get that the idea is that posts can be refetched when needed. But I have from a trusted source that Pleroma can't refetch non-public posts. I assume that's the reason why they are kept here. I see different options to deal with this
    1. ~~We keep it as currently implemented and just don't care about scope with this option~~
    2. ~~We add logic to not delete non-public posts either (I'll have to see how costly that becomes)~~
    3. We add an extra --keep-non-public parameter. This is technically speaking breakage (you didn't have to provide a param before for this, now you do), but I'm inclined to not care much because it wasn't documented nor tested in the first place.
3. [x] See if we can do the query using Elixir
4. [x] Test on a bigger DB to see that we don't run into a timeout
5. [x] Add docs
Co-authored-by: ilja <git@ilja.space>
Reviewed-on: https://akkoma.dev/AkkomaGang/akkoma/pulls/350
Co-authored-by: ilja <akkoma.dev@ilja.space>
Co-committed-by: ilja <akkoma.dev@ilja.space>
		
	
			
		
			
				
	
	
		
			327 lines
		
	
	
	
		
			9.6 KiB
		
	
	
	
		
			Elixir
		
	
	
	
	
	
			
		
		
	
	
			327 lines
		
	
	
	
		
			9.6 KiB
		
	
	
	
		
			Elixir
		
	
	
	
	
	
| # Pleroma: A lightweight social networking server
 | |
| # Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
 | |
| # SPDX-License-Identifier: AGPL-3.0-only
 | |
| 
 | |
| defmodule Mix.Tasks.Pleroma.Database do
 | |
|   alias Pleroma.Conversation
 | |
|   alias Pleroma.Maintenance
 | |
|   alias Pleroma.Object
 | |
|   alias Pleroma.Repo
 | |
|   alias Pleroma.User
 | |
| 
 | |
|   require Logger
 | |
|   require Pleroma.Constants
 | |
| 
 | |
|   import Ecto.Query
 | |
|   import Mix.Pleroma
 | |
| 
 | |
|   use Mix.Task
 | |
| 
 | |
|   @shortdoc "A collection of database related tasks"
 | |
|   @moduledoc File.read!("docs/docs/administration/CLI_tasks/database.md")
 | |
| 
 | |
|   def run(["remove_embedded_objects" | args]) do
 | |
|     {options, [], []} =
 | |
|       OptionParser.parse(
 | |
|         args,
 | |
|         strict: [
 | |
|           vacuum: :boolean
 | |
|         ]
 | |
|       )
 | |
| 
 | |
|     start_pleroma()
 | |
|     Logger.info("Removing embedded objects")
 | |
| 
 | |
|     Repo.query!(
 | |
|       "update activities set data = safe_jsonb_set(data, '{object}'::text[], data->'object'->'id') where data->'object'->>'id' is not null;",
 | |
|       [],
 | |
|       timeout: :infinity
 | |
|     )
 | |
| 
 | |
|     if Keyword.get(options, :vacuum) do
 | |
|       Maintenance.vacuum("full")
 | |
|     end
 | |
|   end
 | |
| 
 | |
|   def run(["bump_all_conversations"]) do
 | |
|     start_pleroma()
 | |
|     Conversation.bump_for_all_activities()
 | |
|   end
 | |
| 
 | |
|   def run(["update_users_following_followers_counts"]) do
 | |
|     start_pleroma()
 | |
| 
 | |
|     Repo.transaction(
 | |
|       fn ->
 | |
|         from(u in User, select: u)
 | |
|         |> Repo.stream()
 | |
|         |> Stream.each(&User.update_follower_count/1)
 | |
|         |> Stream.run()
 | |
|       end,
 | |
|       timeout: :infinity
 | |
|     )
 | |
|   end
 | |
| 
 | |
|   def run(["prune_objects" | args]) do
 | |
|     {options, [], []} =
 | |
|       OptionParser.parse(
 | |
|         args,
 | |
|         strict: [
 | |
|           vacuum: :boolean,
 | |
|           keep_threads: :boolean,
 | |
|           keep_non_public: :boolean
 | |
|         ]
 | |
|       )
 | |
| 
 | |
|     start_pleroma()
 | |
| 
 | |
|     deadline = Pleroma.Config.get([:instance, :remote_post_retention_days])
 | |
|     time_deadline = NaiveDateTime.utc_now() |> NaiveDateTime.add(-(deadline * 86_400))
 | |
| 
 | |
|     log_message = "Pruning objects older than #{deadline} days"
 | |
| 
 | |
|     log_message =
 | |
|       if Keyword.get(options, :keep_non_public) do
 | |
|         log_message <> ", keeping non public posts"
 | |
|       else
 | |
|         log_message
 | |
|       end
 | |
| 
 | |
|     log_message =
 | |
|       if Keyword.get(options, :keep_threads) do
 | |
|         log_message <> ", keeping threads intact"
 | |
|       else
 | |
|         log_message
 | |
|       end
 | |
| 
 | |
|     Logger.info(log_message)
 | |
| 
 | |
|     if Keyword.get(options, :keep_threads) do
 | |
|       # We want to delete objects from threads where
 | |
|       # 1. the newest post is still old
 | |
|       # 2. none of the activities is local
 | |
|       # 3. none of the activities is bookmarked
 | |
|       # 4. optionally none of the posts is non-public
 | |
|       deletable_context =
 | |
|         if Keyword.get(options, :keep_non_public) do
 | |
|           Pleroma.Activity
 | |
|           |> join(:left, [a], b in Pleroma.Bookmark, on: a.id == b.activity_id)
 | |
|           |> group_by([a], fragment("? ->> 'context'::text", a.data))
 | |
|           |> having(
 | |
|             [a],
 | |
|             not fragment(
 | |
|               # Posts (checked on Create Activity) is non-public
 | |
|               "bool_or((not(?->'to' \\? ? OR ?->'cc' \\? ?)) and ? ->> 'type' = 'Create')",
 | |
|               a.data,
 | |
|               ^Pleroma.Constants.as_public(),
 | |
|               a.data,
 | |
|               ^Pleroma.Constants.as_public(),
 | |
|               a.data
 | |
|             )
 | |
|           )
 | |
|         else
 | |
|           Pleroma.Activity
 | |
|           |> join(:left, [a], b in Pleroma.Bookmark, on: a.id == b.activity_id)
 | |
|           |> group_by([a], fragment("? ->> 'context'::text", a.data))
 | |
|         end
 | |
|         |> having([a], max(a.updated_at) < ^time_deadline)
 | |
|         |> having([a], not fragment("bool_or(?)", a.local))
 | |
|         |> having([_, b], fragment("max(?::text) is null", b.id))
 | |
|         |> select([a], fragment("? ->> 'context'::text", a.data))
 | |
| 
 | |
|       Pleroma.Object
 | |
|       |> where([o], fragment("? ->> 'context'::text", o.data) in subquery(deletable_context))
 | |
|     else
 | |
|       if Keyword.get(options, :keep_non_public) do
 | |
|         Pleroma.Object
 | |
|         |> where(
 | |
|           [o],
 | |
|           fragment(
 | |
|             "?->'to' \\? ? OR ?->'cc' \\? ?",
 | |
|             o.data,
 | |
|             ^Pleroma.Constants.as_public(),
 | |
|             o.data,
 | |
|             ^Pleroma.Constants.as_public()
 | |
|           )
 | |
|         )
 | |
|       else
 | |
|         Pleroma.Object
 | |
|       end
 | |
|       |> where([o], o.updated_at < ^time_deadline)
 | |
|       |> where(
 | |
|         [o],
 | |
|         fragment("split_part(?->>'actor', '/', 3) != ?", o.data, ^Pleroma.Web.Endpoint.host())
 | |
|       )
 | |
|     end
 | |
|     |> Repo.delete_all(timeout: :infinity)
 | |
| 
 | |
|     prune_hashtags_query = """
 | |
|     DELETE FROM hashtags AS ht
 | |
|     WHERE NOT EXISTS (
 | |
|       SELECT 1 FROM hashtags_objects hto
 | |
|       WHERE ht.id = hto.hashtag_id)
 | |
|     """
 | |
| 
 | |
|     Repo.query(prune_hashtags_query)
 | |
| 
 | |
|     if Keyword.get(options, :vacuum) do
 | |
|       Maintenance.vacuum("full")
 | |
|     end
 | |
|   end
 | |
| 
 | |
|   def run(["prune_task"]) do
 | |
|     start_pleroma()
 | |
| 
 | |
|     nil
 | |
|     |> Pleroma.Workers.Cron.PruneDatabaseWorker.perform()
 | |
|   end
 | |
| 
 | |
|   def run(["fix_likes_collections"]) do
 | |
|     start_pleroma()
 | |
| 
 | |
|     from(object in Object,
 | |
|       where: fragment("(?)->>'likes' is not null", object.data),
 | |
|       select: %{id: object.id, likes: fragment("(?)->>'likes'", object.data)}
 | |
|     )
 | |
|     |> Pleroma.Repo.chunk_stream(100, :batches)
 | |
|     |> Stream.each(fn objects ->
 | |
|       ids =
 | |
|         objects
 | |
|         |> Enum.filter(fn object -> object.likes |> Jason.decode!() |> is_map() end)
 | |
|         |> Enum.map(& &1.id)
 | |
| 
 | |
|       Object
 | |
|       |> where([object], object.id in ^ids)
 | |
|       |> update([object],
 | |
|         set: [
 | |
|           data:
 | |
|             fragment(
 | |
|               "safe_jsonb_set(?, '{likes}', '[]'::jsonb, true)",
 | |
|               object.data
 | |
|             )
 | |
|         ]
 | |
|       )
 | |
|       |> Repo.update_all([], timeout: :infinity)
 | |
|     end)
 | |
|     |> Stream.run()
 | |
|   end
 | |
| 
 | |
|   def run(["vacuum", args]) do
 | |
|     start_pleroma()
 | |
| 
 | |
|     Maintenance.vacuum(args)
 | |
|   end
 | |
| 
 | |
|   def run(["ensure_expiration"]) do
 | |
|     start_pleroma()
 | |
|     days = Pleroma.Config.get([:mrf_activity_expiration, :days], 365)
 | |
| 
 | |
|     Pleroma.Activity
 | |
|     |> join(:inner, [a], o in Object,
 | |
|       on:
 | |
|         fragment(
 | |
|           "(?->>'id') = COALESCE((?)->'object'->> 'id', (?)->>'object')",
 | |
|           o.data,
 | |
|           a.data,
 | |
|           a.data
 | |
|         )
 | |
|     )
 | |
|     |> where(local: true)
 | |
|     |> where([a], fragment("(? ->> 'type'::text) = 'Create'", a.data))
 | |
|     |> where([_a, o], fragment("?->>'type' = 'Note'", o.data))
 | |
|     |> Pleroma.Repo.chunk_stream(100, :batches)
 | |
|     |> Stream.each(fn activities ->
 | |
|       Enum.each(activities, fn activity ->
 | |
|         expires_at =
 | |
|           activity.inserted_at
 | |
|           |> DateTime.from_naive!("Etc/UTC")
 | |
|           |> Timex.shift(days: days)
 | |
| 
 | |
|         Pleroma.Workers.PurgeExpiredActivity.enqueue(%{
 | |
|           activity_id: activity.id,
 | |
|           expires_at: expires_at
 | |
|         })
 | |
|       end)
 | |
|     end)
 | |
|     |> Stream.run()
 | |
|   end
 | |
| 
 | |
|   def run(["set_text_search_config", tsconfig]) do
 | |
|     start_pleroma()
 | |
|     %{rows: [[tsc]]} = Ecto.Adapters.SQL.query!(Pleroma.Repo, "SHOW default_text_search_config;")
 | |
|     shell_info("Current default_text_search_config: #{tsc}")
 | |
| 
 | |
|     %{rows: [[db]]} = Ecto.Adapters.SQL.query!(Pleroma.Repo, "SELECT current_database();")
 | |
|     shell_info("Update default_text_search_config: #{tsconfig}")
 | |
| 
 | |
|     %{messages: msg} =
 | |
|       Ecto.Adapters.SQL.query!(
 | |
|         Pleroma.Repo,
 | |
|         "ALTER DATABASE #{db} SET default_text_search_config = '#{tsconfig}';"
 | |
|       )
 | |
| 
 | |
|     # non-exist config will not raise excpetion but only give >0 messages
 | |
|     if length(msg) > 0 do
 | |
|       shell_info("Error: #{inspect(msg, pretty: true)}")
 | |
|     else
 | |
|       rum_enabled = Pleroma.Config.get([:database, :rum_enabled])
 | |
|       shell_info("Recreate index, RUM: #{rum_enabled}")
 | |
| 
 | |
|       # Note SQL below needs to be kept up-to-date with latest GIN or RUM index definition in future
 | |
|       if rum_enabled do
 | |
|         Ecto.Adapters.SQL.query!(
 | |
|           Pleroma.Repo,
 | |
|           "CREATE OR REPLACE FUNCTION objects_fts_update() RETURNS trigger AS $$ BEGIN
 | |
|           new.fts_content := to_tsvector(new.data->>'content');
 | |
|           RETURN new;
 | |
|           END
 | |
|           $$ LANGUAGE plpgsql",
 | |
|           [],
 | |
|           timeout: :infinity
 | |
|         )
 | |
| 
 | |
|         shell_info("Refresh RUM index")
 | |
|         Ecto.Adapters.SQL.query!(Pleroma.Repo, "UPDATE objects SET updated_at = NOW();")
 | |
|       else
 | |
|         Ecto.Adapters.SQL.query!(Pleroma.Repo, "DROP INDEX IF EXISTS objects_fts;")
 | |
| 
 | |
|         Ecto.Adapters.SQL.query!(
 | |
|           Pleroma.Repo,
 | |
|           "CREATE INDEX CONCURRENTLY objects_fts ON objects USING gin(to_tsvector('#{tsconfig}', data->>'content')); ",
 | |
|           [],
 | |
|           timeout: :infinity
 | |
|         )
 | |
|       end
 | |
| 
 | |
|       shell_info('Done.')
 | |
|     end
 | |
|   end
 | |
| 
 | |
|   # Rolls back a specific migration (leaving subsequent migrations applied).
 | |
|   # WARNING: imposes a risk of unrecoverable data loss — proceed at your own responsibility.
 | |
|   # Based on https://stackoverflow.com/a/53825840
 | |
|   def run(["rollback", version]) do
 | |
|     prompt = "SEVERE WARNING: this operation may result in unrecoverable data loss. Continue?"
 | |
| 
 | |
|     if shell_prompt(prompt, "n") in ~w(Yn Y y) do
 | |
|       {_, result, _} =
 | |
|         Ecto.Migrator.with_repo(Pleroma.Repo, fn repo ->
 | |
|           version = String.to_integer(version)
 | |
|           re = ~r/^#{version}_.*\.exs/
 | |
|           path = Ecto.Migrator.migrations_path(repo)
 | |
| 
 | |
|           with {_, "" <> file} <- {:find, Enum.find(File.ls!(path), &String.match?(&1, re))},
 | |
|                {_, [{mod, _} | _]} <- {:compile, Code.compile_file(Path.join(path, file))},
 | |
|                {_, :ok} <- {:rollback, Ecto.Migrator.down(repo, version, mod)} do
 | |
|             {:ok, "Reversed migration: #{file}"}
 | |
|           else
 | |
|             {:find, _} -> {:error, "No migration found with version prefix: #{version}"}
 | |
|             {:compile, e} -> {:error, "Problem compiling migration module: #{inspect(e)}"}
 | |
|             {:rollback, e} -> {:error, "Problem reversing migration: #{inspect(e)}"}
 | |
|           end
 | |
|         end)
 | |
| 
 | |
|       shell_info(inspect(result))
 | |
|     end
 | |
|   end
 | |
| end
 |