Merge branch 'remove/fedsockets' into 'develop'
Remove FedSockets See merge request pleroma/pleroma!3155
This commit is contained in:
		
						commit
						d33b9e7b33
					
				
					 23 changed files with 32 additions and 1443 deletions
				
			
		| 
						 | 
				
			
			@ -129,7 +129,6 @@
 | 
			
		|||
    dispatch: [
 | 
			
		||||
      {:_,
 | 
			
		||||
       [
 | 
			
		||||
         {"/api/fedsocket/v1", Pleroma.Web.FedSockets.IncomingHandler, []},
 | 
			
		||||
         {"/api/v1/streaming", Pleroma.Web.MastodonAPI.WebsocketHandler, []},
 | 
			
		||||
         {"/websocket", Phoenix.Endpoint.CowboyWebSocket,
 | 
			
		||||
          {Phoenix.Transports.WebSocket,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -272,19 +272,6 @@
 | 
			
		|||
      }
 | 
			
		||||
    ]
 | 
			
		||||
  },
 | 
			
		||||
  %{
 | 
			
		||||
    group: :pleroma,
 | 
			
		||||
    key: :fed_sockets,
 | 
			
		||||
    type: :group,
 | 
			
		||||
    description: "Websocket based federation",
 | 
			
		||||
    children: [
 | 
			
		||||
      %{
 | 
			
		||||
        key: :enabled,
 | 
			
		||||
        type: :boolean,
 | 
			
		||||
        description: "Enable FedSockets"
 | 
			
		||||
      }
 | 
			
		||||
    ]
 | 
			
		||||
  },
 | 
			
		||||
  %{
 | 
			
		||||
    group: :pleroma,
 | 
			
		||||
    key: Pleroma.Emails.Mailer,
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -220,18 +220,6 @@ config :pleroma, :mrf_user_allowlist, %{
 | 
			
		|||
* `total_user_limit`: the number of scheduled activities a user is allowed to create in total (Default: `300`)
 | 
			
		||||
* `enabled`: whether scheduled activities are sent to the job queue to be executed
 | 
			
		||||
 | 
			
		||||
## FedSockets
 | 
			
		||||
FedSockets is an experimental feature allowing for Pleroma backends to federate using a persistant websocket connection as opposed to making each federation a seperate http connection. This feature is currently off by default. It is configurable throught he following options.
 | 
			
		||||
 | 
			
		||||
### :fedsockets
 | 
			
		||||
* `enabled`: Enables FedSockets for this instance. `false` by default.
 | 
			
		||||
* `connection_duration`: Time an idle websocket is kept open.
 | 
			
		||||
* `rejection_duration`: Failures to connect via FedSockets will not be retried for this period of time.
 | 
			
		||||
* `fed_socket_fetches` and `fed_socket_rejections`: Settings passed to `cachex` for the fetch registry, and rejection stacks. See `Pleroma.Web.FedSockets` for more details.
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
## Frontends
 | 
			
		||||
 | 
			
		||||
### :frontend_configurations
 | 
			
		||||
 | 
			
		||||
This can be used to configure a keyword list that keeps the configuration data for any kind of frontend. By default, settings for `pleroma_fe` and `masto_fe` are configured. You can find the documentation for `pleroma_fe` configuration into [Pleroma-FE configuration and customization for instance administrators](/frontend/CONFIGURATION/#options).
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -93,9 +93,4 @@ server {
 | 
			
		|||
        chunked_transfer_encoding on;
 | 
			
		||||
        proxy_pass         http://phoenix;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    location /api/fedsocket/v1 {
 | 
			
		||||
        proxy_request_buffering off;
 | 
			
		||||
        proxy_pass http://phoenix/api/fedsocket/v1;
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -207,8 +207,7 @@ defp dont_run_in_test(_) do
 | 
			
		|||
         name: Pleroma.Web.Streamer.registry(),
 | 
			
		||||
         keys: :duplicate,
 | 
			
		||||
         partitions: System.schedulers_online()
 | 
			
		||||
       ]},
 | 
			
		||||
      Pleroma.Web.FedSockets.Supervisor
 | 
			
		||||
       ]}
 | 
			
		||||
    ]
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -12,7 +12,6 @@ defmodule Pleroma.Object.Fetcher do
 | 
			
		|||
  alias Pleroma.Web.ActivityPub.ObjectValidator
 | 
			
		||||
  alias Pleroma.Web.ActivityPub.Transmogrifier
 | 
			
		||||
  alias Pleroma.Web.Federator
 | 
			
		||||
  alias Pleroma.Web.FedSockets
 | 
			
		||||
 | 
			
		||||
  require Logger
 | 
			
		||||
  require Pleroma.Constants
 | 
			
		||||
| 
						 | 
				
			
			@ -183,16 +182,16 @@ defp maybe_date_fetch(headers, date) do
 | 
			
		|||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def fetch_and_contain_remote_object_from_id(prm, opts \\ [])
 | 
			
		||||
  def fetch_and_contain_remote_object_from_id(id)
 | 
			
		||||
 | 
			
		||||
  def fetch_and_contain_remote_object_from_id(%{"id" => id}, opts),
 | 
			
		||||
    do: fetch_and_contain_remote_object_from_id(id, opts)
 | 
			
		||||
  def fetch_and_contain_remote_object_from_id(%{"id" => id}),
 | 
			
		||||
    do: fetch_and_contain_remote_object_from_id(id)
 | 
			
		||||
 | 
			
		||||
  def fetch_and_contain_remote_object_from_id(id, opts) when is_binary(id) do
 | 
			
		||||
  def fetch_and_contain_remote_object_from_id(id) when is_binary(id) do
 | 
			
		||||
    Logger.debug("Fetching object #{id} via AP")
 | 
			
		||||
 | 
			
		||||
    with {:scheme, true} <- {:scheme, String.starts_with?(id, "http")},
 | 
			
		||||
         {:ok, body} <- get_object(id, opts),
 | 
			
		||||
         {:ok, body} <- get_object(id),
 | 
			
		||||
         {:ok, data} <- safe_json_decode(body),
 | 
			
		||||
         :ok <- Containment.contain_origin_from_id(id, data) do
 | 
			
		||||
      {:ok, data}
 | 
			
		||||
| 
						 | 
				
			
			@ -208,22 +207,10 @@ def fetch_and_contain_remote_object_from_id(id, opts) when is_binary(id) do
 | 
			
		|||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def fetch_and_contain_remote_object_from_id(_id, _opts),
 | 
			
		||||
  def fetch_and_contain_remote_object_from_id(_id),
 | 
			
		||||
    do: {:error, "id must be a string"}
 | 
			
		||||
 | 
			
		||||
  defp get_object(id, opts) do
 | 
			
		||||
    with false <- Keyword.get(opts, :force_http, false),
 | 
			
		||||
         {:ok, fedsocket} <- FedSockets.get_or_create_fed_socket(id) do
 | 
			
		||||
      Logger.debug("fetching via fedsocket - #{inspect(id)}")
 | 
			
		||||
      FedSockets.fetch(fedsocket, id)
 | 
			
		||||
    else
 | 
			
		||||
      _other ->
 | 
			
		||||
        Logger.debug("fetching via http - #{inspect(id)}")
 | 
			
		||||
        get_object_http(id)
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp get_object_http(id) do
 | 
			
		||||
  defp get_object(id) do
 | 
			
		||||
    date = Pleroma.Signature.signed_date()
 | 
			
		||||
 | 
			
		||||
    headers =
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -39,7 +39,7 @@ def key_id_to_actor_id(key_id) do
 | 
			
		|||
  def fetch_public_key(conn) do
 | 
			
		||||
    with %{"keyId" => kid} <- HTTPSignatures.signature_for_conn(conn),
 | 
			
		||||
         {:ok, actor_id} <- key_id_to_actor_id(kid),
 | 
			
		||||
         {:ok, public_key} <- User.get_public_key_for_ap_id(actor_id, force_http: true) do
 | 
			
		||||
         {:ok, public_key} <- User.get_public_key_for_ap_id(actor_id) do
 | 
			
		||||
      {:ok, public_key}
 | 
			
		||||
    else
 | 
			
		||||
      e ->
 | 
			
		||||
| 
						 | 
				
			
			@ -50,8 +50,8 @@ def fetch_public_key(conn) do
 | 
			
		|||
  def refetch_public_key(conn) do
 | 
			
		||||
    with %{"keyId" => kid} <- HTTPSignatures.signature_for_conn(conn),
 | 
			
		||||
         {:ok, actor_id} <- key_id_to_actor_id(kid),
 | 
			
		||||
         {:ok, _user} <- ActivityPub.make_user_from_ap_id(actor_id, force_http: true),
 | 
			
		||||
         {:ok, public_key} <- User.get_public_key_for_ap_id(actor_id, force_http: true) do
 | 
			
		||||
         {:ok, _user} <- ActivityPub.make_user_from_ap_id(actor_id),
 | 
			
		||||
         {:ok, public_key} <- User.get_public_key_for_ap_id(actor_id) do
 | 
			
		||||
      {:ok, public_key}
 | 
			
		||||
    else
 | 
			
		||||
      e ->
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1772,12 +1772,12 @@ def html_filter_policy(%User{no_rich_text: true}) do
 | 
			
		|||
 | 
			
		||||
  def html_filter_policy(_), do: Config.get([:markup, :scrub_policy])
 | 
			
		||||
 | 
			
		||||
  def fetch_by_ap_id(ap_id, opts \\ []), do: ActivityPub.make_user_from_ap_id(ap_id, opts)
 | 
			
		||||
  def fetch_by_ap_id(ap_id), do: ActivityPub.make_user_from_ap_id(ap_id)
 | 
			
		||||
 | 
			
		||||
  def get_or_fetch_by_ap_id(ap_id, opts \\ []) do
 | 
			
		||||
  def get_or_fetch_by_ap_id(ap_id) do
 | 
			
		||||
    cached_user = get_cached_by_ap_id(ap_id)
 | 
			
		||||
 | 
			
		||||
    maybe_fetched_user = needs_update?(cached_user) && fetch_by_ap_id(ap_id, opts)
 | 
			
		||||
    maybe_fetched_user = needs_update?(cached_user) && fetch_by_ap_id(ap_id)
 | 
			
		||||
 | 
			
		||||
    case {cached_user, maybe_fetched_user} do
 | 
			
		||||
      {_, {:ok, %User{} = user}} ->
 | 
			
		||||
| 
						 | 
				
			
			@ -1850,8 +1850,8 @@ def public_key(%{public_key: public_key_pem}) when is_binary(public_key_pem) do
 | 
			
		|||
 | 
			
		||||
  def public_key(_), do: {:error, "key not found"}
 | 
			
		||||
 | 
			
		||||
  def get_public_key_for_ap_id(ap_id, opts \\ []) do
 | 
			
		||||
    with {:ok, %User{} = user} <- get_or_fetch_by_ap_id(ap_id, opts),
 | 
			
		||||
  def get_public_key_for_ap_id(ap_id) do
 | 
			
		||||
    with {:ok, %User{} = user} <- get_or_fetch_by_ap_id(ap_id),
 | 
			
		||||
         {:ok, public_key} <- public_key(user) do
 | 
			
		||||
      {:ok, public_key}
 | 
			
		||||
    else
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1289,12 +1289,10 @@ defp object_to_user_data(data) do
 | 
			
		|||
 | 
			
		||||
  def fetch_follow_information_for_user(user) do
 | 
			
		||||
    with {:ok, following_data} <-
 | 
			
		||||
           Fetcher.fetch_and_contain_remote_object_from_id(user.following_address,
 | 
			
		||||
             force_http: true
 | 
			
		||||
           ),
 | 
			
		||||
           Fetcher.fetch_and_contain_remote_object_from_id(user.following_address),
 | 
			
		||||
         {:ok, hide_follows} <- collection_private(following_data),
 | 
			
		||||
         {:ok, followers_data} <-
 | 
			
		||||
           Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address, force_http: true),
 | 
			
		||||
           Fetcher.fetch_and_contain_remote_object_from_id(user.follower_address),
 | 
			
		||||
         {:ok, hide_followers} <- collection_private(followers_data) do
 | 
			
		||||
      {:ok,
 | 
			
		||||
       %{
 | 
			
		||||
| 
						 | 
				
			
			@ -1368,8 +1366,8 @@ def user_data_from_user_object(data) do
 | 
			
		|||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def fetch_and_prepare_user_from_ap_id(ap_id, opts \\ []) do
 | 
			
		||||
    with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id, opts),
 | 
			
		||||
  def fetch_and_prepare_user_from_ap_id(ap_id) do
 | 
			
		||||
    with {:ok, data} <- Fetcher.fetch_and_contain_remote_object_from_id(ap_id),
 | 
			
		||||
         {:ok, data} <- user_data_from_user_object(data) do
 | 
			
		||||
      {:ok, maybe_update_follow_information(data)}
 | 
			
		||||
    else
 | 
			
		||||
| 
						 | 
				
			
			@ -1412,13 +1410,13 @@ def maybe_handle_clashing_nickname(data) do
 | 
			
		|||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def make_user_from_ap_id(ap_id, opts \\ []) do
 | 
			
		||||
  def make_user_from_ap_id(ap_id) do
 | 
			
		||||
    user = User.get_cached_by_ap_id(ap_id)
 | 
			
		||||
 | 
			
		||||
    if user && !User.ap_enabled?(user) do
 | 
			
		||||
      Transmogrifier.upgrade_user_from_ap_id(ap_id)
 | 
			
		||||
    else
 | 
			
		||||
      with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id, opts) do
 | 
			
		||||
      with {:ok, data} <- fetch_and_prepare_user_from_ap_id(ap_id) do
 | 
			
		||||
        if user do
 | 
			
		||||
          user
 | 
			
		||||
          |> User.remote_user_changeset(data)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -13,7 +13,6 @@ defmodule Pleroma.Web.ActivityPub.Publisher do
 | 
			
		|||
  alias Pleroma.User
 | 
			
		||||
  alias Pleroma.Web.ActivityPub.Relay
 | 
			
		||||
  alias Pleroma.Web.ActivityPub.Transmogrifier
 | 
			
		||||
  alias Pleroma.Web.FedSockets
 | 
			
		||||
 | 
			
		||||
  require Pleroma.Constants
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -50,28 +49,6 @@ def is_representable?(%Activity{} = activity) do
 | 
			
		|||
  """
 | 
			
		||||
  def publish_one(%{inbox: inbox, json: json, actor: %User{} = actor, id: id} = params) do
 | 
			
		||||
    Logger.debug("Federating #{id} to #{inbox}")
 | 
			
		||||
 | 
			
		||||
    case FedSockets.get_or_create_fed_socket(inbox) do
 | 
			
		||||
      {:ok, fedsocket} ->
 | 
			
		||||
        Logger.debug("publishing via fedsockets - #{inspect(inbox)}")
 | 
			
		||||
        FedSockets.publish(fedsocket, json)
 | 
			
		||||
 | 
			
		||||
      _ ->
 | 
			
		||||
        Logger.debug("publishing via http - #{inspect(inbox)}")
 | 
			
		||||
        http_publish(inbox, actor, json, params)
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def publish_one(%{actor_id: actor_id} = params) do
 | 
			
		||||
    actor = User.get_cached_by_id(actor_id)
 | 
			
		||||
 | 
			
		||||
    params
 | 
			
		||||
    |> Map.delete(:actor_id)
 | 
			
		||||
    |> Map.put(:actor, actor)
 | 
			
		||||
    |> publish_one()
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp http_publish(inbox, actor, json, params) do
 | 
			
		||||
    uri = %{path: path} = URI.parse(inbox)
 | 
			
		||||
    digest = "SHA-256=" <> (:crypto.hash(:sha256, json) |> Base.encode64())
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -110,6 +87,15 @@ defp http_publish(inbox, actor, json, params) do
 | 
			
		|||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def publish_one(%{actor_id: actor_id} = params) do
 | 
			
		||||
    actor = User.get_cached_by_id(actor_id)
 | 
			
		||||
 | 
			
		||||
    params
 | 
			
		||||
    |> Map.delete(:actor_id)
 | 
			
		||||
    |> Map.put(:actor, actor)
 | 
			
		||||
    |> publish_one()
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp signature_host(%URI{port: port, scheme: scheme, host: host}) do
 | 
			
		||||
    if port == URI.default_port(scheme) do
 | 
			
		||||
      host
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1008,7 +1008,7 @@ def perform(:user_upgrade, user) do
 | 
			
		|||
 | 
			
		||||
  def upgrade_user_from_ap_id(ap_id) do
 | 
			
		||||
    with %User{local: false} = user <- User.get_cached_by_ap_id(ap_id),
 | 
			
		||||
         {:ok, data} <- ActivityPub.fetch_and_prepare_user_from_ap_id(ap_id, force_http: true),
 | 
			
		||||
         {:ok, data} <- ActivityPub.fetch_and_prepare_user_from_ap_id(ap_id),
 | 
			
		||||
         {:ok, user} <- update_user(user, data) do
 | 
			
		||||
      TransmogrifierWorker.enqueue("user_upgrade", %{"user_id" => user.id})
 | 
			
		||||
      {:ok, user}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -1,185 +0,0 @@
 | 
			
		|||
# Pleroma: A lightweight social networking server
 | 
			
		||||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
 | 
			
		||||
# SPDX-License-Identifier: AGPL-3.0-only
 | 
			
		||||
 | 
			
		||||
defmodule Pleroma.Web.FedSockets do
 | 
			
		||||
  @moduledoc """
 | 
			
		||||
  This documents the FedSockets framework. A framework for federating
 | 
			
		||||
  ActivityPub objects between servers via persistant WebSocket connections.
 | 
			
		||||
 | 
			
		||||
  FedSockets allow servers to authenticate on first contact and maintain that
 | 
			
		||||
  connection, eliminating the need to authenticate every time data needs to be shared.
 | 
			
		||||
 | 
			
		||||
  ## Protocol
 | 
			
		||||
  FedSockets currently support 2 types of data transfer:
 | 
			
		||||
    * `publish` method which doesn't require a response
 | 
			
		||||
    * `fetch` method requires a response be sent
 | 
			
		||||
 | 
			
		||||
    ### Publish
 | 
			
		||||
    The publish operation sends a json encoded map of the shape:
 | 
			
		||||
      %{action: :publish, data: json}
 | 
			
		||||
    and accepts (but does not require) a reply of form:
 | 
			
		||||
      %{"action" => "publish_reply"}
 | 
			
		||||
 | 
			
		||||
    The outgoing params represent
 | 
			
		||||
      * data: ActivityPub object encoded into json
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    ### Fetch
 | 
			
		||||
    The fetch operation sends a json encoded map of the shape:
 | 
			
		||||
      %{action: :fetch, data: id, uuid: fetch_uuid}
 | 
			
		||||
    and requires a reply of form:
 | 
			
		||||
      %{"action" => "fetch_reply", "uuid" => uuid, "data" => data}
 | 
			
		||||
 | 
			
		||||
    The outgoing params represent
 | 
			
		||||
      * id: an ActivityPub object URI
 | 
			
		||||
      * uuid: a unique uuid generated by the sender
 | 
			
		||||
 | 
			
		||||
    The reply params represent
 | 
			
		||||
      * data: an ActivityPub object encoded into json
 | 
			
		||||
      * uuid: the uuid sent along with the fetch request
 | 
			
		||||
 | 
			
		||||
  ## Examples
 | 
			
		||||
  Clients of FedSocket transfers shouldn't need to use any of the functions outside of this module.
 | 
			
		||||
 | 
			
		||||
  A typical publish operation can be performed through the following code, and a fetch operation in a similar manner.
 | 
			
		||||
 | 
			
		||||
    case FedSockets.get_or_create_fed_socket(inbox) do
 | 
			
		||||
      {:ok, fedsocket} ->
 | 
			
		||||
        FedSockets.publish(fedsocket, json)
 | 
			
		||||
 | 
			
		||||
      _ ->
 | 
			
		||||
        alternative_publish(inbox, actor, json, params)
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
  ## Configuration
 | 
			
		||||
  FedSockets have the following config settings
 | 
			
		||||
 | 
			
		||||
  config :pleroma, :fed_sockets,
 | 
			
		||||
  enabled: true,
 | 
			
		||||
  ping_interval: :timer.seconds(15),
 | 
			
		||||
  connection_duration: :timer.hours(1),
 | 
			
		||||
  rejection_duration: :timer.hours(1),
 | 
			
		||||
  fed_socket_fetches: [
 | 
			
		||||
    default: 12_000,
 | 
			
		||||
    interval: 3_000,
 | 
			
		||||
    lazy: false
 | 
			
		||||
  ]
 | 
			
		||||
    * enabled - turn FedSockets on or off with this flag. Can be toggled at runtime.
 | 
			
		||||
    * connection_duration - How long a FedSocket can sit idle before it's culled.
 | 
			
		||||
    * rejection_duration - After failing to make a FedSocket connection a host will be excluded
 | 
			
		||||
    from further connections for this amount of time
 | 
			
		||||
    * fed_socket_fetches - Use these parameters to pass options to the Cachex queue backing the FetchRegistry
 | 
			
		||||
    * fed_socket_rejections - Use these parameters to pass options to the Cachex queue backing the FedRegistry
 | 
			
		||||
 | 
			
		||||
    Cachex options are
 | 
			
		||||
      * default: the minimum amount of time a fetch can wait before it times out.
 | 
			
		||||
      * interval: the interval between checks for timed out entries. This plus the default represent the maximum time allowed
 | 
			
		||||
      * lazy: leave at false for consistant and fast lookups, set to true for stricter timeout enforcement
 | 
			
		||||
 | 
			
		||||
  """
 | 
			
		||||
  require Logger
 | 
			
		||||
 | 
			
		||||
  alias Pleroma.Web.FedSockets.FedRegistry
 | 
			
		||||
  alias Pleroma.Web.FedSockets.FedSocket
 | 
			
		||||
  alias Pleroma.Web.FedSockets.SocketInfo
 | 
			
		||||
 | 
			
		||||
  @doc """
 | 
			
		||||
  returns a FedSocket for the given origin. Will reuse an existing one or create a new one.
 | 
			
		||||
 | 
			
		||||
  address is expected to be a fully formed URL such as:
 | 
			
		||||
  "http://www.example.com" or "http://www.example.com:8080"
 | 
			
		||||
 | 
			
		||||
  It can and usually does include additional path parameters,
 | 
			
		||||
  but these are ignored as the FedSockets are organized by host and port info alone.
 | 
			
		||||
  """
 | 
			
		||||
  def get_or_create_fed_socket(address) do
 | 
			
		||||
    with {:cache, {:error, :missing}} <- {:cache, get_fed_socket(address)},
 | 
			
		||||
         {:connect, {:ok, _pid}} <- {:connect, FedSocket.connect_to_host(address)},
 | 
			
		||||
         {:cache, {:ok, fed_socket}} <- {:cache, get_fed_socket(address)} do
 | 
			
		||||
      Logger.debug("fedsocket created for - #{inspect(address)}")
 | 
			
		||||
      {:ok, fed_socket}
 | 
			
		||||
    else
 | 
			
		||||
      {:cache, {:ok, socket}} ->
 | 
			
		||||
        Logger.debug("fedsocket found in cache - #{inspect(address)}")
 | 
			
		||||
        {:ok, socket}
 | 
			
		||||
 | 
			
		||||
      {:cache, {:error, :rejected} = e} ->
 | 
			
		||||
        e
 | 
			
		||||
 | 
			
		||||
      {:connect, {:error, _host}} ->
 | 
			
		||||
        Logger.debug("set host rejected for - #{inspect(address)}")
 | 
			
		||||
        FedRegistry.set_host_rejected(address)
 | 
			
		||||
        {:error, :rejected}
 | 
			
		||||
 | 
			
		||||
      {_, {:error, :disabled}} ->
 | 
			
		||||
        {:error, :disabled}
 | 
			
		||||
 | 
			
		||||
      {_, {:error, reason}} ->
 | 
			
		||||
        Logger.warn("get_or_create_fed_socket error - #{inspect(reason)}")
 | 
			
		||||
        {:error, reason}
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  @doc """
 | 
			
		||||
  returns a FedSocket for the given origin. Will not create a new FedSocket if one does not exist.
 | 
			
		||||
 | 
			
		||||
  address is expected to be a fully formed URL such as:
 | 
			
		||||
    "http://www.example.com" or "http://www.example.com:8080"
 | 
			
		||||
  """
 | 
			
		||||
  def get_fed_socket(address) do
 | 
			
		||||
    origin = SocketInfo.origin(address)
 | 
			
		||||
 | 
			
		||||
    with {:config, true} <- {:config, Pleroma.Config.get([:fed_sockets, :enabled], false)},
 | 
			
		||||
         {:ok, socket} <- FedRegistry.get_fed_socket(origin) do
 | 
			
		||||
      {:ok, socket}
 | 
			
		||||
    else
 | 
			
		||||
      {:config, _} ->
 | 
			
		||||
        {:error, :disabled}
 | 
			
		||||
 | 
			
		||||
      {:error, :rejected} ->
 | 
			
		||||
        Logger.debug("FedSocket previously rejected - #{inspect(origin)}")
 | 
			
		||||
        {:error, :rejected}
 | 
			
		||||
 | 
			
		||||
      {:error, reason} ->
 | 
			
		||||
        {:error, reason}
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  @doc """
 | 
			
		||||
  Sends the supplied data via the publish protocol.
 | 
			
		||||
  It will not block waiting for a reply.
 | 
			
		||||
  Returns :ok but this is not an indication of a successful transfer.
 | 
			
		||||
 | 
			
		||||
  the data is expected to be JSON encoded binary data.
 | 
			
		||||
  """
 | 
			
		||||
  def publish(%SocketInfo{} = fed_socket, json) do
 | 
			
		||||
    FedSocket.publish(fed_socket, json)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  @doc """
 | 
			
		||||
  Sends the supplied data via the fetch protocol.
 | 
			
		||||
  It will block waiting for a reply or timeout.
 | 
			
		||||
 | 
			
		||||
  Returns {:ok, object} where object is the requested object (or nil)
 | 
			
		||||
          {:error, :timeout} in the event the message was not responded to
 | 
			
		||||
 | 
			
		||||
  the id is expected to be the URI of an ActivityPub object.
 | 
			
		||||
  """
 | 
			
		||||
  def fetch(%SocketInfo{} = fed_socket, id) do
 | 
			
		||||
    FedSocket.fetch(fed_socket, id)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  @doc """
 | 
			
		||||
  Disconnect all and restart FedSockets.
 | 
			
		||||
  This is mainly used in development and testing but could be useful in production.
 | 
			
		||||
  """
 | 
			
		||||
  def reset do
 | 
			
		||||
    FedRegistry
 | 
			
		||||
    |> Process.whereis()
 | 
			
		||||
    |> Process.exit(:testing)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def uri_for_origin(origin),
 | 
			
		||||
    do: "ws://#{origin}/api/fedsocket/v1"
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			@ -1,185 +0,0 @@
 | 
			
		|||
# Pleroma: A lightweight social networking server
 | 
			
		||||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
 | 
			
		||||
# SPDX-License-Identifier: AGPL-3.0-only
 | 
			
		||||
 | 
			
		||||
defmodule Pleroma.Web.FedSockets.FedRegistry do
 | 
			
		||||
  @moduledoc """
 | 
			
		||||
  The FedRegistry stores the active FedSockets for quick retrieval.
 | 
			
		||||
 | 
			
		||||
  The storage and retrieval portion of the FedRegistry is done in process through
 | 
			
		||||
  elixir's `Registry` module for speed and its ability to monitor for terminated processes.
 | 
			
		||||
 | 
			
		||||
  Dropped connections will be caught by `Registry` and deleted. Since the next
 | 
			
		||||
  message will initiate a new connection there is no reason to try and reconnect at that point.
 | 
			
		||||
 | 
			
		||||
  Normally outside modules should have no need to call or use the FedRegistry themselves.
 | 
			
		||||
  """
 | 
			
		||||
 | 
			
		||||
  alias Pleroma.Web.FedSockets.FedSocket
 | 
			
		||||
  alias Pleroma.Web.FedSockets.SocketInfo
 | 
			
		||||
 | 
			
		||||
  require Logger
 | 
			
		||||
 | 
			
		||||
  @default_rejection_duration 15 * 60 * 1000
 | 
			
		||||
  @rejections :fed_socket_rejections
 | 
			
		||||
 | 
			
		||||
  @doc """
 | 
			
		||||
  Retrieves a FedSocket from the Registry given it's origin.
 | 
			
		||||
 | 
			
		||||
  The origin is expected to be a string identifying the endpoint "example.com" or "example2.com:8080"
 | 
			
		||||
 | 
			
		||||
  Will return:
 | 
			
		||||
    * {:ok, fed_socket} for working FedSockets
 | 
			
		||||
    * {:error, :rejected} for origins that have been tried and refused within the rejection duration interval
 | 
			
		||||
    * {:error, some_reason} usually :missing for unknown origins
 | 
			
		||||
  """
 | 
			
		||||
  def get_fed_socket(origin) do
 | 
			
		||||
    case get_registry_data(origin) do
 | 
			
		||||
      {:error, reason} ->
 | 
			
		||||
        {:error, reason}
 | 
			
		||||
 | 
			
		||||
      {:ok, %{state: :connected} = socket_info} ->
 | 
			
		||||
        {:ok, socket_info}
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  @doc """
 | 
			
		||||
  Adds a connected FedSocket to the Registry.
 | 
			
		||||
 | 
			
		||||
  Always returns {:ok, fed_socket}
 | 
			
		||||
  """
 | 
			
		||||
  def add_fed_socket(origin, pid \\ nil) do
 | 
			
		||||
    origin
 | 
			
		||||
    |> SocketInfo.build(pid)
 | 
			
		||||
    |> SocketInfo.connect()
 | 
			
		||||
    |> add_socket_info
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp add_socket_info(%{origin: origin, state: :connected} = socket_info) do
 | 
			
		||||
    case Registry.register(FedSockets.Registry, origin, socket_info) do
 | 
			
		||||
      {:ok, _owner} ->
 | 
			
		||||
        clear_prior_rejection(origin)
 | 
			
		||||
        Logger.debug("fedsocket added: #{inspect(origin)}")
 | 
			
		||||
 | 
			
		||||
        {:ok, socket_info}
 | 
			
		||||
 | 
			
		||||
      {:error, {:already_registered, _pid}} ->
 | 
			
		||||
        FedSocket.close(socket_info)
 | 
			
		||||
        existing_socket_info = Registry.lookup(FedSockets.Registry, origin)
 | 
			
		||||
 | 
			
		||||
        {:ok, existing_socket_info}
 | 
			
		||||
 | 
			
		||||
      _ ->
 | 
			
		||||
        {:error, :error_adding_socket}
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  @doc """
 | 
			
		||||
  Mark this origin as having rejected a connection attempt.
 | 
			
		||||
  This will keep it from getting additional connection attempts
 | 
			
		||||
  for a period of time specified in the config.
 | 
			
		||||
 | 
			
		||||
  Always returns {:ok, new_reg_data}
 | 
			
		||||
  """
 | 
			
		||||
  def set_host_rejected(uri) do
 | 
			
		||||
    new_reg_data =
 | 
			
		||||
      uri
 | 
			
		||||
      |> SocketInfo.origin()
 | 
			
		||||
      |> get_or_create_registry_data()
 | 
			
		||||
      |> set_to_rejected()
 | 
			
		||||
      |> save_registry_data()
 | 
			
		||||
 | 
			
		||||
    {:ok, new_reg_data}
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  @doc """
 | 
			
		||||
  Retrieves the FedRegistryData from the Registry given it's origin.
 | 
			
		||||
 | 
			
		||||
  The origin is expected to be a string identifying the endpoint "example.com" or "example2.com:8080"
 | 
			
		||||
 | 
			
		||||
  Will return:
 | 
			
		||||
    * {:ok, fed_registry_data} for known origins
 | 
			
		||||
    * {:error, :missing} for uniknown origins
 | 
			
		||||
    * {:error, :cache_error} indicating some low level runtime issues
 | 
			
		||||
  """
 | 
			
		||||
  def get_registry_data(origin) do
 | 
			
		||||
    case Registry.lookup(FedSockets.Registry, origin) do
 | 
			
		||||
      [] ->
 | 
			
		||||
        if is_rejected?(origin) do
 | 
			
		||||
          Logger.debug("previously rejected fedsocket requested")
 | 
			
		||||
          {:error, :rejected}
 | 
			
		||||
        else
 | 
			
		||||
          {:error, :missing}
 | 
			
		||||
        end
 | 
			
		||||
 | 
			
		||||
      [{_pid, %{state: :connected} = socket_info}] ->
 | 
			
		||||
        {:ok, socket_info}
 | 
			
		||||
 | 
			
		||||
      _ ->
 | 
			
		||||
        {:error, :cache_error}
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  @doc """
 | 
			
		||||
  Retrieves a map of all sockets from the Registry. The keys are the origins and the values are the corresponding SocketInfo
 | 
			
		||||
  """
 | 
			
		||||
  def list_all do
 | 
			
		||||
    (list_all_connected() ++ list_all_rejected())
 | 
			
		||||
    |> Enum.into(%{})
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp list_all_connected do
 | 
			
		||||
    FedSockets.Registry
 | 
			
		||||
    |> Registry.select([{{:"$1", :_, :"$3"}, [], [{{:"$1", :"$3"}}]}])
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp list_all_rejected do
 | 
			
		||||
    {:ok, keys} = Cachex.keys(@rejections)
 | 
			
		||||
 | 
			
		||||
    {:ok, registry_data} =
 | 
			
		||||
      Cachex.execute(@rejections, fn worker ->
 | 
			
		||||
        Enum.map(keys, fn k -> {k, Cachex.get!(worker, k)} end)
 | 
			
		||||
      end)
 | 
			
		||||
 | 
			
		||||
    registry_data
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp clear_prior_rejection(origin),
 | 
			
		||||
    do: Cachex.del(@rejections, origin)
 | 
			
		||||
 | 
			
		||||
  defp is_rejected?(origin) do
 | 
			
		||||
    case Cachex.get(@rejections, origin) do
 | 
			
		||||
      {:ok, nil} ->
 | 
			
		||||
        false
 | 
			
		||||
 | 
			
		||||
      {:ok, _} ->
 | 
			
		||||
        true
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp get_or_create_registry_data(origin) do
 | 
			
		||||
    case get_registry_data(origin) do
 | 
			
		||||
      {:error, :missing} ->
 | 
			
		||||
        %SocketInfo{origin: origin}
 | 
			
		||||
 | 
			
		||||
      {:ok, socket_info} ->
 | 
			
		||||
        socket_info
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp save_registry_data(%SocketInfo{origin: origin, state: :connected} = socket_info) do
 | 
			
		||||
    {:ok, true} = Registry.update_value(FedSockets.Registry, origin, fn _ -> socket_info end)
 | 
			
		||||
    socket_info
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp save_registry_data(%SocketInfo{origin: origin, state: :rejected} = socket_info) do
 | 
			
		||||
    rejection_expiration =
 | 
			
		||||
      Pleroma.Config.get([:fed_sockets, :rejection_duration], @default_rejection_duration)
 | 
			
		||||
 | 
			
		||||
    {:ok, true} = Cachex.put(@rejections, origin, socket_info, ttl: rejection_expiration)
 | 
			
		||||
    socket_info
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp set_to_rejected(%SocketInfo{} = socket_info),
 | 
			
		||||
    do: %SocketInfo{socket_info | state: :rejected}
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			@ -1,137 +0,0 @@
 | 
			
		|||
# Pleroma: A lightweight social networking server
 | 
			
		||||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
 | 
			
		||||
# SPDX-License-Identifier: AGPL-3.0-only
 | 
			
		||||
 | 
			
		||||
defmodule Pleroma.Web.FedSockets.FedSocket do
 | 
			
		||||
  @moduledoc """
 | 
			
		||||
  The FedSocket module abstracts the actions to be taken taken on connections regardless of
 | 
			
		||||
  whether the connection started as inbound or outbound.
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
  Normally outside modules will have no need to call the FedSocket module directly.
 | 
			
		||||
  """
 | 
			
		||||
 | 
			
		||||
  alias Pleroma.Object
 | 
			
		||||
  alias Pleroma.Object.Containment
 | 
			
		||||
  alias Pleroma.User
 | 
			
		||||
  alias Pleroma.Web.ActivityPub.ObjectView
 | 
			
		||||
  alias Pleroma.Web.ActivityPub.UserView
 | 
			
		||||
  alias Pleroma.Web.ActivityPub.Visibility
 | 
			
		||||
  alias Pleroma.Web.FedSockets.FetchRegistry
 | 
			
		||||
  alias Pleroma.Web.FedSockets.IngesterWorker
 | 
			
		||||
  alias Pleroma.Web.FedSockets.OutgoingHandler
 | 
			
		||||
  alias Pleroma.Web.FedSockets.SocketInfo
 | 
			
		||||
 | 
			
		||||
  require Logger
 | 
			
		||||
 | 
			
		||||
  @shake "61dd18f7-f1e6-49a4-939a-a749fcdc1103"
 | 
			
		||||
 | 
			
		||||
  def connect_to_host(uri) do
 | 
			
		||||
    case OutgoingHandler.start_link(uri) do
 | 
			
		||||
      {:ok, pid} ->
 | 
			
		||||
        {:ok, pid}
 | 
			
		||||
 | 
			
		||||
      error ->
 | 
			
		||||
        {:error, error}
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def close(%SocketInfo{pid: socket_pid}),
 | 
			
		||||
    do: Process.send(socket_pid, :close, [])
 | 
			
		||||
 | 
			
		||||
  def publish(%SocketInfo{pid: socket_pid}, json) do
 | 
			
		||||
    %{action: :publish, data: json}
 | 
			
		||||
    |> Jason.encode!()
 | 
			
		||||
    |> send_packet(socket_pid)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def fetch(%SocketInfo{pid: socket_pid}, id) do
 | 
			
		||||
    fetch_uuid = FetchRegistry.register_fetch(id)
 | 
			
		||||
 | 
			
		||||
    %{action: :fetch, data: id, uuid: fetch_uuid}
 | 
			
		||||
    |> Jason.encode!()
 | 
			
		||||
    |> send_packet(socket_pid)
 | 
			
		||||
 | 
			
		||||
    wait_for_fetch_to_return(fetch_uuid, 0)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def receive_package(%SocketInfo{} = fed_socket, json) do
 | 
			
		||||
    json
 | 
			
		||||
    |> Jason.decode!()
 | 
			
		||||
    |> process_package(fed_socket)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp wait_for_fetch_to_return(uuid, cntr) do
 | 
			
		||||
    case FetchRegistry.check_fetch(uuid) do
 | 
			
		||||
      {:error, :waiting} ->
 | 
			
		||||
        Process.sleep(:math.pow(cntr, 3) |> Kernel.trunc())
 | 
			
		||||
        wait_for_fetch_to_return(uuid, cntr + 1)
 | 
			
		||||
 | 
			
		||||
      {:error, :missing} ->
 | 
			
		||||
        Logger.error("FedSocket fetch timed out - #{inspect(uuid)}")
 | 
			
		||||
        {:error, :timeout}
 | 
			
		||||
 | 
			
		||||
      {:ok, _fr} ->
 | 
			
		||||
        FetchRegistry.pop_fetch(uuid)
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp process_package(%{"action" => "publish", "data" => data}, %{origin: origin} = _fed_socket) do
 | 
			
		||||
    if Containment.contain_origin(origin, data) do
 | 
			
		||||
      IngesterWorker.enqueue("ingest", %{"object" => data})
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    {:reply, %{"action" => "publish_reply", "status" => "processed"}}
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp process_package(%{"action" => "fetch_reply", "uuid" => uuid, "data" => data}, _fed_socket) do
 | 
			
		||||
    FetchRegistry.register_fetch_received(uuid, data)
 | 
			
		||||
    {:noreply, nil}
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp process_package(%{"action" => "fetch", "uuid" => uuid, "data" => ap_id}, _fed_socket) do
 | 
			
		||||
    {:ok, data} = render_fetched_data(ap_id, uuid)
 | 
			
		||||
    {:reply, data}
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp process_package(%{"action" => "publish_reply"}, _fed_socket) do
 | 
			
		||||
    {:noreply, nil}
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp process_package(other, _fed_socket) do
 | 
			
		||||
    Logger.warn("unknown json packages received #{inspect(other)}")
 | 
			
		||||
    {:noreply, nil}
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp render_fetched_data(ap_id, uuid) do
 | 
			
		||||
    {:ok,
 | 
			
		||||
     %{
 | 
			
		||||
       "action" => "fetch_reply",
 | 
			
		||||
       "status" => "processed",
 | 
			
		||||
       "uuid" => uuid,
 | 
			
		||||
       "data" => represent_item(ap_id)
 | 
			
		||||
     }}
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp represent_item(ap_id) do
 | 
			
		||||
    case User.get_by_ap_id(ap_id) do
 | 
			
		||||
      nil ->
 | 
			
		||||
        object = Object.get_cached_by_ap_id(ap_id)
 | 
			
		||||
 | 
			
		||||
        if Visibility.is_public?(object) do
 | 
			
		||||
          Phoenix.View.render_to_string(ObjectView, "object.json", object: object)
 | 
			
		||||
        else
 | 
			
		||||
          nil
 | 
			
		||||
        end
 | 
			
		||||
 | 
			
		||||
      user ->
 | 
			
		||||
        Phoenix.View.render_to_string(UserView, "user.json", user: user)
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp send_packet(data, socket_pid) do
 | 
			
		||||
    Process.send(socket_pid, {:send, data}, [])
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def shake, do: @shake
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			@ -1,151 +0,0 @@
 | 
			
		|||
# Pleroma: A lightweight social networking server
 | 
			
		||||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
 | 
			
		||||
# SPDX-License-Identifier: AGPL-3.0-only
 | 
			
		||||
 | 
			
		||||
defmodule Pleroma.Web.FedSockets.FetchRegistry do
 | 
			
		||||
  @moduledoc """
 | 
			
		||||
  The FetchRegistry acts as a broker for fetch requests and return values.
 | 
			
		||||
  This allows calling processes to block while waiting for a reply.
 | 
			
		||||
  It doesn't impose it's own process instead using `Cachex` to handle fetches in process, allowing
 | 
			
		||||
  multi threaded processes to avoid bottlenecking.
 | 
			
		||||
 | 
			
		||||
  Normally outside modules will have no need to call or use the FetchRegistry themselves.
 | 
			
		||||
 | 
			
		||||
  The `Cachex` parameters can be controlled from the config. Since exact timeout intervals
 | 
			
		||||
  aren't necessary the following settings are used by default:
 | 
			
		||||
 | 
			
		||||
  config :pleroma, :fed_sockets,
 | 
			
		||||
    fed_socket_fetches: [
 | 
			
		||||
      default: 12_000,
 | 
			
		||||
      interval: 3_000,
 | 
			
		||||
      lazy: false
 | 
			
		||||
    ]
 | 
			
		||||
 | 
			
		||||
  """
 | 
			
		||||
 | 
			
		||||
  defmodule FetchRegistryData do
 | 
			
		||||
    defstruct uuid: nil,
 | 
			
		||||
              sent_json: nil,
 | 
			
		||||
              received_json: nil,
 | 
			
		||||
              sent_at: nil,
 | 
			
		||||
              received_at: nil
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  alias Ecto.UUID
 | 
			
		||||
 | 
			
		||||
  require Logger
 | 
			
		||||
 | 
			
		||||
  @fetches :fed_socket_fetches
 | 
			
		||||
 | 
			
		||||
  @doc """
 | 
			
		||||
  Registers a json request wth the FetchRegistry and returns the identifying UUID.
 | 
			
		||||
  """
 | 
			
		||||
  def register_fetch(json) do
 | 
			
		||||
    %FetchRegistryData{uuid: uuid} =
 | 
			
		||||
      json
 | 
			
		||||
      |> new_registry_data
 | 
			
		||||
      |> save_registry_data
 | 
			
		||||
 | 
			
		||||
    uuid
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  @doc """
 | 
			
		||||
  Reports on the status of a Fetch given the identifying UUID.
 | 
			
		||||
 | 
			
		||||
  Will return
 | 
			
		||||
    * {:ok, fetched_object} if a fetch has completed
 | 
			
		||||
    * {:error, :waiting} if a fetch is still pending
 | 
			
		||||
    * {:error, other_error} usually :missing to indicate a fetch that has timed out
 | 
			
		||||
  """
 | 
			
		||||
  def check_fetch(uuid) do
 | 
			
		||||
    case get_registry_data(uuid) do
 | 
			
		||||
      {:ok, %FetchRegistryData{received_at: nil}} ->
 | 
			
		||||
        {:error, :waiting}
 | 
			
		||||
 | 
			
		||||
      {:ok, %FetchRegistryData{} = reg_data} ->
 | 
			
		||||
        {:ok, reg_data}
 | 
			
		||||
 | 
			
		||||
      e ->
 | 
			
		||||
        e
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  @doc """
 | 
			
		||||
  Retrieves the response to a fetch given the identifying UUID.
 | 
			
		||||
  The completed fetch will be deleted from the FetchRegistry
 | 
			
		||||
 | 
			
		||||
  Will return
 | 
			
		||||
    * {:ok, fetched_object} if a fetch has completed
 | 
			
		||||
    * {:error, :waiting} if a fetch is still pending
 | 
			
		||||
    * {:error, other_error} usually :missing to indicate a fetch that has timed out
 | 
			
		||||
  """
 | 
			
		||||
  def pop_fetch(uuid) do
 | 
			
		||||
    case check_fetch(uuid) do
 | 
			
		||||
      {:ok, %FetchRegistryData{received_json: received_json}} ->
 | 
			
		||||
        delete_registry_data(uuid)
 | 
			
		||||
        {:ok, received_json}
 | 
			
		||||
 | 
			
		||||
      e ->
 | 
			
		||||
        e
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  @doc """
 | 
			
		||||
  This is called to register a fetch has returned.
 | 
			
		||||
  It expects the result data along with the UUID that was sent in the request
 | 
			
		||||
 | 
			
		||||
  Will return the fetched object or :error
 | 
			
		||||
  """
 | 
			
		||||
  def register_fetch_received(uuid, data) do
 | 
			
		||||
    case get_registry_data(uuid) do
 | 
			
		||||
      {:ok, %FetchRegistryData{received_at: nil} = reg_data} ->
 | 
			
		||||
        reg_data
 | 
			
		||||
        |> set_fetch_received(data)
 | 
			
		||||
        |> save_registry_data()
 | 
			
		||||
 | 
			
		||||
      {:ok, %FetchRegistryData{} = reg_data} ->
 | 
			
		||||
        Logger.warn("tried to add fetched data twice - #{uuid}")
 | 
			
		||||
        reg_data
 | 
			
		||||
 | 
			
		||||
      {:error, _} ->
 | 
			
		||||
        Logger.warn("Error adding fetch to registry - #{uuid}")
 | 
			
		||||
        :error
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp new_registry_data(json) do
 | 
			
		||||
    %FetchRegistryData{
 | 
			
		||||
      uuid: UUID.generate(),
 | 
			
		||||
      sent_json: json,
 | 
			
		||||
      sent_at: :erlang.monotonic_time(:millisecond)
 | 
			
		||||
    }
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp get_registry_data(origin) do
 | 
			
		||||
    case Cachex.get(@fetches, origin) do
 | 
			
		||||
      {:ok, nil} ->
 | 
			
		||||
        {:error, :missing}
 | 
			
		||||
 | 
			
		||||
      {:ok, reg_data} ->
 | 
			
		||||
        {:ok, reg_data}
 | 
			
		||||
 | 
			
		||||
      _ ->
 | 
			
		||||
        {:error, :cache_error}
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp set_fetch_received(%FetchRegistryData{} = reg_data, data),
 | 
			
		||||
    do: %FetchRegistryData{
 | 
			
		||||
      reg_data
 | 
			
		||||
      | received_at: :erlang.monotonic_time(:millisecond),
 | 
			
		||||
        received_json: data
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
  defp save_registry_data(%FetchRegistryData{uuid: uuid} = reg_data) do
 | 
			
		||||
    {:ok, true} = Cachex.put(@fetches, uuid, reg_data)
 | 
			
		||||
    reg_data
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp delete_registry_data(origin),
 | 
			
		||||
    do: {:ok, true} = Cachex.del(@fetches, origin)
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			@ -1,88 +0,0 @@
 | 
			
		|||
# Pleroma: A lightweight social networking server
 | 
			
		||||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
 | 
			
		||||
# SPDX-License-Identifier: AGPL-3.0-only
 | 
			
		||||
 | 
			
		||||
defmodule Pleroma.Web.FedSockets.IncomingHandler do
 | 
			
		||||
  require Logger
 | 
			
		||||
 | 
			
		||||
  alias Pleroma.Web.FedSockets.FedRegistry
 | 
			
		||||
  alias Pleroma.Web.FedSockets.FedSocket
 | 
			
		||||
  alias Pleroma.Web.FedSockets.SocketInfo
 | 
			
		||||
 | 
			
		||||
  import HTTPSignatures, only: [validate_conn: 1, split_signature: 1]
 | 
			
		||||
 | 
			
		||||
  @behaviour :cowboy_websocket
 | 
			
		||||
 | 
			
		||||
  def init(req, state) do
 | 
			
		||||
    shake = FedSocket.shake()
 | 
			
		||||
 | 
			
		||||
    with true <- Pleroma.Config.get([:fed_sockets, :enabled]),
 | 
			
		||||
         sec_protocol <- :cowboy_req.header("sec-websocket-protocol", req, nil),
 | 
			
		||||
         headers = %{"(request-target)" => ^shake} <- :cowboy_req.headers(req),
 | 
			
		||||
         true <- validate_conn(%{req_headers: headers}),
 | 
			
		||||
         %{"keyId" => origin} <- split_signature(headers["signature"]) do
 | 
			
		||||
      req =
 | 
			
		||||
        if is_nil(sec_protocol) do
 | 
			
		||||
          req
 | 
			
		||||
        else
 | 
			
		||||
          :cowboy_req.set_resp_header("sec-websocket-protocol", sec_protocol, req)
 | 
			
		||||
        end
 | 
			
		||||
 | 
			
		||||
      {:cowboy_websocket, req, %{origin: origin}, %{}}
 | 
			
		||||
    else
 | 
			
		||||
      _ ->
 | 
			
		||||
        {:ok, req, state}
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def websocket_init(%{origin: origin}) do
 | 
			
		||||
    case FedRegistry.add_fed_socket(origin) do
 | 
			
		||||
      {:ok, socket_info} ->
 | 
			
		||||
        {:ok, socket_info}
 | 
			
		||||
 | 
			
		||||
      e ->
 | 
			
		||||
        Logger.error("FedSocket websocket_init failed - #{inspect(e)}")
 | 
			
		||||
        {:error, inspect(e)}
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  # Use the ping to  check if the connection should be expired
 | 
			
		||||
  def websocket_handle(:ping, socket_info) do
 | 
			
		||||
    if SocketInfo.expired?(socket_info) do
 | 
			
		||||
      {:stop, socket_info}
 | 
			
		||||
    else
 | 
			
		||||
      {:ok, socket_info, :hibernate}
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def websocket_handle({:text, data}, socket_info) do
 | 
			
		||||
    socket_info = SocketInfo.touch(socket_info)
 | 
			
		||||
 | 
			
		||||
    case FedSocket.receive_package(socket_info, data) do
 | 
			
		||||
      {:noreply, _} ->
 | 
			
		||||
        {:ok, socket_info}
 | 
			
		||||
 | 
			
		||||
      {:reply, reply} ->
 | 
			
		||||
        {:reply, {:text, Jason.encode!(reply)}, socket_info}
 | 
			
		||||
 | 
			
		||||
      {:error, reason} ->
 | 
			
		||||
        Logger.error("incoming error - receive_package: #{inspect(reason)}")
 | 
			
		||||
        {:ok, socket_info}
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def websocket_info({:send, message}, socket_info) do
 | 
			
		||||
    socket_info = SocketInfo.touch(socket_info)
 | 
			
		||||
 | 
			
		||||
    {:reply, {:text, message}, socket_info}
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def websocket_info(:close, state) do
 | 
			
		||||
    {:stop, state}
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def websocket_info(message, state) do
 | 
			
		||||
    Logger.debug("#{__MODULE__} unknown message #{inspect(message)}")
 | 
			
		||||
    {:ok, state}
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			@ -1,33 +0,0 @@
 | 
			
		|||
# Pleroma: A lightweight social networking server
 | 
			
		||||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
 | 
			
		||||
# SPDX-License-Identifier: AGPL-3.0-only
 | 
			
		||||
 | 
			
		||||
defmodule Pleroma.Web.FedSockets.IngesterWorker do
 | 
			
		||||
  use Pleroma.Workers.WorkerHelper, queue: "ingestion_queue"
 | 
			
		||||
  require Logger
 | 
			
		||||
 | 
			
		||||
  alias Pleroma.Web.Federator
 | 
			
		||||
 | 
			
		||||
  @impl Oban.Worker
 | 
			
		||||
  def perform(%Job{args: %{"op" => "ingest", "object" => ingestee}}) do
 | 
			
		||||
    try do
 | 
			
		||||
      ingestee
 | 
			
		||||
      |> Jason.decode!()
 | 
			
		||||
      |> do_ingestion()
 | 
			
		||||
    rescue
 | 
			
		||||
      e ->
 | 
			
		||||
        Logger.error("IngesterWorker error - #{inspect(e)}")
 | 
			
		||||
        e
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp do_ingestion(params) do
 | 
			
		||||
    case Federator.incoming_ap_doc(params) do
 | 
			
		||||
      {:error, reason} ->
 | 
			
		||||
        {:error, reason}
 | 
			
		||||
 | 
			
		||||
      {:ok, object} ->
 | 
			
		||||
        {:ok, object}
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			@ -1,151 +0,0 @@
 | 
			
		|||
# Pleroma: A lightweight social networking server
 | 
			
		||||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
 | 
			
		||||
# SPDX-License-Identifier: AGPL-3.0-only
 | 
			
		||||
 | 
			
		||||
defmodule Pleroma.Web.FedSockets.OutgoingHandler do
 | 
			
		||||
  use GenServer
 | 
			
		||||
 | 
			
		||||
  require Logger
 | 
			
		||||
 | 
			
		||||
  alias Pleroma.Application
 | 
			
		||||
  alias Pleroma.Web.ActivityPub.InternalFetchActor
 | 
			
		||||
  alias Pleroma.Web.FedSockets
 | 
			
		||||
  alias Pleroma.Web.FedSockets.FedRegistry
 | 
			
		||||
  alias Pleroma.Web.FedSockets.FedSocket
 | 
			
		||||
  alias Pleroma.Web.FedSockets.SocketInfo
 | 
			
		||||
 | 
			
		||||
  def start_link(uri) do
 | 
			
		||||
    GenServer.start_link(__MODULE__, %{uri: uri})
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def init(%{uri: uri}) do
 | 
			
		||||
    case initiate_connection(uri) do
 | 
			
		||||
      {:ok, ws_origin, conn_pid} ->
 | 
			
		||||
        FedRegistry.add_fed_socket(ws_origin, conn_pid)
 | 
			
		||||
 | 
			
		||||
      {:error, reason} ->
 | 
			
		||||
        Logger.debug("Outgoing connection failed - #{inspect(reason)}")
 | 
			
		||||
        :ignore
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def handle_info({:gun_ws, conn_pid, _ref, {:text, data}}, socket_info) do
 | 
			
		||||
    socket_info = SocketInfo.touch(socket_info)
 | 
			
		||||
 | 
			
		||||
    case FedSocket.receive_package(socket_info, data) do
 | 
			
		||||
      {:noreply, _} ->
 | 
			
		||||
        {:noreply, socket_info}
 | 
			
		||||
 | 
			
		||||
      {:reply, reply} ->
 | 
			
		||||
        :gun.ws_send(conn_pid, {:text, Jason.encode!(reply)})
 | 
			
		||||
        {:noreply, socket_info}
 | 
			
		||||
 | 
			
		||||
      {:error, reason} ->
 | 
			
		||||
        Logger.error("incoming error - receive_package: #{inspect(reason)}")
 | 
			
		||||
        {:noreply, socket_info}
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def handle_info(:close, state) do
 | 
			
		||||
    Logger.debug("Sending close frame !!!!!!!")
 | 
			
		||||
    {:close, state}
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def handle_info({:gun_down, _pid, _prot, :closed, _}, state) do
 | 
			
		||||
    {:stop, :normal, state}
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def handle_info({:send, data}, %{conn_pid: conn_pid} = socket_info) do
 | 
			
		||||
    socket_info = SocketInfo.touch(socket_info)
 | 
			
		||||
    :gun.ws_send(conn_pid, {:text, data})
 | 
			
		||||
    {:noreply, socket_info}
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def handle_info({:gun_ws, _, _, :pong}, state) do
 | 
			
		||||
    {:noreply, state, :hibernate}
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def handle_info(msg, state) do
 | 
			
		||||
    Logger.debug("#{__MODULE__} unhandled event #{inspect(msg)}")
 | 
			
		||||
    {:noreply, state}
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def terminate(reason, state) do
 | 
			
		||||
    Logger.debug(
 | 
			
		||||
      "#{__MODULE__} terminating outgoing connection for #{inspect(state)} for #{inspect(reason)}"
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    {:ok, state}
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def initiate_connection(uri) do
 | 
			
		||||
    ws_uri =
 | 
			
		||||
      uri
 | 
			
		||||
      |> SocketInfo.origin()
 | 
			
		||||
      |> FedSockets.uri_for_origin()
 | 
			
		||||
 | 
			
		||||
    %{host: host, port: port, path: path} = URI.parse(ws_uri)
 | 
			
		||||
 | 
			
		||||
    with {:ok, conn_pid} <- :gun.open(to_charlist(host), port, %{protocols: [:http]}),
 | 
			
		||||
         {:ok, _} <- :gun.await_up(conn_pid),
 | 
			
		||||
         reference <-
 | 
			
		||||
           :gun.get(conn_pid, to_charlist(path), [
 | 
			
		||||
             {'user-agent', to_charlist(Application.user_agent())}
 | 
			
		||||
           ]),
 | 
			
		||||
         {:response, :fin, 204, _} <- :gun.await(conn_pid, reference),
 | 
			
		||||
         headers <- build_headers(uri),
 | 
			
		||||
         ref <- :gun.ws_upgrade(conn_pid, to_charlist(path), headers, %{silence_pings: false}) do
 | 
			
		||||
      receive do
 | 
			
		||||
        {:gun_upgrade, ^conn_pid, ^ref, [<<"websocket">>], _} ->
 | 
			
		||||
          {:ok, ws_uri, conn_pid}
 | 
			
		||||
      after
 | 
			
		||||
        15_000 ->
 | 
			
		||||
          Logger.debug("Fedsocket timeout connecting to #{inspect(uri)}")
 | 
			
		||||
          {:error, :timeout}
 | 
			
		||||
      end
 | 
			
		||||
    else
 | 
			
		||||
      {:response, :nofin, 404, _} ->
 | 
			
		||||
        {:error, :fedsockets_not_supported}
 | 
			
		||||
 | 
			
		||||
      e ->
 | 
			
		||||
        Logger.debug("Fedsocket error connecting to #{inspect(uri)}")
 | 
			
		||||
        {:error, e}
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp build_headers(uri) do
 | 
			
		||||
    host_for_sig = uri |> URI.parse() |> host_signature()
 | 
			
		||||
 | 
			
		||||
    shake = FedSocket.shake()
 | 
			
		||||
    digest = "SHA-256=" <> (:crypto.hash(:sha256, shake) |> Base.encode64())
 | 
			
		||||
    date = Pleroma.Signature.signed_date()
 | 
			
		||||
    shake_size = byte_size(shake)
 | 
			
		||||
 | 
			
		||||
    signature_opts = %{
 | 
			
		||||
      "(request-target)": shake,
 | 
			
		||||
      "content-length": to_charlist("#{shake_size}"),
 | 
			
		||||
      date: date,
 | 
			
		||||
      digest: digest,
 | 
			
		||||
      host: host_for_sig
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    signature = Pleroma.Signature.sign(InternalFetchActor.get_actor(), signature_opts)
 | 
			
		||||
 | 
			
		||||
    [
 | 
			
		||||
      {'signature', to_charlist(signature)},
 | 
			
		||||
      {'date', date},
 | 
			
		||||
      {'digest', to_charlist(digest)},
 | 
			
		||||
      {'content-length', to_charlist("#{shake_size}")},
 | 
			
		||||
      {to_charlist("(request-target)"), to_charlist(shake)},
 | 
			
		||||
      {'user-agent', to_charlist(Application.user_agent())}
 | 
			
		||||
    ]
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp host_signature(%{host: host, scheme: scheme, port: port}) do
 | 
			
		||||
    if port == URI.default_port(scheme) do
 | 
			
		||||
      host
 | 
			
		||||
    else
 | 
			
		||||
      "#{host}:#{port}"
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			@ -1,52 +0,0 @@
 | 
			
		|||
# Pleroma: A lightweight social networking server
 | 
			
		||||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
 | 
			
		||||
# SPDX-License-Identifier: AGPL-3.0-only
 | 
			
		||||
 | 
			
		||||
defmodule Pleroma.Web.FedSockets.SocketInfo do
 | 
			
		||||
  defstruct origin: nil,
 | 
			
		||||
            pid: nil,
 | 
			
		||||
            conn_pid: nil,
 | 
			
		||||
            state: :default,
 | 
			
		||||
            connected_until: nil
 | 
			
		||||
 | 
			
		||||
  alias Pleroma.Web.FedSockets.SocketInfo
 | 
			
		||||
  @default_connection_duration 15 * 60 * 1000
 | 
			
		||||
 | 
			
		||||
  def build(uri, conn_pid \\ nil) do
 | 
			
		||||
    uri
 | 
			
		||||
    |> build_origin()
 | 
			
		||||
    |> build_pids(conn_pid)
 | 
			
		||||
    |> touch()
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def touch(%SocketInfo{} = socket_info),
 | 
			
		||||
    do: %{socket_info | connected_until: new_ttl()}
 | 
			
		||||
 | 
			
		||||
  def connect(%SocketInfo{} = socket_info),
 | 
			
		||||
    do: %{socket_info | state: :connected}
 | 
			
		||||
 | 
			
		||||
  def expired?(%{connected_until: connected_until}),
 | 
			
		||||
    do: connected_until < :erlang.monotonic_time(:millisecond)
 | 
			
		||||
 | 
			
		||||
  def origin(uri),
 | 
			
		||||
    do: build_origin(uri).origin
 | 
			
		||||
 | 
			
		||||
  defp build_pids(socket_info, conn_pid),
 | 
			
		||||
    do: struct(socket_info, pid: self(), conn_pid: conn_pid)
 | 
			
		||||
 | 
			
		||||
  defp build_origin(uri) when is_binary(uri),
 | 
			
		||||
    do: uri |> URI.parse() |> build_origin
 | 
			
		||||
 | 
			
		||||
  defp build_origin(%{host: host, port: nil, scheme: scheme}),
 | 
			
		||||
    do: build_origin(%{host: host, port: URI.default_port(scheme)})
 | 
			
		||||
 | 
			
		||||
  defp build_origin(%{host: host, port: port}),
 | 
			
		||||
    do: %SocketInfo{origin: "#{host}:#{port}"}
 | 
			
		||||
 | 
			
		||||
  defp new_ttl do
 | 
			
		||||
    connection_duration =
 | 
			
		||||
      Pleroma.Config.get([:fed_sockets, :connection_duration], @default_connection_duration)
 | 
			
		||||
 | 
			
		||||
    :erlang.monotonic_time(:millisecond) + connection_duration
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			@ -1,59 +0,0 @@
 | 
			
		|||
# Pleroma: A lightweight social networking server
 | 
			
		||||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
 | 
			
		||||
# SPDX-License-Identifier: AGPL-3.0-only
 | 
			
		||||
 | 
			
		||||
defmodule Pleroma.Web.FedSockets.Supervisor do
 | 
			
		||||
  use Supervisor
 | 
			
		||||
  import Cachex.Spec
 | 
			
		||||
 | 
			
		||||
  def start_link(opts) do
 | 
			
		||||
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def init(args) do
 | 
			
		||||
    children = [
 | 
			
		||||
      build_cache(:fed_socket_fetches, args),
 | 
			
		||||
      build_cache(:fed_socket_rejections, args),
 | 
			
		||||
      {Registry, keys: :unique, name: FedSockets.Registry, meta: [rejected: %{}]}
 | 
			
		||||
    ]
 | 
			
		||||
 | 
			
		||||
    opts = [strategy: :one_for_all, name: Pleroma.Web.Streamer.Supervisor]
 | 
			
		||||
    Supervisor.init(children, opts)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp build_cache(name, args) do
 | 
			
		||||
    opts = get_opts(name, args)
 | 
			
		||||
 | 
			
		||||
    %{
 | 
			
		||||
      id: String.to_atom("#{name}_cache"),
 | 
			
		||||
      start: {Cachex, :start_link, [name, opts]},
 | 
			
		||||
      type: :worker
 | 
			
		||||
    }
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp get_opts(cache_name, args)
 | 
			
		||||
       when cache_name in [:fed_socket_fetches, :fed_socket_rejections] do
 | 
			
		||||
    default = get_opts_or_config(args, cache_name, :default, 15_000)
 | 
			
		||||
    interval = get_opts_or_config(args, cache_name, :interval, 3_000)
 | 
			
		||||
    lazy = get_opts_or_config(args, cache_name, :lazy, false)
 | 
			
		||||
 | 
			
		||||
    [expiration: expiration(default: default, interval: interval, lazy: lazy)]
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp get_opts(name, args) do
 | 
			
		||||
    Keyword.get(args, name, [])
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  defp get_opts_or_config(args, name, key, default) do
 | 
			
		||||
    args
 | 
			
		||||
    |> Keyword.get(name, [])
 | 
			
		||||
    |> Keyword.get(key)
 | 
			
		||||
    |> case do
 | 
			
		||||
      nil ->
 | 
			
		||||
        Pleroma.Config.get([:fed_sockets, name, key], default)
 | 
			
		||||
 | 
			
		||||
      value ->
 | 
			
		||||
        value
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			@ -1,124 +0,0 @@
 | 
			
		|||
# Pleroma: A lightweight social networking server
 | 
			
		||||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
 | 
			
		||||
# SPDX-License-Identifier: AGPL-3.0-only
 | 
			
		||||
 | 
			
		||||
defmodule Pleroma.Web.FedSockets.FedRegistryTest do
 | 
			
		||||
  use ExUnit.Case
 | 
			
		||||
 | 
			
		||||
  alias Pleroma.Web.FedSockets
 | 
			
		||||
  alias Pleroma.Web.FedSockets.FedRegistry
 | 
			
		||||
  alias Pleroma.Web.FedSockets.SocketInfo
 | 
			
		||||
 | 
			
		||||
  @good_domain "http://good.domain"
 | 
			
		||||
  @good_domain_origin "good.domain:80"
 | 
			
		||||
 | 
			
		||||
  setup do
 | 
			
		||||
    start_supervised({Pleroma.Web.FedSockets.Supervisor, []})
 | 
			
		||||
    build_test_socket(@good_domain)
 | 
			
		||||
    Process.sleep(10)
 | 
			
		||||
 | 
			
		||||
    :ok
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  describe "add_fed_socket/1 without conflicting sockets" do
 | 
			
		||||
    test "can be added" do
 | 
			
		||||
      Process.sleep(10)
 | 
			
		||||
      assert {:ok, %SocketInfo{origin: origin}} = FedRegistry.get_fed_socket(@good_domain_origin)
 | 
			
		||||
      assert origin == "good.domain:80"
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    test "multiple origins can be added" do
 | 
			
		||||
      build_test_socket("http://anothergood.domain")
 | 
			
		||||
      Process.sleep(10)
 | 
			
		||||
 | 
			
		||||
      assert {:ok, %SocketInfo{origin: origin_1}} =
 | 
			
		||||
               FedRegistry.get_fed_socket(@good_domain_origin)
 | 
			
		||||
 | 
			
		||||
      assert {:ok, %SocketInfo{origin: origin_2}} =
 | 
			
		||||
               FedRegistry.get_fed_socket("anothergood.domain:80")
 | 
			
		||||
 | 
			
		||||
      assert origin_1 == "good.domain:80"
 | 
			
		||||
      assert origin_2 == "anothergood.domain:80"
 | 
			
		||||
      assert FedRegistry.list_all() |> Enum.count() == 2
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  describe "add_fed_socket/1 when duplicate sockets conflict" do
 | 
			
		||||
    setup do
 | 
			
		||||
      build_test_socket(@good_domain)
 | 
			
		||||
      build_test_socket(@good_domain)
 | 
			
		||||
      Process.sleep(10)
 | 
			
		||||
      :ok
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    test "will be ignored" do
 | 
			
		||||
      assert {:ok, %SocketInfo{origin: origin, pid: _pid_one}} =
 | 
			
		||||
               FedRegistry.get_fed_socket(@good_domain_origin)
 | 
			
		||||
 | 
			
		||||
      assert origin == "good.domain:80"
 | 
			
		||||
 | 
			
		||||
      assert FedRegistry.list_all() |> Enum.count() == 1
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    test "the newer process will be closed" do
 | 
			
		||||
      pid_two = build_test_socket(@good_domain)
 | 
			
		||||
 | 
			
		||||
      assert {:ok, %SocketInfo{origin: origin, pid: _pid_one}} =
 | 
			
		||||
               FedRegistry.get_fed_socket(@good_domain_origin)
 | 
			
		||||
 | 
			
		||||
      assert origin == "good.domain:80"
 | 
			
		||||
      Process.sleep(10)
 | 
			
		||||
 | 
			
		||||
      refute Process.alive?(pid_two)
 | 
			
		||||
 | 
			
		||||
      assert FedRegistry.list_all() |> Enum.count() == 1
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  describe "get_fed_socket/1" do
 | 
			
		||||
    test "returns missing for unknown hosts" do
 | 
			
		||||
      assert {:error, :missing} = FedRegistry.get_fed_socket("not_a_dmoain")
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    test "returns rejected for hosts previously rejected" do
 | 
			
		||||
      "rejected.domain:80"
 | 
			
		||||
      |> FedSockets.uri_for_origin()
 | 
			
		||||
      |> FedRegistry.set_host_rejected()
 | 
			
		||||
 | 
			
		||||
      assert {:error, :rejected} = FedRegistry.get_fed_socket("rejected.domain:80")
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    test "can retrieve a previously added SocketInfo" do
 | 
			
		||||
      build_test_socket(@good_domain)
 | 
			
		||||
      Process.sleep(10)
 | 
			
		||||
      assert {:ok, %SocketInfo{origin: origin}} = FedRegistry.get_fed_socket(@good_domain_origin)
 | 
			
		||||
      assert origin == "good.domain:80"
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    test "removes references to SocketInfos when the process crashes" do
 | 
			
		||||
      assert {:ok, %SocketInfo{origin: origin, pid: pid}} =
 | 
			
		||||
               FedRegistry.get_fed_socket(@good_domain_origin)
 | 
			
		||||
 | 
			
		||||
      assert origin == "good.domain:80"
 | 
			
		||||
 | 
			
		||||
      Process.exit(pid, :testing)
 | 
			
		||||
      Process.sleep(100)
 | 
			
		||||
      assert {:error, :missing} = FedRegistry.get_fed_socket(@good_domain_origin)
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def build_test_socket(uri) do
 | 
			
		||||
    Kernel.spawn(fn -> fed_socket_almost(uri) end)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  def fed_socket_almost(origin) do
 | 
			
		||||
    FedRegistry.add_fed_socket(origin)
 | 
			
		||||
 | 
			
		||||
    receive do
 | 
			
		||||
      :close ->
 | 
			
		||||
        :ok
 | 
			
		||||
    after
 | 
			
		||||
      5_000 -> :timeout
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			@ -1,67 +0,0 @@
 | 
			
		|||
# Pleroma: A lightweight social networking server
 | 
			
		||||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
 | 
			
		||||
# SPDX-License-Identifier: AGPL-3.0-only
 | 
			
		||||
 | 
			
		||||
defmodule Pleroma.Web.FedSockets.FetchRegistryTest do
 | 
			
		||||
  use ExUnit.Case
 | 
			
		||||
 | 
			
		||||
  alias Pleroma.Web.FedSockets.FetchRegistry
 | 
			
		||||
  alias Pleroma.Web.FedSockets.FetchRegistry.FetchRegistryData
 | 
			
		||||
 | 
			
		||||
  @json_message "hello"
 | 
			
		||||
  @json_reply "hello back"
 | 
			
		||||
 | 
			
		||||
  setup do
 | 
			
		||||
    start_supervised(
 | 
			
		||||
      {Pleroma.Web.FedSockets.Supervisor,
 | 
			
		||||
       [
 | 
			
		||||
         ping_interval: 8,
 | 
			
		||||
         connection_duration: 15,
 | 
			
		||||
         rejection_duration: 5,
 | 
			
		||||
         fed_socket_fetches: [default: 10, interval: 10]
 | 
			
		||||
       ]}
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    :ok
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  test "fetches can be stored" do
 | 
			
		||||
    uuid = FetchRegistry.register_fetch(@json_message)
 | 
			
		||||
 | 
			
		||||
    assert {:error, :waiting} = FetchRegistry.check_fetch(uuid)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  test "fetches can return" do
 | 
			
		||||
    uuid = FetchRegistry.register_fetch(@json_message)
 | 
			
		||||
    task = Task.async(fn -> FetchRegistry.register_fetch_received(uuid, @json_reply) end)
 | 
			
		||||
 | 
			
		||||
    assert {:error, :waiting} = FetchRegistry.check_fetch(uuid)
 | 
			
		||||
    Task.await(task)
 | 
			
		||||
 | 
			
		||||
    assert {:ok, %FetchRegistryData{received_json: received_json}} =
 | 
			
		||||
             FetchRegistry.check_fetch(uuid)
 | 
			
		||||
 | 
			
		||||
    assert received_json == @json_reply
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  test "fetches are deleted once popped from stack" do
 | 
			
		||||
    uuid = FetchRegistry.register_fetch(@json_message)
 | 
			
		||||
    task = Task.async(fn -> FetchRegistry.register_fetch_received(uuid, @json_reply) end)
 | 
			
		||||
    Task.await(task)
 | 
			
		||||
 | 
			
		||||
    assert {:ok, %FetchRegistryData{received_json: received_json}} =
 | 
			
		||||
             FetchRegistry.check_fetch(uuid)
 | 
			
		||||
 | 
			
		||||
    assert received_json == @json_reply
 | 
			
		||||
    assert {:ok, @json_reply} = FetchRegistry.pop_fetch(uuid)
 | 
			
		||||
 | 
			
		||||
    assert {:error, :missing} = FetchRegistry.check_fetch(uuid)
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  test "fetches can time out" do
 | 
			
		||||
    uuid = FetchRegistry.register_fetch(@json_message)
 | 
			
		||||
    assert {:error, :waiting} = FetchRegistry.check_fetch(uuid)
 | 
			
		||||
    Process.sleep(500)
 | 
			
		||||
    assert {:error, :missing} = FetchRegistry.check_fetch(uuid)
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
| 
						 | 
				
			
			@ -1,118 +0,0 @@
 | 
			
		|||
# Pleroma: A lightweight social networking server
 | 
			
		||||
# Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
 | 
			
		||||
# SPDX-License-Identifier: AGPL-3.0-only
 | 
			
		||||
 | 
			
		||||
defmodule Pleroma.Web.FedSockets.SocketInfoTest do
 | 
			
		||||
  use ExUnit.Case
 | 
			
		||||
 | 
			
		||||
  alias Pleroma.Web.FedSockets
 | 
			
		||||
  alias Pleroma.Web.FedSockets.SocketInfo
 | 
			
		||||
 | 
			
		||||
  describe "uri_for_origin" do
 | 
			
		||||
    test "provides the fed_socket URL given the origin information" do
 | 
			
		||||
      endpoint = "example.com:4000"
 | 
			
		||||
      assert FedSockets.uri_for_origin(endpoint) =~ "ws://"
 | 
			
		||||
      assert FedSockets.uri_for_origin(endpoint) =~ endpoint
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  describe "origin" do
 | 
			
		||||
    test "will provide the origin field given a url" do
 | 
			
		||||
      endpoint = "example.com:4000"
 | 
			
		||||
      assert SocketInfo.origin("ws://#{endpoint}") == endpoint
 | 
			
		||||
      assert SocketInfo.origin("http://#{endpoint}") == endpoint
 | 
			
		||||
      assert SocketInfo.origin("https://#{endpoint}") == endpoint
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    test "will proide the origin field given a uri" do
 | 
			
		||||
      endpoint = "example.com:4000"
 | 
			
		||||
      uri = URI.parse("http://#{endpoint}")
 | 
			
		||||
 | 
			
		||||
      assert SocketInfo.origin(uri) == endpoint
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  describe "touch" do
 | 
			
		||||
    test "will update the TTL" do
 | 
			
		||||
      endpoint = "example.com:4000"
 | 
			
		||||
      socket = SocketInfo.build("ws://#{endpoint}")
 | 
			
		||||
      Process.sleep(2)
 | 
			
		||||
      touched_socket = SocketInfo.touch(socket)
 | 
			
		||||
 | 
			
		||||
      assert socket.connected_until < touched_socket.connected_until
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  describe "expired?" do
 | 
			
		||||
    setup do
 | 
			
		||||
      start_supervised(
 | 
			
		||||
        {Pleroma.Web.FedSockets.Supervisor,
 | 
			
		||||
         [
 | 
			
		||||
           ping_interval: 8,
 | 
			
		||||
           connection_duration: 5,
 | 
			
		||||
           rejection_duration: 5,
 | 
			
		||||
           fed_socket_rejections: [lazy: true]
 | 
			
		||||
         ]}
 | 
			
		||||
      )
 | 
			
		||||
 | 
			
		||||
      :ok
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    test "tests if the TTL is exceeded" do
 | 
			
		||||
      endpoint = "example.com:4000"
 | 
			
		||||
      socket = SocketInfo.build("ws://#{endpoint}")
 | 
			
		||||
      refute SocketInfo.expired?(socket)
 | 
			
		||||
      Process.sleep(10)
 | 
			
		||||
 | 
			
		||||
      assert SocketInfo.expired?(socket)
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  describe "creating outgoing connection records" do
 | 
			
		||||
    test "can be passed a string" do
 | 
			
		||||
      assert %{conn_pid: :pid, origin: _origin} = SocketInfo.build("example.com:4000", :pid)
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    test "can be passed a URI" do
 | 
			
		||||
      uri = URI.parse("http://example.com:4000")
 | 
			
		||||
      assert %{conn_pid: :pid, origin: origin} = SocketInfo.build(uri, :pid)
 | 
			
		||||
      assert origin =~ "example.com:4000"
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    test "will include the port number" do
 | 
			
		||||
      assert %{conn_pid: :pid, origin: origin} = SocketInfo.build("http://example.com:4000", :pid)
 | 
			
		||||
 | 
			
		||||
      assert origin =~ ":4000"
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    test "will provide the port if missing" do
 | 
			
		||||
      assert %{conn_pid: :pid, origin: "example.com:80"} =
 | 
			
		||||
               SocketInfo.build("http://example.com", :pid)
 | 
			
		||||
 | 
			
		||||
      assert %{conn_pid: :pid, origin: "example.com:443"} =
 | 
			
		||||
               SocketInfo.build("https://example.com", :pid)
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
 | 
			
		||||
  describe "creating incoming connection records" do
 | 
			
		||||
    test "can be passed a string" do
 | 
			
		||||
      assert %{pid: _, origin: _origin} = SocketInfo.build("example.com:4000")
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    test "can be passed a URI" do
 | 
			
		||||
      uri = URI.parse("example.com:4000")
 | 
			
		||||
      assert %{pid: _, origin: _origin} = SocketInfo.build(uri)
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    test "will include the port number" do
 | 
			
		||||
      assert %{pid: _, origin: origin} = SocketInfo.build("http://example.com:4000")
 | 
			
		||||
 | 
			
		||||
      assert origin =~ ":4000"
 | 
			
		||||
    end
 | 
			
		||||
 | 
			
		||||
    test "will provide the port if missing" do
 | 
			
		||||
      assert %{pid: _, origin: "example.com:80"} = SocketInfo.build("http://example.com")
 | 
			
		||||
      assert %{pid: _, origin: "example.com:443"} = SocketInfo.build("https://example.com")
 | 
			
		||||
    end
 | 
			
		||||
  end
 | 
			
		||||
end
 | 
			
		||||
		Loading…
	
		Reference in a new issue