Add tests for SigninKey module

This commit is contained in:
Oneric 2025-01-11 21:25:00 +01:00
parent 898b98e5dd
commit 8a0d130976

View file

@ -0,0 +1,305 @@
# Akkoma: Magically expressive social media
# Copyright © 2024 Akkoma Authors <https://akkoma.dev/>
# SPDX-License-Identifier: AGPL-3.0-only
defmodule Pleroma.User.SigningKeyTests do
alias Pleroma.User
alias Pleroma.User.SigningKey
alias Pleroma.Repo
use Pleroma.DataCase
use Oban.Testing, repo: Pleroma.Repo
import Pleroma.Factory
defp maybe_put(map, _, nil), do: map
defp maybe_put(map, key, val), do: Kernel.put_in(map, key, val)
defp get_body_actor(key_id \\ nil, user_id \\ nil, owner_id \\ nil) do
owner_id = owner_id || user_id
File.read!("test/fixtures/tesla_mock/admin@mastdon.example.org.json")
|> Jason.decode!()
|> maybe_put(["id"], user_id)
|> maybe_put(["publicKey", "id"], key_id)
|> maybe_put(["publicKey", "owner"], owner_id)
|> Jason.encode!()
end
defp get_body_rawkey(key_id, owner, pem \\ "RSA begin buplic key") do
%{
"type" => "CryptographicKey",
"id" => key_id,
"owner" => owner,
"publicKeyPem" => pem
}
|> Jason.encode!()
end
defmacro mock_tesla(
url,
get_body,
status \\ 200,
headers \\ []
) do
quote do
Tesla.Mock.mock(fn
%{method: :get, url: unquote(url)} ->
%Tesla.Env{
status: unquote(status),
body: unquote(get_body),
url: unquote(url),
headers: [
{"content-type",
"application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\""}
| unquote(headers)
]
}
end)
end
end
describe "succesfully" do
test "inserts key and new user on fetch" do
ap_id_actor = "https://mastodon.example.org/signing-key-test/actor"
ap_id_key = ap_id_actor <> "#main-key"
ap_doc = get_body_actor(ap_id_key, ap_id_actor)
mock_tesla(ap_id_actor, ap_doc)
{:ok, %SigningKey{} = key} = SigningKey.fetch_remote_key(ap_id_key)
user = User.get_by_id(key.user_id)
assert match?(%User{}, user)
user = SigningKey.load_key(user)
assert user.ap_id == ap_id_actor
assert user.signing_key.key_id == ap_id_key
assert user.signing_key.key_id == key.key_id
assert user.signing_key.private_key == nil
end
test "updates existing key" do
user =
insert(:user, local: false, domain: "mastodon.example.org")
|> with_signing_key()
ap_id_actor = user.ap_id
ap_doc = get_body_actor(user.signing_key.key_id, ap_id_actor)
mock_tesla(ap_id_actor, ap_doc)
old_pem = user.signing_key.public_key
old_priv = user.signing_key.private_key
# note: the returned value does not fully match the value stored in the database
# since inserted_at isn't changed on upserts
{:ok, %SigningKey{} = key} = SigningKey.fetch_remote_key(user.signing_key.key_id)
refreshed_key = Repo.get_by(SigningKey, key_id: key.key_id)
assert match?(%SigningKey{}, refreshed_key)
refute refreshed_key.public_key == old_pem
assert refreshed_key.private_key == old_priv
assert refreshed_key.user_id == user.id
assert key.public_key == refreshed_key.public_key
end
test "finds known key by key_id" do
sk = insert(:signing_key, key_id: "https://remote.example/signing-key-test/some-kown-key")
{:ok, key} = SigningKey.get_or_fetch_by_key_id(sk.key_id)
assert sk == key
end
test "finds key for remote user" do
user_with_preload =
insert(:user, local: false)
|> with_signing_key()
user = User.get_by_id(user_with_preload.id)
assert !match?(%SigningKey{}, user.signing_key)
user = SigningKey.load_key(user)
assert match?(%SigningKey{}, user.signing_key)
# the initial "with_signing_key" doesn't set timestamps, and meta differs (loaded vs built)
# thus clear affected fields before comparison
found_sk = %{user.signing_key | inserted_at: nil, updated_at: nil, __meta__: nil}
ref_sk = %{user_with_preload.signing_key | __meta__: nil}
assert found_sk == ref_sk
end
test "finds remote user id by key id" do
user =
insert(:user, local: false)
|> with_signing_key()
uid = SigningKey.key_id_to_user_id(user.signing_key.key_id)
assert uid == user.id
end
test "finds remote user ap id by key id" do
user =
insert(:user, local: false)
|> with_signing_key()
uapid = SigningKey.key_id_to_ap_id(user.signing_key.key_id)
assert uapid == user.ap_id
end
end
test "won't fetch keys for local users" do
user =
insert(:user, local: true)
|> with_signing_key()
{:error, _} = SigningKey.fetch_remote_key(user.signing_key.key_id)
end
test "fails insert with overlapping key owner" do
user =
insert(:user, local: false)
|> with_signing_key()
second_key_id =
user.signing_key.key_id
|> URI.parse()
|> Map.put(:fragment, nil)
|> Map.put(:query, nil)
|> URI.to_string()
|> then(fn id -> id <> "/second_key" end)
ap_doc = get_body_rawkey(second_key_id, user.ap_id)
mock_tesla(second_key_id, ap_doc)
res = SigningKey.fetch_remote_key(second_key_id)
assert match?({:error, %{errors: _}}, res)
{:error, cs} = res
assert Keyword.has_key?(cs.errors, :user_id)
end
test "Fetched raw SigningKeys cannot take over arbitrary users" do
# in theory cross-domain key and actor are fine, IF and ONLY IF
# the actor also links back to this key, but this isnt supported atm anyway
user =
insert(:user, local: false)
|> with_signing_key()
remote_key_id = "https://remote.example/keys/for_local"
keydoc = get_body_rawkey(remote_key_id, user.ap_id)
mock_tesla(remote_key_id, keydoc)
{:error, _} = SigningKey.fetch_remote_key(remote_key_id)
refreshed_org_key = Repo.get_by(SigningKey, key_id: user.signing_key.key_id)
refreshed_user_key = Repo.get_by(SigningKey, user_id: user.id)
assert match?(%SigningKey{}, refreshed_org_key)
assert match?(%SigningKey{}, refreshed_user_key)
actor_host = URI.parse(user.ap_id).host
org_key_host = URI.parse(refreshed_org_key.key_id).host
usr_key_host = URI.parse(refreshed_user_key.key_id).host
assert actor_host == org_key_host
assert actor_host == usr_key_host
refute usr_key_host == "remote.example"
assert refreshed_user_key == refreshed_org_key
assert user.signing_key.key_id == refreshed_org_key.key_id
end
test "Fetched non-raw SigningKey cannot take over arbitrary users" do
# this actually comes free with our fetch ID checks, but lets verify it here too for good measure
user =
insert(:user, local: false)
|> with_signing_key()
remote_key_id = "https://remote.example/keys#for_local"
keydoc = get_body_actor(remote_key_id, user.ap_id, user.ap_id)
mock_tesla(remote_key_id, keydoc)
{:error, _} = SigningKey.fetch_remote_key(remote_key_id)
refreshed_org_key = Repo.get_by(SigningKey, key_id: user.signing_key.key_id)
refreshed_user_key = Repo.get_by(SigningKey, user_id: user.id)
assert match?(%SigningKey{}, refreshed_org_key)
assert match?(%SigningKey{}, refreshed_user_key)
actor_host = URI.parse(user.ap_id).host
org_key_host = URI.parse(refreshed_org_key.key_id).host
usr_key_host = URI.parse(refreshed_user_key.key_id).host
assert actor_host == org_key_host
assert actor_host == usr_key_host
refute usr_key_host == "remote.example"
assert refreshed_user_key == refreshed_org_key
assert user.signing_key.key_id == refreshed_org_key.key_id
end
test "remote users sharing signing key ID don't break our database" do
# in principle a valid setup using this can be cosntructed,
# but so far not observed in practice and our db scheme cannot handle it.
# Thus make sure it doesn't break our db anything but gets rejected
key_id = "https://mastodon.example.org/the_one_key"
user1 =
insert(:user, local: false, domain: "mastodon.example.org")
|> with_signing_key(%{key_id: key_id})
key_owner = "https://mastodon.example.org/#"
user2_ap_id = user1.ap_id <> "22"
user2_doc = get_body_actor(user1.signing_key.key_id, user2_ap_id, key_owner)
user3_ap_id = user1.ap_id <> "333"
user3_doc = get_body_actor(user1.signing_key.key_id, user2_ap_id)
standalone_key_doc =
get_body_rawkey(key_id, "https://mastodon.example.org/#", user1.signing_key.public_key)
ap_headers = [
{"content-type", "application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\""}
]
Tesla.Mock.mock(fn
%{method: :get, url: ^key_id} ->
%Tesla.Env{
status: 200,
body: standalone_key_doc,
url: key_id,
headers: ap_headers
}
%{method: :get, url: ^user2_ap_id} ->
%Tesla.Env{
status: 200,
body: user2_doc,
url: user2_ap_id,
headers: ap_headers
}
%{method: :get, url: ^user3_ap_id} ->
%Tesla.Env{
status: 200,
body: user3_doc,
url: user3_ap_id,
headers: ap_headers
}
end)
{:error, _} = SigningKey.fetch_remote_key(key_id)
{:ok, user2} = User.get_or_fetch_by_ap_id(user2_ap_id)
{:ok, user3} = User.get_or_fetch_by_ap_id(user3_ap_id)
{:ok, db_key} = SigningKey.get_or_fetch_by_key_id(key_id)
keys =
from(s in SigningKey, where: s.key_id == ^key_id)
|> Repo.all()
assert match?([%SigningKey{}], keys)
assert [db_key] == keys
assert db_key.user_id == user1.id
assert match?({:ok, _}, SigningKey.public_key(user1))
assert {:error, "key not found"} == SigningKey.public_key(user2)
assert {:error, "key not found"} == SigningKey.public_key(user3)
end
end