MastoAPI: Implement all streaming functions.
This commit is contained in:
		
							parent
							
								
									5719f69ae3
								
							
						
					
					
						commit
						a743940463
					
				
					 7 changed files with 73 additions and 6 deletions
				
			
		| 
						 | 
				
			
			@ -78,8 +78,9 @@ def create_notifications(_), do: {:ok, []}
 | 
			
		|||
  # TODO move to sql, too.
 | 
			
		||||
  def create_notification(%Activity{} = activity, %User{} = user) do
 | 
			
		||||
    unless User.blocks?(user, %{ap_id: activity.data["actor"]}) do
 | 
			
		||||
      notification = %Notification{user_id: user.id, activity_id: activity.id}
 | 
			
		||||
      notification = %Notification{user_id: user.id, activity: activity}
 | 
			
		||||
      {:ok, notification} = Repo.insert(notification)
 | 
			
		||||
      Pleroma.Web.Streamer.stream("user", notification)
 | 
			
		||||
      notification
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -284,6 +284,17 @@ def get_notified_from_activity(%Activity{data: %{"to" => to}} = activity) do
 | 
			
		|||
    Repo.all(query)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def get_recipients_from_activity(%Activity{data: %{"to" => to}} = activity) do
 | 
			
		||||
    query = from u in User,
 | 
			
		||||
      where: u.local == true
 | 
			
		||||
 | 
			
		||||
    query = from u in query,
 | 
			
		||||
      where: u.ap_id in ^to,
 | 
			
		||||
      or_where: fragment("? \\\?| ?", u.following, ^to)
 | 
			
		||||
 | 
			
		||||
    Repo.all(query)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def search(query, resolve) do
 | 
			
		||||
    if resolve do
 | 
			
		||||
      User.get_or_fetch_by_nickname(query)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -24,6 +24,7 @@ def create(to, actor, context, object, additional \\ %{}, published \\ nil, loca
 | 
			
		|||
         :ok <- maybe_federate(activity) do
 | 
			
		||||
      if activity.data["type"] == "Create" and Enum.member?(activity.data["to"], "https://www.w3.org/ns/activitystreams#Public") do
 | 
			
		||||
        Pleroma.Web.Streamer.stream("public", activity)
 | 
			
		||||
        Pleroma.Web.Streamer.stream("user", activity)
 | 
			
		||||
        if local do
 | 
			
		||||
          Pleroma.Web.Streamer.stream("public:local", activity)
 | 
			
		||||
        end
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -595,7 +595,7 @@ def empty_array(conn, _) do
 | 
			
		|||
    json(conn, [])
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp render_notification(user, %{id: id, activity: activity, inserted_at: created_at} = _params) do
 | 
			
		||||
  def render_notification(user, %{id: id, activity: activity, inserted_at: created_at} = _params) do
 | 
			
		||||
    actor = User.get_cached_by_ap_id(activity.data["actor"])
 | 
			
		||||
    created_at = NaiveDateTime.to_iso8601(created_at)
 | 
			
		||||
    |> String.replace(~r/(\.\d+)?$/, ".000Z", global: false)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -11,7 +11,7 @@ def connect(params, socket) do
 | 
			
		|||
    with token when not is_nil(token) <- params["access_token"],
 | 
			
		||||
         %Token{user_id: user_id} <- Repo.get_by(Token, token: token),
 | 
			
		||||
         %User{} = user <- Repo.get(User, user_id),
 | 
			
		||||
         stream when stream in ["public", "public:local"] <- params["stream"] do
 | 
			
		||||
         stream when stream in ["public", "public:local", "user"] <- params["stream"] do
 | 
			
		||||
      socket = socket
 | 
			
		||||
      |> assign(:topic, params["stream"])
 | 
			
		||||
      |> assign(:user, user)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2,6 +2,7 @@ defmodule Pleroma.Web.Streamer do
 | 
			
		|||
  use GenServer
 | 
			
		||||
  require Logger
 | 
			
		||||
  import Plug.Conn
 | 
			
		||||
  alias Pleroma.{User, Notification}
 | 
			
		||||
 | 
			
		||||
  def start_link do
 | 
			
		||||
    spawn(fn ->
 | 
			
		||||
| 
						 | 
				
			
			@ -37,21 +38,55 @@ def handle_cast(%{action: :ping}, topics) do
 | 
			
		|||
    {:noreply, topics}
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
 | 
			
		||||
    Logger.debug("Trying to push to #{topic}")
 | 
			
		||||
    Logger.debug("Pushing item to #{topic}")
 | 
			
		||||
  def push_to_socket(topics, topic, item) do
 | 
			
		||||
    Enum.each(topics[topic] || [], fn (socket) ->
 | 
			
		||||
      json = %{
 | 
			
		||||
        event: "update",
 | 
			
		||||
        payload: Pleroma.Web.MastodonAPI.StatusView.render("status.json", activity: item, for: socket.assigns[:user]) |> Poison.encode!
 | 
			
		||||
      } |> Poison.encode!
 | 
			
		||||
 | 
			
		||||
      send socket.transport_pid, {:text, json}
 | 
			
		||||
    end)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def handle_cast(%{action: :stream, topic: "user", item: %Notification{} = item}, topics) do
 | 
			
		||||
    topic = "user:#{item.user_id}"
 | 
			
		||||
    Enum.each(topics[topic] || [], fn (socket) ->
 | 
			
		||||
      json = %{
 | 
			
		||||
        event: "notification",
 | 
			
		||||
        payload: Pleroma.Web.MastodonAPI.MastodonAPIController.render_notification(socket.assigns["user"], item) |> Poison.encode!
 | 
			
		||||
      } |> Poison.encode!
 | 
			
		||||
 | 
			
		||||
      send socket.transport_pid, {:text, json}
 | 
			
		||||
    end)
 | 
			
		||||
    {:noreply, topics}
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
 | 
			
		||||
    Logger.debug("Trying to push to users")
 | 
			
		||||
    recipient_topics = User.get_recipients_from_activity(item)
 | 
			
		||||
    |> Enum.map(fn (%{id: id}) -> "user:#{id}" end)
 | 
			
		||||
 | 
			
		||||
    Enum.each(recipient_topics, fn (topic) ->
 | 
			
		||||
      push_to_socket(topics, topic, item)
 | 
			
		||||
    end)
 | 
			
		||||
    {:noreply, topics}
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
 | 
			
		||||
    Logger.debug("Trying to push to #{topic}")
 | 
			
		||||
    Logger.debug("Pushing item to #{topic}")
 | 
			
		||||
    push_to_socket(topics, topic, item)
 | 
			
		||||
    {:noreply, topics}
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp internal_topic("user", socket) do
 | 
			
		||||
    "user:#{socket.assigns[:user].id}"
 | 
			
		||||
  end
 | 
			
		||||
  defp internal_topic(topic, socket), do: topic
 | 
			
		||||
 | 
			
		||||
  def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
 | 
			
		||||
    topic = internal_topic(topic, socket)
 | 
			
		||||
    sockets_for_topic = sockets[topic] || []
 | 
			
		||||
    sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
 | 
			
		||||
    sockets = Map.put(sockets, topic, sockets_for_topic)
 | 
			
		||||
| 
						 | 
				
			
			@ -61,6 +96,7 @@ def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
 | 
			
		|||
  end
 | 
			
		||||
 | 
			
		||||
  def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
 | 
			
		||||
    topic = internal_topic(topic, socket)
 | 
			
		||||
    sockets_for_topic = sockets[topic] || []
 | 
			
		||||
    sockets_for_topic = List.delete(sockets_for_topic, socket)
 | 
			
		||||
    sockets = Map.put(sockets, topic, sockets_for_topic)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -3,6 +3,7 @@ defmodule Pleroma.UserTest do
 | 
			
		|||
  alias Pleroma.{User, Repo}
 | 
			
		||||
  alias Pleroma.Web.OStatus
 | 
			
		||||
  alias Pleroma.Web.Websub.WebsubClientSubscription
 | 
			
		||||
  alias Pleroma.Web.CommonAPI
 | 
			
		||||
  use Pleroma.DataCase
 | 
			
		||||
 | 
			
		||||
  import Pleroma.Factory
 | 
			
		||||
| 
						 | 
				
			
			@ -296,5 +297,22 @@ test "it unblocks users" do
 | 
			
		|||
      refute User.blocks?(user, blocked_user)
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  test "get recipients from activity" do
 | 
			
		||||
    actor = insert(:user)
 | 
			
		||||
    user = insert(:user, local: true)
 | 
			
		||||
    user_two = insert(:user, local: false)
 | 
			
		||||
    addressed = insert(:user, local: true)
 | 
			
		||||
    addressed_remote = insert(:user, local: false)
 | 
			
		||||
    {:ok, activity} = CommonAPI.post(actor, %{"status" => "hey @#{addressed.nickname} @#{addressed_remote.nickname}"})
 | 
			
		||||
 | 
			
		||||
    assert [addressed] == User.get_recipients_from_activity(activity)
 | 
			
		||||
 | 
			
		||||
    {:ok, user} = User.follow(user, actor)
 | 
			
		||||
    recipients = User.get_recipients_from_activity(activity)
 | 
			
		||||
    assert length(recipients) == 2
 | 
			
		||||
    assert user in recipients
 | 
			
		||||
    assert addressed in recipients
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in a new issue