220 lines
		
	
	
	
		
			7.3 KiB
		
	
	
	
		
			Elixir
		
	
	
	
	
	
			
		
		
	
	
			220 lines
		
	
	
	
		
			7.3 KiB
		
	
	
	
		
			Elixir
		
	
	
	
	
	
defmodule Pleroma.Web.Streamer.Worker do
 | 
						|
  use GenServer
 | 
						|
 | 
						|
  require Logger
 | 
						|
 | 
						|
  alias Pleroma.Activity
 | 
						|
  alias Pleroma.Config
 | 
						|
  alias Pleroma.Conversation.Participation
 | 
						|
  alias Pleroma.Notification
 | 
						|
  alias Pleroma.Object
 | 
						|
  alias Pleroma.User
 | 
						|
  alias Pleroma.Web.ActivityPub.ActivityPub
 | 
						|
  alias Pleroma.Web.ActivityPub.Visibility
 | 
						|
  alias Pleroma.Web.CommonAPI
 | 
						|
  alias Pleroma.Web.Streamer.State
 | 
						|
  alias Pleroma.Web.Streamer.StreamerSocket
 | 
						|
  alias Pleroma.Web.StreamerView
 | 
						|
 | 
						|
  def start_link(_) do
 | 
						|
    GenServer.start_link(__MODULE__, %{}, [])
 | 
						|
  end
 | 
						|
 | 
						|
  def init(init_arg) do
 | 
						|
    {:ok, init_arg}
 | 
						|
  end
 | 
						|
 | 
						|
  def stream(pid, topics, items) do
 | 
						|
    GenServer.call(pid, {:stream, topics, items})
 | 
						|
  end
 | 
						|
 | 
						|
  def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
 | 
						|
    Enum.each(topics, fn t ->
 | 
						|
      do_stream(%{topic: t, item: item})
 | 
						|
    end)
 | 
						|
 | 
						|
    {:reply, state, state}
 | 
						|
  end
 | 
						|
 | 
						|
  def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
 | 
						|
    Enum.each(items, fn i ->
 | 
						|
      do_stream(%{topic: topic, item: i})
 | 
						|
    end)
 | 
						|
 | 
						|
    {:reply, state, state}
 | 
						|
  end
 | 
						|
 | 
						|
  def handle_call({:stream, topic, item}, _from, state) do
 | 
						|
    do_stream(%{topic: topic, item: item})
 | 
						|
 | 
						|
    {:reply, state, state}
 | 
						|
  end
 | 
						|
 | 
						|
  defp do_stream(%{topic: "direct", item: item}) do
 | 
						|
    recipient_topics =
 | 
						|
      User.get_recipients_from_activity(item)
 | 
						|
      |> Enum.map(fn %{id: id} -> "direct:#{id}" end)
 | 
						|
 | 
						|
    Enum.each(recipient_topics, fn user_topic ->
 | 
						|
      Logger.debug("Trying to push direct message to #{user_topic}\n\n")
 | 
						|
      push_to_socket(State.get_sockets(), user_topic, item)
 | 
						|
    end)
 | 
						|
  end
 | 
						|
 | 
						|
  defp do_stream(%{topic: "participation", item: participation}) do
 | 
						|
    user_topic = "direct:#{participation.user_id}"
 | 
						|
    Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
 | 
						|
 | 
						|
    push_to_socket(State.get_sockets(), user_topic, participation)
 | 
						|
  end
 | 
						|
 | 
						|
  defp do_stream(%{topic: "list", item: item}) do
 | 
						|
    # filter the recipient list if the activity is not public, see #270.
 | 
						|
    recipient_lists =
 | 
						|
      case Visibility.is_public?(item) do
 | 
						|
        true ->
 | 
						|
          Pleroma.List.get_lists_from_activity(item)
 | 
						|
 | 
						|
        _ ->
 | 
						|
          Pleroma.List.get_lists_from_activity(item)
 | 
						|
          |> Enum.filter(fn list ->
 | 
						|
            owner = User.get_cached_by_id(list.user_id)
 | 
						|
 | 
						|
            Visibility.visible_for_user?(item, owner)
 | 
						|
          end)
 | 
						|
      end
 | 
						|
 | 
						|
    recipient_topics =
 | 
						|
      recipient_lists
 | 
						|
      |> Enum.map(fn %{id: id} -> "list:#{id}" end)
 | 
						|
 | 
						|
    Enum.each(recipient_topics, fn list_topic ->
 | 
						|
      Logger.debug("Trying to push message to #{list_topic}\n\n")
 | 
						|
      push_to_socket(State.get_sockets(), list_topic, item)
 | 
						|
    end)
 | 
						|
  end
 | 
						|
 | 
						|
  defp do_stream(%{topic: topic, item: %Notification{} = item})
 | 
						|
       when topic in ["user", "user:notification"] do
 | 
						|
    State.get_sockets()
 | 
						|
    |> Map.get("#{topic}:#{item.user_id}", [])
 | 
						|
    |> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
 | 
						|
      with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
 | 
						|
           true <- should_send?(user, item) do
 | 
						|
        send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
 | 
						|
      end
 | 
						|
    end)
 | 
						|
  end
 | 
						|
 | 
						|
  defp do_stream(%{topic: "user", item: item}) do
 | 
						|
    Logger.debug("Trying to push to users")
 | 
						|
 | 
						|
    recipient_topics =
 | 
						|
      User.get_recipients_from_activity(item)
 | 
						|
      |> Enum.map(fn %{id: id} -> "user:#{id}" end)
 | 
						|
 | 
						|
    Enum.each(recipient_topics, fn topic ->
 | 
						|
      push_to_socket(State.get_sockets(), topic, item)
 | 
						|
    end)
 | 
						|
  end
 | 
						|
 | 
						|
  defp do_stream(%{topic: topic, item: item}) do
 | 
						|
    Logger.debug("Trying to push to #{topic}")
 | 
						|
    Logger.debug("Pushing item to #{topic}")
 | 
						|
    push_to_socket(State.get_sockets(), topic, item)
 | 
						|
  end
 | 
						|
 | 
						|
  defp should_send?(%User{} = user, %Activity{} = item) do
 | 
						|
    blocks = user.info.blocks || []
 | 
						|
    mutes = user.info.mutes || []
 | 
						|
    reblog_mutes = user.info.muted_reblogs || []
 | 
						|
    domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)
 | 
						|
 | 
						|
    with parent when not is_nil(parent) <- Object.normalize(item),
 | 
						|
         true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
 | 
						|
         true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
 | 
						|
         %{host: item_host} <- URI.parse(item.actor),
 | 
						|
         %{host: parent_host} <- URI.parse(parent.data["actor"]),
 | 
						|
         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
 | 
						|
         false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
 | 
						|
         true <- thread_containment(item, user),
 | 
						|
         false <- CommonAPI.thread_muted?(user, item) do
 | 
						|
      true
 | 
						|
    else
 | 
						|
      _ -> false
 | 
						|
    end
 | 
						|
  end
 | 
						|
 | 
						|
  defp should_send?(%User{} = user, %Notification{activity: activity}) do
 | 
						|
    should_send?(user, activity)
 | 
						|
  end
 | 
						|
 | 
						|
  def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
 | 
						|
    Enum.each(topics[topic] || [], fn %StreamerSocket{
 | 
						|
                                        transport_pid: transport_pid,
 | 
						|
                                        user: socket_user
 | 
						|
                                      } ->
 | 
						|
      # Get the current user so we have up-to-date blocks etc.
 | 
						|
      if socket_user do
 | 
						|
        user = User.get_cached_by_ap_id(socket_user.ap_id)
 | 
						|
 | 
						|
        if should_send?(user, item) do
 | 
						|
          send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
 | 
						|
        end
 | 
						|
      else
 | 
						|
        send(transport_pid, {:text, StreamerView.render("update.json", item)})
 | 
						|
      end
 | 
						|
    end)
 | 
						|
  end
 | 
						|
 | 
						|
  def push_to_socket(topics, topic, %Participation{} = participation) do
 | 
						|
    Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
 | 
						|
      send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
 | 
						|
    end)
 | 
						|
  end
 | 
						|
 | 
						|
  def push_to_socket(topics, topic, %Activity{
 | 
						|
        data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
 | 
						|
      }) do
 | 
						|
    Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
 | 
						|
      send(
 | 
						|
        transport_pid,
 | 
						|
        {:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
 | 
						|
      )
 | 
						|
    end)
 | 
						|
  end
 | 
						|
 | 
						|
  def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
 | 
						|
 | 
						|
  def push_to_socket(topics, topic, item) do
 | 
						|
    Enum.each(topics[topic] || [], fn %StreamerSocket{
 | 
						|
                                        transport_pid: transport_pid,
 | 
						|
                                        user: socket_user
 | 
						|
                                      } ->
 | 
						|
      # Get the current user so we have up-to-date blocks etc.
 | 
						|
      if socket_user do
 | 
						|
        user = User.get_cached_by_ap_id(socket_user.ap_id)
 | 
						|
        blocks = user.info.blocks || []
 | 
						|
        mutes = user.info.mutes || []
 | 
						|
 | 
						|
        with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
 | 
						|
             true <- thread_containment(item, user) do
 | 
						|
          send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
 | 
						|
        end
 | 
						|
      else
 | 
						|
        send(transport_pid, {:text, StreamerView.render("update.json", item)})
 | 
						|
      end
 | 
						|
    end)
 | 
						|
  end
 | 
						|
 | 
						|
  @spec thread_containment(Activity.t(), User.t()) :: boolean()
 | 
						|
  defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true
 | 
						|
 | 
						|
  defp thread_containment(activity, user) do
 | 
						|
    if Config.get([:instance, :skip_thread_containment]) do
 | 
						|
      true
 | 
						|
    else
 | 
						|
      ActivityPub.contain_activity(activity, user)
 | 
						|
    end
 | 
						|
  end
 | 
						|
end
 |