Until now, only a limited number of self-replies were inlined as an anonymous, unordered ActivityPub collection. Notably, the advertised replies might be private posts. However, providing all (non-private) replies allows for better thread consistency across instances if the remote server cooperates. The collection existing as a standalone object has two advantages for this. For one, if it were still anonymous, _all_ replies would need to be inlined, which might be too bloated in pathological cases. Secondly, it allows remote servers to update the thread by traversing the reply collection independently of the original post. (If the remote party knows about chronological ordering, it can in theory even efficiently resume from where it previously stopped.)
402 lines
11 KiB
Elixir
402 lines
11 KiB
Elixir
# Pleroma: A lightweight social networking server
|
|
# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
|
|
# SPDX-License-Identifier: AGPL-3.0-only
|
|
|
|
defmodule Pleroma.Object do
|
|
use Ecto.Schema
|
|
|
|
import Ecto.Query
|
|
import Ecto.Changeset
|
|
|
|
alias Pleroma.Activity
|
|
alias Pleroma.Hashtag
|
|
alias Pleroma.Object
|
|
alias Pleroma.Object.Fetcher
|
|
alias Pleroma.ObjectTombstone
|
|
alias Pleroma.Repo
|
|
alias Pleroma.User
|
|
alias Pleroma.Workers.AttachmentsCleanupWorker
|
|
|
|
require Logger
|
|
|
|
@type t() :: %__MODULE__{}
|
|
|
|
@derive {Jason.Encoder, only: [:data]}
|
|
|
|
@cachex Pleroma.Config.get([:cachex, :provider], Cachex)
|
|
|
|
# Table "objects": one `:map` field named `data`, a many-to-many link to
# hashtags through "hashtags_objects", and the standard timestamps.
schema "objects" do
  field :data, :map

  many_to_many :hashtags, Hashtag,
    join_through: "hashtags_objects",
    on_replace: :delete

  timestamps()
end
|
|
|
|
@doc """
Joins `query` with the activities that reference each object.

An activity matches when its `data.object` (either the embedded object's `id`
or a bare id string) equals the object's `data.id` and its `data.type` equals
`activity_type`. The joined binding is named `:object_activity`.
"""
def with_joined_activity(query, activity_type \\ "Create", join_type \\ :inner) do
  # Use the named :object binding when the query has one, else the first binding.
  object_idx = Map.get(query.aliases, :object, 0)

  query
  |> join(join_type, [{obj, object_idx}], activity in Activity,
    on:
      fragment(
        "COALESCE(?->'object'->>'id', ?->>'object') = (? ->> 'id') AND (?->>'type' = ?) ",
        activity.data,
        activity.data,
        obj.data,
        activity.data,
        ^activity_type
      ),
    as: :object_activity
  )
end
|
|
|
|
@doc "Builds a changeset for a new object holding `data` and inserts it."
def create(data) do
  changeset = Object.change(%Object{}, %{data: data})
  Repo.insert(changeset)
end
|
|
|
|
@doc """
Builds a changeset for `struct` from `params`.

Casts and requires `:data`, registers the unique AP id constraint and finally
syncs the hashtags association.
"""
def change(struct, params \\ %{}) do
  base_changeset =
    struct
    |> cast(params, [:data])
    |> validate_required([:data])
    |> unique_constraint(:ap_id, name: :objects_unique_apid_index)

  # Expecting `maybe_handle_hashtags_change/2` to run last:
  maybe_handle_hashtags_change(base_changeset, struct)
end
|
|
|
|
# Note: not checking activity type (assuming non-legacy objects are associated with Create act.)
#
# Invalid changesets pass through untouched. For valid ones, when the hashtags in
# the changed `data` differ from those embedded in `struct`, the hashtag records
# are fetched/created and put on the `:hashtags` association; a failure to do so
# is recorded as a validation error on `:data`.
defp maybe_handle_hashtags_change(%{valid?: false} = changeset, _struct), do: changeset

defp maybe_handle_hashtags_change(%Ecto.Changeset{valid?: true} = changeset, struct) do
  new_data = get_change(changeset, :data)

  if hashtags_changed?(struct, new_data) do
    lookup =
      new_data
      |> object_data_hashtags()
      |> Hashtag.get_or_create_by_names()

    case lookup do
      {:ok, hashtag_records} ->
        put_assoc(changeset, :hashtags, hashtag_records)

      {:error, _} ->
        validate_change(changeset, :data, fn _, _ ->
          [data: "error referencing hashtags"]
        end)
    end
  else
    changeset
  end
end
|
|
|
|
# True when the hashtags embedded in `struct` differ (ignoring order) from the
# hashtags found in the new `data`. Data without a "tag" key never counts as a change.
defp hashtags_changed?(%Object{} = struct, %{"tag" => _} = data) do
  current = struct |> embedded_hashtags() |> Enum.sort()
  incoming = data |> object_data_hashtags() |> Enum.sort()

  current != incoming
end

defp hashtags_changed?(_, _), do: false
|
|
|
|
@doc "Fetches an object by primary key; a `nil` id yields `nil` without querying."
def get_by_id(id) do
  case id do
    nil -> nil
    _ -> Repo.get(Object, id)
  end
end
|
|
|
|
@doc """
Fetches an object by id and refetches it when it is older than `opts[:interval]`.

Returns `nil` when no object with `id` exists (mirroring `get_by_id/1`) instead
of raising on the missing record.

When `opts[:interval]` is set and more than that many seconds have passed since
`updated_at`, the object is refetched through `Fetcher.refetch_object/1`. If the
refetch fails the error is logged and the stored object is returned.
"""
def get_by_id_and_maybe_refetch(id, opts \\ []) do
  # A non-matching result (nil) falls through `with` and is returned as-is.
  with %Object{updated_at: updated_at} = object <- get_by_id(id) do
    interval = opts[:interval]

    if interval && NaiveDateTime.diff(NaiveDateTime.utc_now(), updated_at) > interval do
      case Fetcher.refetch_object(object) do
        {:ok, %Object{} = refreshed} ->
          refreshed

        e ->
          Logger.error("Couldn't refresh #{object.data["id"]}:\n#{inspect(e)}")
          object
      end
    else
      object
    end
  end
end
|
|
|
|
@doc "Looks up the object whose `data.id` equals `ap_id`, bypassing the cache."
def get_by_ap_id(ap_id) when is_nil(ap_id), do: nil

def get_by_ap_id(ap_id) do
  Object
  |> where([o], fragment("(?)->>'id' = ?", o.data, ^ap_id))
  |> Repo.one()
end
|
|
|
|
@doc """
Gets a single attachment object by its `name` and `href`.

Both values are compared against the top-level keys of the object's `data`.
"""
@spec get_attachment_by_name_and_href(String.t(), String.t()) :: Object.t() | nil
def get_attachment_by_name_and_href(name, href) do
  Object
  |> where([o], fragment("(?)->>'name' = ?", o.data, ^name))
  |> where([o], fragment("(?)->>'href' = ?", o.data, ^href))
  |> Repo.one()
end
|
|
|
|
# Emits two debug log lines: a hint that `normalize/2` was called on an activity
# without a preloaded object, followed by the current stacktrace of this process.
defp warn_on_no_object_preloaded(ap_id) do
  Logger.debug(
    "Object.normalize() called without preloaded object (#{inspect(ap_id)}). Consider preloading the object"
  )

  Logger.debug("Backtrace: #{inspect(Process.info(self(), :current_stacktrace))}")
end
|
|
|
|
@doc """
Resolves an object, activity, map or AP id string to an object.

Options:

  * `:id_only` - for an AP id string, return the id itself
  * `:fetch` - for an AP id string, resolve through `Fetcher.fetch_object_from_id/2`
    (returning `nil` on failure) instead of the cache

Anything that cannot be resolved yields `nil`.
"""
def normalize(_, options \\ [fetch: false, id_only: false])

# If we pass an Activity to Object.normalize(), we can try to use the preloaded object.
# Use this whenever possible, especially when walking graphs in an O(N) loop!
def normalize(%Object{} = object, _), do: object
def normalize(%Activity{object: %Object{} = preloaded}, _), do: preloaded

# A hack for fake activities
def normalize(%Activity{data: %{"object" => %{"fake" => true} = fake_data}}, _) do
  %Object{id: "pleroma:fake_object_id", data: fake_data}
end

# No preloaded object (embedded object map carrying an id)
def normalize(%Activity{data: %{"object" => %{"id" => ap_id}}}, options) do
  warn_on_no_object_preloaded(ap_id)
  normalize(ap_id, options)
end

# No preloaded object (bare object reference)
def normalize(%Activity{data: %{"object" => ap_id}}, options) do
  warn_on_no_object_preloaded(ap_id)
  normalize(ap_id, options)
end

# Old way, try fetching the object through cache.
def normalize(%{"id" => ap_id}, options), do: normalize(ap_id, options)

def normalize(ap_id, options) when is_binary(ap_id) do
  if Keyword.get(options, :id_only) do
    ap_id
  else
    if Keyword.get(options, :fetch) do
      with {:ok, object} <- Fetcher.fetch_object_from_id(ap_id, options) do
        object
      else
        _ -> nil
      end
    else
      get_cached_by_ap_id(ap_id)
    end
  end
end

def normalize(_, _), do: nil
|
|
|
|
@doc """
Checks whether `user` may access `object`.

Objects carrying an `"actor"` are only accessible to the user whose `ap_id`
equals that actor; objects without one are accessible to any user.
"""
# Owned objects can only be accessed by their owner
def authorize_access(%Object{data: %{"actor" => actor}}, %User{ap_id: ap_id}) do
  case actor == ap_id do
    true -> :ok
    false -> {:error, :forbidden}
  end
end

# Legacy objects can be accessed by anybody
def authorize_access(%Object{}, %User{}), do: :ok
|
|
|
|
@doc """
Returns the object for `ap_id`, consulting `:object_cache` first.

On a cache miss the object is loaded with `get_by_ap_id/1` and written to the
cache. The cache write is best-effort: its result no longer influences the
return value, so a failed write cannot turn a found object into `false` or an
exception. Returns `nil` when the object does not exist.
"""
@spec get_cached_by_ap_id(String.t()) :: Object.t() | nil
def get_cached_by_ap_id(ap_id) do
  key = "object:#{ap_id}"

  with {:ok, nil} <- @cachex.get(:object_cache, key),
       object when not is_nil(object) <- get_by_ap_id(ap_id) do
    # Populate the cache, but return the loaded object regardless of the outcome.
    @cachex.put(:object_cache, key, object)
    object
  else
    # Cache hit
    {:ok, object} -> object
    # Not in the database either
    nil -> nil
  end
end
|
|
|
|
# Intentionally accepts non-Object arguments!
@doc "Returns `true` only for an object whose `data` has type \"Tombstone\"."
@spec tombstone_object?(term()) :: boolean()
def tombstone_object?(term) do
  match?(%Object{data: %{"type" => "Tombstone"}}, term)
end
|
|
|
|
@doc """
Builds the tombstone data map for `object`.

The map is an `ObjectTombstone` converted to a plain map, keeping the object's
id, recording its type as `formerType` and `deleted` as the deletion time.
"""
def make_tombstone(%Object{data: %{"id" => id, "type" => type}}, deleted \\ DateTime.utc_now()) do
  tombstone = %ObjectTombstone{id: id, formerType: type, deleted: deleted}
  Map.from_struct(tombstone)
end
|
|
|
|
@doc """
Replaces the object's data with its tombstone and unlinks its hashtags.

Returns `{:ok, object}` with the updated record, or whatever `Repo.update/1`
returned when the update did not succeed.
"""
def swap_object_with_tombstone(object) do
  changeset = Object.change(object, %{data: make_tombstone(object)})

  case Repo.update(changeset) do
    {:ok, updated} ->
      Hashtag.unlink(updated)
      {:ok, updated}

    other ->
      other
  end
end
|
|
|
|
@doc """
Deletes an object: swaps it for a tombstone, deletes the activities referencing
it, invalidates its caches and enqueues attachment cleanup.

Returns `{:ok, object, deleted_activity}` where `object` is the struct as passed
in (still carrying the original data). If the tombstone swap or the cache
invalidation does not succeed, that step's result is returned instead.
"""
def delete(%Object{data: %{"id" => id}} = object) do
  # `<-` (not `=`) so that a failed swap is returned to the caller rather than
  # raising a MatchError.
  with {:ok, _tombstone} <- swap_object_with_tombstone(object),
       deleted_activity = Activity.delete_all_by_object_ap_id(id),
       {:ok, _} <- invalid_object_cache(object) do
    # Uses the original data, since the stored record now only holds the tombstone.
    AttachmentsCleanupWorker.enqueue_if_needed(object.data)
    {:ok, object, deleted_activity}
  end
end
|
|
|
|
@doc """
Removes the object row from the database and invalidates its caches.

Returns `{:ok, object}` on success; otherwise the result of the step that did
not succeed.
"""
def prune(%Object{data: %{"id" => _id}} = object) do
  case Repo.delete(object) do
    {:ok, deleted} ->
      case invalid_object_cache(deleted) do
        {:ok, _} -> {:ok, deleted}
        other -> other
      end

    other ->
      other
  end
end
|
|
|
|
@doc """
Drops the object's entry from `:object_cache` and, if that reports `{:ok, true}`,
the `:web_resp_cache` entry keyed by the path of the object's id.

Returns the result of the last cache deletion attempted.
"""
def invalid_object_cache(%Object{data: %{"id" => id}}) do
  case @cachex.del(:object_cache, "object:#{id}") do
    {:ok, true} -> @cachex.del(:web_resp_cache, URI.parse(id).path)
    other -> other
  end
end
|
|
|
|
@doc "Stores `object` in `:object_cache` under its AP id and returns `{:ok, object}`."
def set_cache(%Object{data: %{"id" => ap_id}} = object) do
  cache_key = "object:#{ap_id}"
  @cachex.put(:object_cache, cache_key, object)

  {:ok, object}
end
|
|
|
|
@doc """
Persists `changeset` and caches the updated object.

Returns the result of `set_cache/1` on success, or the `Repo.update/1` result
unchanged when the update did not succeed.
"""
def update_and_set_cache(changeset) do
  case Repo.update(changeset) do
    {:ok, object} -> set_cache(object)
    other -> other
  end
end
|
|
|
|
@doc """
Increments `repliesCount` in the data of the object with the given AP id.

The update is done in a single SQL statement; a missing `repliesCount` counts
as 0. The updated row is selected back so that it can be written to the object
cache. Returns `{:ok, object}` when exactly one row was updated, otherwise
`{:error, "Not found"}`.
"""
def increase_replies_count(ap_id) do
  Object
  |> where([o], fragment("?->>'id' = ?::text", o.data, ^to_string(ap_id)))
  |> update([o],
    set: [
      data:
        fragment(
          """
          safe_jsonb_set(?, '{repliesCount}',
            (coalesce((?->>'repliesCount')::int, 0) + 1)::varchar::jsonb, true)
          """,
          o.data,
          o.data
        )
    ]
  )
  # Without a select, update_all returns {count, nil} and the clause below
  # could never match, leaving the cache stale.
  |> select([o], o)
  |> Repo.update_all([])
  |> case do
    {1, [object]} -> set_cache(object)
    _ -> {:error, "Not found"}
  end
end
|
|
|
|
# A poll counts as multiple-choice when its data has a non-empty "anyOf" list.
defp poll_is_multiple?(object) do
  match?(%Object{data: %{"anyOf" => [_ | _]}}, object)
end
|
|
|
|
@doc """
Decrements `repliesCount` in the data of the object with the given AP id.

The update is done in a single SQL statement and the count never drops below 0.
The updated row is selected back so that it can be written to the object cache.
Returns `{:ok, object}` when exactly one row was updated, otherwise
`{:error, "Not found"}`.
"""
def decrease_replies_count(ap_id) do
  Object
  |> where([o], fragment("?->>'id' = ?::text", o.data, ^to_string(ap_id)))
  |> update([o],
    set: [
      data:
        fragment(
          """
          safe_jsonb_set(?, '{repliesCount}',
            (greatest(0, (?->>'repliesCount')::int - 1))::varchar::jsonb, true)
          """,
          o.data,
          o.data
        )
    ]
  )
  # Without a select, update_all returns {count, nil} and the clause below
  # could never match, leaving the cache stale.
  |> select([o], o)
  |> Repo.update_all([])
  |> case do
    {1, [object]} -> set_cache(object)
    _ -> {:error, "Not found"}
  end
end
|
|
|
|
@doc """
Records a vote by `actor` for the option called `name` on a "Question" object.

Bumps `replies.totalItems` of every option with that name, adds `actor` to the
de-duplicated `"voters"` list, then persists and caches the object. Returns
`:noop` when `ap_id` does not resolve to a "Question" object.
"""
def increase_vote_count(ap_id, name, actor) do
  case Object.normalize(ap_id, fetch: false) do
    %Object{data: %{"type" => "Question"}} = question ->
      # Multiple-choice polls keep their options under "anyOf", others under "oneOf".
      key = if poll_is_multiple?(question), do: "anyOf", else: "oneOf"

      bump = fn
        %{"name" => ^name} = option ->
          Kernel.update_in(option["replies"]["totalItems"], &(&1 + 1))

        option ->
          option
      end

      options = Enum.map(question.data[key], bump)
      voters = Enum.uniq([actor | Map.get(question.data, "voters") || []])
      data = Map.merge(question.data, %{key => options, "voters" => voters})

      question
      |> Object.change(%{data: data})
      |> update_and_set_cache()

    _ ->
      :noop
  end
end
|
|
|
|
@doc "Merges `attrs` into the object's `data` (treating missing data as empty) and saves it."
def update_data(%Object{data: data} = object, attrs \\ %{}) do
  merged = Map.merge(data || %{}, attrs)
  changeset = Object.change(object, %{data: merged})

  Repo.update(changeset)
end
|
|
|
|
@doc "Returns whether the object's id starts with this endpoint's URL followed by a slash."
def local?(%Object{data: %{"id" => id}}) do
  local_prefix = Pleroma.Web.Endpoint.url() <> "/"
  String.starts_with?(id, local_prefix)
end
|
|
|
|
@doc "Returns the raw `\"tag\"` list of an object's data, or `[]` when there is none."
def tags(object) do
  case object do
    %Object{data: %{"tag" => tag_list}} when is_list(tag_list) -> tag_list
    _ -> []
  end
end
|
|
|
|
@doc "Returns the hashtag names of an object, taken from its embedded data."
# Note: always using embedded hashtags regardless whether they are migrated to hashtags table
# (embedded hashtags stay in sync anyways, and we avoid extra joins and preload hassle)
def hashtags(%Object{} = object), do: embedded_hashtags(object)
|
|
|
|
@doc "Extracts hashtag names from an object's `data`; anything else yields `[]`."
def embedded_hashtags(maybe_object) do
  case maybe_object do
    %Object{data: data} -> object_data_hashtags(data)
    _ -> []
  end
end
|
|
|
|
@doc """
Extracts the downcased, de-duplicated hashtag names from object data.

Accepts `"tag"` entries that are either `"Hashtag"` maps with a string `"name"`
(a leading `#` is stripped) or plain strings. Every other entry, including
Hashtag maps whose name is missing or not a string, is ignored instead of
raising. Empty names are dropped. Data without a `"tag"` list yields `[]`.
"""
def object_data_hashtags(%{"tag" => tags}) when is_list(tags) do
  tags
  |> Enum.flat_map(fn
    %{"type" => "Hashtag", "name" => "#" <> hashtag} ->
      [String.downcase(hashtag)]

    %{"type" => "Hashtag", "name" => hashtag} when is_binary(hashtag) ->
      [String.downcase(hashtag)]

    plain_text when is_binary(plain_text) ->
      [String.downcase(plain_text)]

    _ ->
      []
  end)
  |> Enum.uniq()
  # Note: "" elements (plain text) might occur in `data.tag` for incoming objects
  |> Enum.reject(&(&1 == ""))
end

def object_data_hashtags(_), do: []
|
|
end
|