151 lines
		
	
	
	
		
			3.9 KiB
		
	
	
	
		
			Elixir
		
	
	
	
	
	
			
		
		
	
	
			151 lines
		
	
	
	
		
			3.9 KiB
		
	
	
	
		
			Elixir
		
	
	
	
	
	
| # Pleroma: A lightweight social networking server
 | |
| # Copyright © 2017-2020 Pleroma Authors <https://pleroma.social/>
 | |
| # SPDX-License-Identifier: AGPL-3.0-only
 | |
| 
 | |
| defmodule Pleroma.Web.FedSockets.FetchRegistry do
 | |
|   @moduledoc """
 | |
|   The FetchRegistry acts as a broker for fetch requests and return values.
 | |
|   This allows calling processes to block while waiting for a reply.
 | |
|   It doesn't impose it's own process instead using `Cachex` to handle fetches in process, allowing
 | |
|   multi threaded processes to avoid bottlenecking.
 | |
| 
 | |
|   Normally outside modules will have no need to call or use the FetchRegistry themselves.
 | |
| 
 | |
|   The `Cachex` parameters can be controlled from the config. Since exact timeout intervals
 | |
|   aren't necessary the following settings are used by default:
 | |
| 
 | |
|   config :pleroma, :fed_sockets,
 | |
|     fed_socket_fetches: [
 | |
|       default: 12_000,
 | |
|       interval: 3_000,
 | |
|       lazy: false
 | |
|     ]
 | |
| 
 | |
|   """
 | |
| 
 | |
|   defmodule FetchRegistryData do
 | |
|     defstruct uuid: nil,
 | |
|               sent_json: nil,
 | |
|               received_json: nil,
 | |
|               sent_at: nil,
 | |
|               received_at: nil
 | |
|   end
 | |
| 
 | |
|   alias Ecto.UUID
 | |
| 
 | |
|   require Logger
 | |
| 
 | |
|   @fetches :fed_socket_fetches
 | |
| 
 | |
|   @doc """
 | |
|   Registers a json request wth the FetchRegistry and returns the identifying UUID.
 | |
|   """
 | |
|   def register_fetch(json) do
 | |
|     %FetchRegistryData{uuid: uuid} =
 | |
|       json
 | |
|       |> new_registry_data
 | |
|       |> save_registry_data
 | |
| 
 | |
|     uuid
 | |
|   end
 | |
| 
 | |
|   @doc """
 | |
|   Reports on the status of a Fetch given the identifying UUID.
 | |
| 
 | |
|   Will return
 | |
|     * {:ok, fetched_object} if a fetch has completed
 | |
|     * {:error, :waiting} if a fetch is still pending
 | |
|     * {:error, other_error} usually :missing to indicate a fetch that has timed out
 | |
|   """
 | |
|   def check_fetch(uuid) do
 | |
|     case get_registry_data(uuid) do
 | |
|       {:ok, %FetchRegistryData{received_at: nil}} ->
 | |
|         {:error, :waiting}
 | |
| 
 | |
|       {:ok, %FetchRegistryData{} = reg_data} ->
 | |
|         {:ok, reg_data}
 | |
| 
 | |
|       e ->
 | |
|         e
 | |
|     end
 | |
|   end
 | |
| 
 | |
|   @doc """
 | |
|   Retrieves the response to a fetch given the identifying UUID.
 | |
|   The completed fetch will be deleted from the FetchRegistry
 | |
| 
 | |
|   Will return
 | |
|     * {:ok, fetched_object} if a fetch has completed
 | |
|     * {:error, :waiting} if a fetch is still pending
 | |
|     * {:error, other_error} usually :missing to indicate a fetch that has timed out
 | |
|   """
 | |
|   def pop_fetch(uuid) do
 | |
|     case check_fetch(uuid) do
 | |
|       {:ok, %FetchRegistryData{received_json: received_json}} ->
 | |
|         delete_registry_data(uuid)
 | |
|         {:ok, received_json}
 | |
| 
 | |
|       e ->
 | |
|         e
 | |
|     end
 | |
|   end
 | |
| 
 | |
|   @doc """
 | |
|   This is called to register a fetch has returned.
 | |
|   It expects the result data along with the UUID that was sent in the request
 | |
| 
 | |
|   Will return the fetched object or :error
 | |
|   """
 | |
|   def register_fetch_received(uuid, data) do
 | |
|     case get_registry_data(uuid) do
 | |
|       {:ok, %FetchRegistryData{received_at: nil} = reg_data} ->
 | |
|         reg_data
 | |
|         |> set_fetch_received(data)
 | |
|         |> save_registry_data()
 | |
| 
 | |
|       {:ok, %FetchRegistryData{} = reg_data} ->
 | |
|         Logger.warn("tried to add fetched data twice - #{uuid}")
 | |
|         reg_data
 | |
| 
 | |
|       {:error, _} ->
 | |
|         Logger.warn("Error adding fetch to registry - #{uuid}")
 | |
|         :error
 | |
|     end
 | |
|   end
 | |
| 
 | |
|   defp new_registry_data(json) do
 | |
|     %FetchRegistryData{
 | |
|       uuid: UUID.generate(),
 | |
|       sent_json: json,
 | |
|       sent_at: :erlang.monotonic_time(:millisecond)
 | |
|     }
 | |
|   end
 | |
| 
 | |
|   defp get_registry_data(origin) do
 | |
|     case Cachex.get(@fetches, origin) do
 | |
|       {:ok, nil} ->
 | |
|         {:error, :missing}
 | |
| 
 | |
|       {:ok, reg_data} ->
 | |
|         {:ok, reg_data}
 | |
| 
 | |
|       _ ->
 | |
|         {:error, :cache_error}
 | |
|     end
 | |
|   end
 | |
| 
 | |
|   defp set_fetch_received(%FetchRegistryData{} = reg_data, data),
 | |
|     do: %FetchRegistryData{
 | |
|       reg_data
 | |
|       | received_at: :erlang.monotonic_time(:millisecond),
 | |
|         received_json: data
 | |
|     }
 | |
| 
 | |
|   defp save_registry_data(%FetchRegistryData{uuid: uuid} = reg_data) do
 | |
|     {:ok, true} = Cachex.put(@fetches, uuid, reg_data)
 | |
|     reg_data
 | |
|   end
 | |
| 
 | |
|   defp delete_registry_data(origin),
 | |
|     do: {:ok, true} = Cachex.del(@fetches, origin)
 | |
| end
 |