137 lines
		
	
	
	
		
			3.7 KiB
		
	
	
	
		
			Elixir
		
	
	
	
	
	
			
		
		
	
	
			137 lines
		
	
	
	
		
			3.7 KiB
		
	
	
	
		
			Elixir
		
	
	
	
	
	
| # Pleroma: A lightweight social networking server
 | |
| # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
 | |
| # SPDX-License-Identifier: AGPL-3.0-only
 | |
| 
 | |
| defmodule Pleroma.Web.FedSockets.FedSocket do
 | |
|   @moduledoc """
 | |
|   The FedSocket module abstracts the actions to be taken taken on connections regardless of
 | |
|   whether the connection started as inbound or outbound.
 | |
| 
 | |
| 
 | |
|   Normally outside modules will have no need to call the FedSocket module directly.
 | |
|   """
 | |
| 
 | |
|   alias Pleroma.Object
 | |
|   alias Pleroma.Object.Containment
 | |
|   alias Pleroma.User
 | |
|   alias Pleroma.Web.ActivityPub.ObjectView
 | |
|   alias Pleroma.Web.ActivityPub.UserView
 | |
|   alias Pleroma.Web.ActivityPub.Visibility
 | |
|   alias Pleroma.Web.FedSockets.FetchRegistry
 | |
|   alias Pleroma.Web.FedSockets.IngesterWorker
 | |
|   alias Pleroma.Web.FedSockets.OutgoingHandler
 | |
|   alias Pleroma.Web.FedSockets.SocketInfo
 | |
| 
 | |
|   require Logger
 | |
| 
 | |
|   @shake "61dd18f7-f1e6-49a4-939a-a749fcdc1103"
 | |
| 
 | |
|   def connect_to_host(uri) do
 | |
|     case OutgoingHandler.start_link(uri) do
 | |
|       {:ok, pid} ->
 | |
|         {:ok, pid}
 | |
| 
 | |
|       error ->
 | |
|         {:error, error}
 | |
|     end
 | |
|   end
 | |
| 
 | |
|   def close(%SocketInfo{pid: socket_pid}),
 | |
|     do: Process.send(socket_pid, :close, [])
 | |
| 
 | |
|   def publish(%SocketInfo{pid: socket_pid}, json) do
 | |
|     %{action: :publish, data: json}
 | |
|     |> Jason.encode!()
 | |
|     |> send_packet(socket_pid)
 | |
|   end
 | |
| 
 | |
|   def fetch(%SocketInfo{pid: socket_pid}, id) do
 | |
|     fetch_uuid = FetchRegistry.register_fetch(id)
 | |
| 
 | |
|     %{action: :fetch, data: id, uuid: fetch_uuid}
 | |
|     |> Jason.encode!()
 | |
|     |> send_packet(socket_pid)
 | |
| 
 | |
|     wait_for_fetch_to_return(fetch_uuid, 0)
 | |
|   end
 | |
| 
 | |
|   def receive_package(%SocketInfo{} = fed_socket, json) do
 | |
|     json
 | |
|     |> Jason.decode!()
 | |
|     |> process_package(fed_socket)
 | |
|   end
 | |
| 
 | |
|   defp wait_for_fetch_to_return(uuid, cntr) do
 | |
|     case FetchRegistry.check_fetch(uuid) do
 | |
|       {:error, :waiting} ->
 | |
|         Process.sleep(:math.pow(cntr, 3) |> Kernel.trunc())
 | |
|         wait_for_fetch_to_return(uuid, cntr + 1)
 | |
| 
 | |
|       {:error, :missing} ->
 | |
|         Logger.error("FedSocket fetch timed out - #{inspect(uuid)}")
 | |
|         {:error, :timeout}
 | |
| 
 | |
|       {:ok, _fr} ->
 | |
|         FetchRegistry.pop_fetch(uuid)
 | |
|     end
 | |
|   end
 | |
| 
 | |
|   defp process_package(%{"action" => "publish", "data" => data}, %{origin: origin} = _fed_socket) do
 | |
|     if Containment.contain_origin(origin, data) do
 | |
|       IngesterWorker.enqueue("ingest", %{"object" => data})
 | |
|     end
 | |
| 
 | |
|     {:reply, %{"action" => "publish_reply", "status" => "processed"}}
 | |
|   end
 | |
| 
 | |
|   defp process_package(%{"action" => "fetch_reply", "uuid" => uuid, "data" => data}, _fed_socket) do
 | |
|     FetchRegistry.register_fetch_received(uuid, data)
 | |
|     {:noreply, nil}
 | |
|   end
 | |
| 
 | |
|   defp process_package(%{"action" => "fetch", "uuid" => uuid, "data" => ap_id}, _fed_socket) do
 | |
|     {:ok, data} = render_fetched_data(ap_id, uuid)
 | |
|     {:reply, data}
 | |
|   end
 | |
| 
 | |
|   defp process_package(%{"action" => "publish_reply"}, _fed_socket) do
 | |
|     {:noreply, nil}
 | |
|   end
 | |
| 
 | |
|   defp process_package(other, _fed_socket) do
 | |
|     Logger.warn("unknown json packages received #{inspect(other)}")
 | |
|     {:noreply, nil}
 | |
|   end
 | |
| 
 | |
|   defp render_fetched_data(ap_id, uuid) do
 | |
|     {:ok,
 | |
|      %{
 | |
|        "action" => "fetch_reply",
 | |
|        "status" => "processed",
 | |
|        "uuid" => uuid,
 | |
|        "data" => represent_item(ap_id)
 | |
|      }}
 | |
|   end
 | |
| 
 | |
|   defp represent_item(ap_id) do
 | |
|     case User.get_by_ap_id(ap_id) do
 | |
|       nil ->
 | |
|         object = Object.get_cached_by_ap_id(ap_id)
 | |
| 
 | |
|         if Visibility.is_public?(object) do
 | |
|           Phoenix.View.render_to_string(ObjectView, "object.json", object: object)
 | |
|         else
 | |
|           nil
 | |
|         end
 | |
| 
 | |
|       user ->
 | |
|         Phoenix.View.render_to_string(UserView, "user.json", user: user)
 | |
|     end
 | |
|   end
 | |
| 
 | |
|   defp send_packet(data, socket_pid) do
 | |
|     Process.send(socket_pid, {:send, data}, [])
 | |
|   end
 | |
| 
 | |
|   def shake, do: @shake
 | |
| end
 | 
