 45be1fe00e
			
		
	
	
		45be1fe00e
		
	
	
	
	
		
			
			When gun shuts down due to the host being unreachable, the worker process shuts down with the same shutdown reason since they are linked. Gun doesn't have error tuples in it's shutdown reason though, so we need to handle it in get_conn. Closes #2008
		
			
				
	
	
		
			82 lines
		
	
	
	
		
			2.4 KiB
		
	
	
	
		
			Elixir
		
	
	
	
	
	
			
		
		
	
	
			82 lines
		
	
	
	
		
			2.4 KiB
		
	
	
	
		
			Elixir
		
	
	
	
	
	
| defmodule Pleroma.Gun.ConnectionPool do
 | |
|   @registry __MODULE__
 | |
| 
 | |
|   alias Pleroma.Gun.ConnectionPool.WorkerSupervisor
 | |
| 
 | |
|   def children do
 | |
|     [
 | |
|       {Registry, keys: :unique, name: @registry},
 | |
|       Pleroma.Gun.ConnectionPool.WorkerSupervisor
 | |
|     ]
 | |
|   end
 | |
| 
 | |
|   @spec get_conn(URI.t(), keyword()) :: {:ok, pid()} | {:error, term()}
 | |
|   def get_conn(uri, opts) do
 | |
|     key = "#{uri.scheme}:#{uri.host}:#{uri.port}"
 | |
| 
 | |
|     case Registry.lookup(@registry, key) do
 | |
|       # The key has already been registered, but connection is not up yet
 | |
|       [{worker_pid, nil}] ->
 | |
|         get_gun_pid_from_worker(worker_pid, true)
 | |
| 
 | |
|       [{worker_pid, {gun_pid, _used_by, _crf, _last_reference}}] ->
 | |
|         GenServer.call(worker_pid, :add_client)
 | |
|         {:ok, gun_pid}
 | |
| 
 | |
|       [] ->
 | |
|         # :gun.set_owner fails in :connected state for whatevever reason,
 | |
|         # so we open the connection in the process directly and send it's pid back
 | |
|         # We trust gun to handle timeouts by itself
 | |
|         case WorkerSupervisor.start_worker([key, uri, opts, self()]) do
 | |
|           {:ok, worker_pid} ->
 | |
|             get_gun_pid_from_worker(worker_pid, false)
 | |
| 
 | |
|           {:error, {:already_started, worker_pid}} ->
 | |
|             get_gun_pid_from_worker(worker_pid, true)
 | |
| 
 | |
|           err ->
 | |
|             err
 | |
|         end
 | |
|     end
 | |
|   end
 | |
| 
 | |
|   defp get_gun_pid_from_worker(worker_pid, register) do
 | |
|     # GenServer.call will block the process for timeout length if
 | |
|     # the server crashes on startup (which will happen if gun fails to connect)
 | |
|     # so instead we use cast + monitor
 | |
| 
 | |
|     ref = Process.monitor(worker_pid)
 | |
|     if register, do: GenServer.cast(worker_pid, {:add_client, self()})
 | |
| 
 | |
|     receive do
 | |
|       {:conn_pid, pid} ->
 | |
|         Process.demonitor(ref)
 | |
|         {:ok, pid}
 | |
| 
 | |
|       {:DOWN, ^ref, :process, ^worker_pid, reason} ->
 | |
|         case reason do
 | |
|           {:shutdown, {:error, _} = error} -> error
 | |
|           {:shutdown, error} -> {:error, error}
 | |
|           _ -> {:error, reason}
 | |
|         end
 | |
|     end
 | |
|   end
 | |
| 
 | |
|   @spec release_conn(pid()) :: :ok
 | |
|   def release_conn(conn_pid) do
 | |
|     # :ets.fun2ms(fn {_, {worker_pid, {gun_pid, _, _, _}}} when gun_pid == conn_pid ->
 | |
|     #    worker_pid end)
 | |
|     query_result =
 | |
|       Registry.select(@registry, [
 | |
|         {{:_, :"$1", {:"$2", :_, :_, :_}}, [{:==, :"$2", conn_pid}], [:"$1"]}
 | |
|       ])
 | |
| 
 | |
|     case query_result do
 | |
|       [worker_pid] ->
 | |
|         GenServer.call(worker_pid, :remove_client)
 | |
| 
 | |
|       [] ->
 | |
|         :ok
 | |
|     end
 | |
|   end
 | |
| end
 |