227 lines
		
	
	
	
		
			5.7 KiB
		
	
	
	
		
			Elixir
		
	
	
	
	
	
			
		
		
	
	
			227 lines
		
	
	
	
		
			5.7 KiB
		
	
	
	
		
			Elixir
		
	
	
	
	
	
# Pleroma: A lightweight social networking server
 | 
						|
# Copyright © 2017-2021 Pleroma Authors <https://pleroma.social/>
 | 
						|
# SPDX-License-Identifier: AGPL-3.0-only
 | 
						|
 | 
						|
defmodule Pleroma.Conversation.Participation do
 | 
						|
  use Ecto.Schema
 | 
						|
  alias Pleroma.Conversation
 | 
						|
  alias Pleroma.Conversation.Participation.RecipientShip
 | 
						|
  alias Pleroma.Repo
 | 
						|
  alias Pleroma.User
 | 
						|
  alias Pleroma.Web.ActivityPub.ActivityPub
 | 
						|
  import Ecto.Changeset
 | 
						|
  import Ecto.Query
 | 
						|
 | 
						|
  schema "conversation_participations" do
 | 
						|
    belongs_to(:user, User, type: FlakeId.Ecto.CompatType)
 | 
						|
    belongs_to(:conversation, Conversation)
 | 
						|
    field(:read, :boolean, default: false)
 | 
						|
    field(:last_activity_id, FlakeId.Ecto.CompatType, virtual: true)
 | 
						|
 | 
						|
    has_many(:recipient_ships, RecipientShip)
 | 
						|
    has_many(:recipients, through: [:recipient_ships, :user])
 | 
						|
 | 
						|
    timestamps()
 | 
						|
  end
 | 
						|
 | 
						|
  def creation_cng(struct, params) do
 | 
						|
    struct
 | 
						|
    |> cast(params, [:user_id, :conversation_id, :read])
 | 
						|
    |> validate_required([:user_id, :conversation_id])
 | 
						|
  end
 | 
						|
 | 
						|
  def create_for_user_and_conversation(user, conversation, opts \\ []) do
 | 
						|
    read = !!opts[:read]
 | 
						|
    invisible_conversation = !!opts[:invisible_conversation]
 | 
						|
 | 
						|
    update_on_conflict =
 | 
						|
      if(invisible_conversation, do: [], else: [read: read])
 | 
						|
      |> Keyword.put(:updated_at, NaiveDateTime.utc_now())
 | 
						|
 | 
						|
    %__MODULE__{}
 | 
						|
    |> creation_cng(%{
 | 
						|
      user_id: user.id,
 | 
						|
      conversation_id: conversation.id,
 | 
						|
      read: invisible_conversation || read
 | 
						|
    })
 | 
						|
    |> Repo.insert(
 | 
						|
      on_conflict: [set: update_on_conflict],
 | 
						|
      returning: true,
 | 
						|
      conflict_target: [:user_id, :conversation_id]
 | 
						|
    )
 | 
						|
  end
 | 
						|
 | 
						|
  def read_cng(struct, params) do
 | 
						|
    struct
 | 
						|
    |> cast(params, [:read])
 | 
						|
    |> validate_required([:read])
 | 
						|
  end
 | 
						|
 | 
						|
  def mark_as_read(%User{} = user, %Conversation{} = conversation) do
 | 
						|
    with %__MODULE__{} = participation <- for_user_and_conversation(user, conversation) do
 | 
						|
      mark_as_read(participation)
 | 
						|
    end
 | 
						|
  end
 | 
						|
 | 
						|
  def mark_as_read(%__MODULE__{} = participation) do
 | 
						|
    participation
 | 
						|
    |> change(read: true)
 | 
						|
    |> Repo.update()
 | 
						|
  end
 | 
						|
 | 
						|
  def mark_all_as_read(%User{local: true} = user, %User{} = target_user) do
 | 
						|
    target_conversation_ids =
 | 
						|
      __MODULE__
 | 
						|
      |> where([p], p.user_id == ^target_user.id)
 | 
						|
      |> select([p], p.conversation_id)
 | 
						|
      |> Repo.all()
 | 
						|
 | 
						|
    __MODULE__
 | 
						|
    |> where([p], p.user_id == ^user.id)
 | 
						|
    |> where([p], p.conversation_id in ^target_conversation_ids)
 | 
						|
    |> update([p], set: [read: true])
 | 
						|
    |> Repo.update_all([])
 | 
						|
 | 
						|
    {:ok, user, []}
 | 
						|
  end
 | 
						|
 | 
						|
  def mark_all_as_read(%User{} = user, %User{}), do: {:ok, user, []}
 | 
						|
 | 
						|
  def mark_all_as_read(%User{} = user) do
 | 
						|
    {_, participations} =
 | 
						|
      __MODULE__
 | 
						|
      |> where([p], p.user_id == ^user.id)
 | 
						|
      |> where([p], not p.read)
 | 
						|
      |> update([p], set: [read: true])
 | 
						|
      |> select([p], p)
 | 
						|
      |> Repo.update_all([])
 | 
						|
 | 
						|
    {:ok, user, participations}
 | 
						|
  end
 | 
						|
 | 
						|
  def mark_as_unread(participation) do
 | 
						|
    participation
 | 
						|
    |> read_cng(%{read: false})
 | 
						|
    |> Repo.update()
 | 
						|
  end
 | 
						|
 | 
						|
  def for_user(user, params \\ %{}) do
 | 
						|
    from(p in __MODULE__,
 | 
						|
      where: p.user_id == ^user.id,
 | 
						|
      order_by: [desc: p.updated_at],
 | 
						|
      preload: [conversation: [:users]]
 | 
						|
    )
 | 
						|
    |> restrict_recipients(user, params)
 | 
						|
    |> Pleroma.Pagination.fetch_paginated(params)
 | 
						|
  end
 | 
						|
 | 
						|
  def restrict_recipients(query, user, %{recipients: user_ids}) do
 | 
						|
    user_binary_ids =
 | 
						|
      [user.id | user_ids]
 | 
						|
      |> Enum.uniq()
 | 
						|
      |> User.binary_id()
 | 
						|
 | 
						|
    conversation_subquery =
 | 
						|
      __MODULE__
 | 
						|
      |> group_by([p], p.conversation_id)
 | 
						|
      |> having(
 | 
						|
        [p],
 | 
						|
        count(p.user_id) == ^length(user_binary_ids) and
 | 
						|
          fragment("array_agg(?) @> ?", p.user_id, ^user_binary_ids)
 | 
						|
      )
 | 
						|
      |> select([p], %{id: p.conversation_id})
 | 
						|
 | 
						|
    query
 | 
						|
    |> join(:inner, [p], c in subquery(conversation_subquery), on: p.conversation_id == c.id)
 | 
						|
  end
 | 
						|
 | 
						|
  def restrict_recipients(query, _, _), do: query
 | 
						|
 | 
						|
  def for_user_and_conversation(user, conversation) do
 | 
						|
    from(p in __MODULE__,
 | 
						|
      where: p.user_id == ^user.id,
 | 
						|
      where: p.conversation_id == ^conversation.id
 | 
						|
    )
 | 
						|
    |> Repo.one()
 | 
						|
  end
 | 
						|
 | 
						|
  def for_user_with_last_activity_id(user, params \\ %{}) do
 | 
						|
    for_user(user, params)
 | 
						|
    |> Enum.map(fn participation ->
 | 
						|
      activity_id =
 | 
						|
        ActivityPub.fetch_latest_direct_activity_id_for_context(
 | 
						|
          participation.conversation.ap_id,
 | 
						|
          %{
 | 
						|
            user: user,
 | 
						|
            blocking_user: user
 | 
						|
          }
 | 
						|
        )
 | 
						|
 | 
						|
      %{
 | 
						|
        participation
 | 
						|
        | last_activity_id: activity_id
 | 
						|
      }
 | 
						|
    end)
 | 
						|
    |> Enum.reject(&is_nil(&1.last_activity_id))
 | 
						|
  end
 | 
						|
 | 
						|
  def get(_, _ \\ [])
 | 
						|
  def get(nil, _), do: nil
 | 
						|
 | 
						|
  def get(id, params) do
 | 
						|
    query =
 | 
						|
      if preload = params[:preload] do
 | 
						|
        from(p in __MODULE__,
 | 
						|
          preload: ^preload
 | 
						|
        )
 | 
						|
      else
 | 
						|
        __MODULE__
 | 
						|
      end
 | 
						|
 | 
						|
    Repo.get(query, id)
 | 
						|
  end
 | 
						|
 | 
						|
  def set_recipients(participation, user_ids) do
 | 
						|
    user_ids =
 | 
						|
      [participation.user_id | user_ids]
 | 
						|
      |> Enum.uniq()
 | 
						|
 | 
						|
    Repo.transaction(fn ->
 | 
						|
      query =
 | 
						|
        from(r in RecipientShip,
 | 
						|
          where: r.participation_id == ^participation.id
 | 
						|
        )
 | 
						|
 | 
						|
      Repo.delete_all(query)
 | 
						|
 | 
						|
      users =
 | 
						|
        from(u in User,
 | 
						|
          where: u.id in ^user_ids
 | 
						|
        )
 | 
						|
        |> Repo.all()
 | 
						|
 | 
						|
      RecipientShip.create(users, participation)
 | 
						|
      :ok
 | 
						|
    end)
 | 
						|
 | 
						|
    {:ok, Repo.preload(participation, :recipients, force: true)}
 | 
						|
  end
 | 
						|
 | 
						|
  @spec unread_count(User.t()) :: integer()
 | 
						|
  def unread_count(%User{id: user_id}) do
 | 
						|
    from(q in __MODULE__, where: q.user_id == ^user_id and q.read == false)
 | 
						|
    |> Repo.aggregate(:count, :id)
 | 
						|
  end
 | 
						|
 | 
						|
  def unread_conversation_count_for_user(user) do
 | 
						|
    from(p in __MODULE__,
 | 
						|
      where: p.user_id == ^user.id,
 | 
						|
      where: not p.read,
 | 
						|
      select: %{count: count(p.id)}
 | 
						|
    )
 | 
						|
  end
 | 
						|
 | 
						|
  def delete(%__MODULE__{} = participation) do
 | 
						|
    Repo.delete(participation)
 | 
						|
  end
 | 
						|
end
 |