Merge remote-tracking branch 'upstream/main'

This commit is contained in:
Maxime Augier 2023-07-11 12:42:53 +02:00
commit 1414cc518c
54 changed files with 1957 additions and 1401 deletions

2
.env
View File

@ -9,3 +9,5 @@ FOOTER_BLURB="Opéré par <a href=\"https://mastodon.xolus.net/@max\">@max</a>"
LOCAL_DOMAINS="xolus.net" LOCAL_DOMAINS="xolus.net"
LOCAL_BLURB="<p>Relais ActivityPub francophone</p>" LOCAL_BLURB="<p>Relais ActivityPub francophone</p>"
# OPENTELEMETRY_URL=http://localhost:4317 # OPENTELEMETRY_URL=http://localhost:4317
PROMETHEUS_ADDR=127.0.0.1
PROMETHEUS_PORT=9000

3
.gitignore vendored
View File

@ -1,3 +1,6 @@
/target /target
/artifacts /artifacts
/sled /sled
/.direnv
/.envrc
/result

1633
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,7 +1,7 @@
[package] [package]
name = "ap-relay" name = "ap-relay"
description = "A simple activitypub relay" description = "A simple activitypub relay"
version = "0.3.66" version = "0.3.85"
authors = ["asonix <asonix@asonix.dog>"] authors = ["asonix <asonix@asonix.dog>"]
license = "AGPL-3.0" license = "AGPL-3.0"
readme = "README.md" readme = "README.md"
@ -29,49 +29,51 @@ actix-web = { version = "4.0.1", default-features = false, features = [
"compress-gzip", "compress-gzip",
] } ] }
actix-webfinger = "0.4.0" actix-webfinger = "0.4.0"
activitystreams = "0.7.0-alpha.19" activitystreams = "0.7.0-alpha.21"
activitystreams-ext = "0.1.0-alpha.2" activitystreams-ext = "0.1.0-alpha.3"
ammonia = "3.1.0" ammonia = "3.1.0"
awc = { version = "3.0.0", default-features = false, features = ["rustls"] } awc = { version = "3.0.0", default-features = false, features = ["rustls"] }
bcrypt = "0.13" bcrypt = "0.14"
base64 = "0.13" base64 = "0.21"
clap = { version = "4.0.0", features = ["derive"] } clap = { version = "4.0.0", features = ["derive"] }
config = "0.13.0" config = "0.13.0"
console-subscriber = { version = "0.1", optional = true } console-subscriber = { version = "0.1", optional = true }
dashmap = "5.1.0" dashmap = "5.1.0"
dotenv = "0.15.0" dotenv = "0.15.0"
futures-util = "0.3.17" futures-util = "0.3.17"
lru = "0.8.0" lru = "0.10.0"
metrics = "0.20.1" metrics = "0.21.0"
metrics-util = "0.14.0" metrics-exporter-prometheus = { version = "0.12.0", default-features = false, features = [
"http-listener",
] }
metrics-util = "0.15.0"
mime = "0.3.16" mime = "0.3.16"
minify-html = "0.10.0" minify-html = "0.11.0"
opentelemetry = { version = "0.18", features = ["rt-tokio"] } opentelemetry = { version = "0.19", features = ["rt-tokio"] }
opentelemetry-otlp = "0.11" opentelemetry-otlp = "0.12"
pin-project-lite = "0.2.9" pin-project-lite = "0.2.9"
quanta = "0.10.1" quanta = "0.11.0"
rand = "0.8" rand = "0.8"
rsa = "0.7" rsa = { version = "0.9", features = ["sha2"] }
rsa-magic-public-key = "0.6.0" rsa-magic-public-key = "0.8.0"
rustls = "0.20.7" rustls = "0.20.7"
rustls-pemfile = "1.0.1" rustls-pemfile = "1.0.1"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
sha2 = { version = "0.10", features = ["oid"] }
signature = "1.6.4"
sled = "0.34.7" sled = "0.34.7"
teloxide = { version = "0.11.1", default-features = false, features = [ teloxide = { version = "0.12.0", default-features = false, features = [
"ctrlc_handler", "ctrlc_handler",
"macros", "macros",
"rustls", "rustls",
] } ] }
thiserror = "1.0" thiserror = "1.0"
time = { version = "0.3.17", features = ["serde"] }
tracing = "0.1" tracing = "0.1"
tracing-awc = "0.1.6" tracing-awc = "0.1.7"
tracing-error = "0.2" tracing-error = "0.2"
tracing-futures = "0.2" tracing-futures = "0.2"
tracing-log = "0.1" tracing-log = "0.1"
tracing-opentelemetry = "0.18" tracing-opentelemetry = "0.19"
tracing-subscriber = { version = "0.3", features = [ tracing-subscriber = { version = "0.3", features = [
"ansi", "ansi",
"env-filter", "env-filter",
@ -81,23 +83,23 @@ tokio = { version = "1", features = ["macros", "sync"] }
uuid = { version = "1", features = ["v4", "serde"] } uuid = { version = "1", features = ["v4", "serde"] }
[dependencies.background-jobs] [dependencies.background-jobs]
version = "0.14.0" version = "0.15.0"
default-features = false default-features = false
features = ["background-jobs-actix", "error-logging"] features = ["background-jobs-actix", "error-logging"]
[dependencies.http-signature-normalization-actix] [dependencies.http-signature-normalization-actix]
version = "0.6.0" version = "0.8.0"
default-features = false default-features = false
features = ["client", "server", "sha-2"] features = ["client", "server", "sha-2"]
[dependencies.tracing-actix-web] [dependencies.tracing-actix-web]
version = "0.6.1" version = "0.7.5"
[build-dependencies] [build-dependencies]
anyhow = "1.0" anyhow = "1.0"
dotenv = "0.15.0" dotenv = "0.15.0"
ructe = { version = "0.15.0", features = ["sass", "mime03"] } ructe = { version = "0.16.0", features = ["sass", "mime03"] }
toml = "0.5.8" toml = "0.7.0"
[profile.dev.package.rsa] [profile.dev.package.rsa]
opt-level = 3 opt-level = 3

View File

@ -6,11 +6,11 @@ _A simple and efficient activitypub relay_
If running docker, you can start the relay with the following command: If running docker, you can start the relay with the following command:
``` ```
$ sudo docker run --rm -it \ $ sudo docker run --rm -it \
-v "./:/mnt/" \ -v "$(pwd):/mnt/" \
-e ADDR=0.0.0.0 \ -e ADDR=0.0.0.0 \
-e SLED_PATH=/mnt/sled/db-0.34 \ -e SLED_PATH=/mnt/sled/db-0.34 \
-p 8080:8080 \ -p 8080:8080 \
asonix/relay:0.3.52 asonix/relay:0.3.85
``` ```
This will launch the relay with the database stored in "./sled/db-0.34" and listening on port 8080 This will launch the relay with the database stored in "./sled/db-0.34" and listening on port 8080
#### Cargo #### Cargo
@ -103,6 +103,9 @@ TLS_CERT=/path/to/cert
FOOTER_BLURB="Contact <a href=\"https://masto.asonix.dog/@asonix\">@asonix</a> for inquiries" FOOTER_BLURB="Contact <a href=\"https://masto.asonix.dog/@asonix\">@asonix</a> for inquiries"
LOCAL_DOMAINS=masto.asonix.dog LOCAL_DOMAINS=masto.asonix.dog
LOCAL_BLURB="<p>Welcome to my cool relay where I have cool relay things happening. I hope you enjoy your stay!</p>" LOCAL_BLURB="<p>Welcome to my cool relay where I have cool relay things happening. I hope you enjoy your stay!</p>"
PROMETHEUS_ADDR=0.0.0.0
PROMETHEUS_PORT=9000
CLIENT_POOL_SIZE=20
``` ```
#### Descriptions #### Descriptions
@ -128,6 +131,8 @@ Where to store the on-disk database of connected servers. This defaults to `./sl
The log level to print. Available levels are `ERROR`, `WARN`, `INFO`, `DEBUG`, and `TRACE`. You can also specify module paths to enable some logs but not others, such as `RUST_LOG=warn,tracing_actix_web=info,relay=info`. This defaults to `warn` The log level to print. Available levels are `ERROR`, `WARN`, `INFO`, `DEBUG`, and `TRACE`. You can also specify module paths to enable some logs but not others, such as `RUST_LOG=warn,tracing_actix_web=info,relay=info`. This defaults to `warn`
##### `SOURCE_REPO` ##### `SOURCE_REPO`
The URL to the source code for the relay. This defaults to `https://git.asonix.dog/asonix/relay`, but should be changed if you're running a fork hosted elsewhere. The URL to the source code for the relay. This defaults to `https://git.asonix.dog/asonix/relay`, but should be changed if you're running a fork hosted elsewhere.
##### `REPOSITORY_COMMIT_BASE`
The base path of the repository commit hash reference. For example, `/src/commit/` for Gitea, `/tree/` for GitLab.
##### `API_TOKEN` ##### `API_TOKEN`
The Secret token used to access the admin APIs. This must be set for the commandline to function The Secret token used to access the admin APIs. This must be set for the commandline to function
##### `OPENTELEMETRY_URL` ##### `OPENTELEMETRY_URL`
@ -146,6 +151,15 @@ Optional - Add custom notes in the footer of the page
Optional - domains of mastodon servers run by the same admin as the relay Optional - domains of mastodon servers run by the same admin as the relay
##### `LOCAL_BLURB` ##### `LOCAL_BLURB`
Optional - description for the relay Optional - description for the relay
##### `PROMETHEUS_ADDR`
Optional - Address to bind to for serving the prometheus scrape endpoint
##### `PROMETHEUS_PORT`
Optional - Port to bind to for serving the prometheus scrape endpoint
##### `CLIENT_POOL_SIZE`
Optional - How many connections the relay should maintain per thread. This value will be multiplied
by the number of cores available to the relay. This defaults to 20, so a 4-core machine will have a
maximum of 160 simultaneous outbound connections. If you run into problems related to "Too many open
files", you can either decrease this number or increase the ulimit for your system.
### Subscribing ### Subscribing
Mastodon admins can subscribe to this relay by adding the `/inbox` route to their relay settings. Mastodon admins can subscribe to this relay by adding the `/inbox` route to their relay settings.
@ -165,10 +179,16 @@ example, if the server is `https://relay.my.tld`, the correct URL would be
- Follow Public, become a listener of the relay - Follow Public, become a listener of the relay
- Undo Follow {self-actor}, stop listening on the relay, an Undo Follow will be sent back - Undo Follow {self-actor}, stop listening on the relay, an Undo Follow will be sent back
- Undo Follow Public, stop listening on the relay - Undo Follow Public, stop listening on the relay
- Delete {anything}, the Delete {anything} is relayed verbatim to listening servers - Delete {anything}, the Delete {anything} is relayed verbatim to listening servers.
Note that this activity will likely be rejected by the listening servers unless it has been Note that this activity will likely be rejected by the listening servers unless it has been
signed with a JSON-LD signature signed with a JSON-LD signature
- Update {anything}, the Update {anything} is relayed verbatim to listening servers - Update {anything}, the Update {anything} is relayed verbatim to listening servers.
Note that this activity will likely be rejected by the listening servers unless it has been
signed with a JSON-LD signature
- Add {anything}, the Add {anything} is relayed verbatim to listening servers.
Note that this activity will likely be rejected by the listening servers unless it has been
signed with a JSON-LD signature
- Remove {anything}, the Remove {anything} is relayed verbatim to listening servers.
Note that this activity will likely be rejected by the listening servers unless it has been Note that this activity will likely be rejected by the listening servers unless it has been
signed with a JSON-LD signature signed with a JSON-LD signature
@ -176,6 +196,9 @@ example, if the server is `https://relay.my.tld`, the correct URL would be
- Webfinger - Webfinger
- NodeInfo - NodeInfo
### Known issues
Pleroma and Akkoma do not support validating JSON-LD signatures, meaning many activities such as Delete, Update, Add, and Remove will be rejected with a message similar to `WARN: Response from https://example.com/inbox, "Invalid HTTP Signature"`. This is normal and not an issue with the relay.
### Contributing ### Contributing
Feel free to open issues for anything you find an issue with. Please note that any contributed code will be licensed under the AGPLv3. Feel free to open issues for anything you find an issue with. Please note that any contributed code will be licensed under the AGPLv3.

View File

@ -1,41 +0,0 @@
ARG REPO_ARCH=amd64
# cross-build environment
FROM asonix/rust-builder:$REPO_ARCH-latest AS builder
ARG TAG=main
ARG BINARY=relay
ARG PROJECT=relay
ARG GIT_REPOSITORY=https://git.asonix.dog/asonix/$PROJECT
ENV \
BINARY=${BINARY}
ADD \
--chown=build:build \
$GIT_REPOSITORY/archive/$TAG.tar.gz \
/opt/build/repo.tar.gz
RUN \
tar zxf repo.tar.gz
WORKDIR /opt/build/$PROJECT
RUN \
build
# production environment
FROM asonix/rust-runner:$REPO_ARCH-latest
ARG BINARY=relay
ENV \
BINARY=${BINARY}
COPY \
--from=builder \
/opt/build/binary \
/usr/bin/${BINARY}
ENTRYPOINT ["/sbin/tini", "--"]
CMD /usr/bin/${BINARY}

View File

@ -1,37 +0,0 @@
#!/usr/bin/env bash
function require() {
if [ "$1" = "" ]; then
echo "input '$2' required"
print_help
exit 1
fi
}
function print_help() {
echo "deploy.sh"
echo ""
echo "Usage:"
echo " deploy.sh [repo] [tag] [arch]"
echo ""
echo "Args:"
echo " repo: The docker repository to publish the image"
echo " tag: The tag applied to the docker image"
echo " arch: The architecuture of the doker image"
}
REPO=$1
TAG=$2
ARCH=$3
require "$REPO" repo
require "$TAG" tag
require "$ARCH" arch
sudo docker build \
--pull \
--build-arg TAG=$TAG \
--build-arg REPO_ARCH=$ARCH \
-t $REPO:$ARCH-$TAG \
-f Dockerfile \
.

View File

@ -1,87 +0,0 @@
#!/usr/bin/env bash
function require() {
if [ "$1" = "" ]; then
echo "input '$2' required"
print_help
exit 1
fi
}
function print_help() {
echo "deploy.sh"
echo ""
echo "Usage:"
echo " deploy.sh [tag] [branch] [push]"
echo ""
echo "Args:"
echo " tag: The git tag to be applied to the repository and docker build"
echo " branch: The git branch to use for tagging and publishing"
echo " push: Whether or not to push the image"
echo ""
echo "Examples:"
echo " ./deploy.sh v0.3.0-alpha.13 main true"
echo " ./deploy.sh v0.3.0-alpha.13-shell-out asonix/shell-out false"
}
function build_image() {
tag=$1
arch=$2
push=$3
./build-image.sh asonix/relay $tag $arch
sudo docker tag asonix/relay:$arch-$tag asonix/relay:$arch-latest
if [ "$push" == "true" ]; then
sudo docker push asonix/relay:$arch-$tag
sudo docker push asonix/relay:$arch-latest
fi
}
# Creating the new tag
new_tag="$1"
branch="$2"
push=$3
require "$new_tag" "tag"
require "$branch" "branch"
require "$push" "push"
if ! sudo docker run --rm -it arm64v8/alpine:3.11 /bin/sh -c 'echo "docker is configured correctly"'
then
echo "docker is not configured to run on qemu-emulated architectures, fixing will require sudo"
sudo docker run --rm --privileged multiarch/qemu-user-static --reset -p yes
fi
set -xe
git checkout $branch
# Changing the docker-compose prod
sed -i "s/asonix\/relay:.*/asonix\/relay:$new_tag/" docker-compose.yml
git add ../prod/docker-compose.yml
# The commit
git commit -m"Version $new_tag"
git tag $new_tag
# Push
git push origin $new_tag
git push
# Build for arm64v8, arm32v7 and amd64
build_image $new_tag arm64v8 $push
build_image $new_tag arm32v7 $push
build_image $new_tag amd64 $push
# Build for other archs
# TODO
if [ "$push" == "true" ]; then
./manifest.sh relay $new_tag
./manifest.sh relay latest
# pushd ../../
# cargo publish
# popd
fi

View File

@ -2,7 +2,7 @@ version: '3.3'
services: services:
relay: relay:
image: asonix/relay:v0.3.8 image: asonix/relay:0.3.85
ports: ports:
- "8079:8079" - "8079:8079"
restart: always restart: always
@ -14,6 +14,7 @@ services:
- RESTRICTED_MODE=false - RESTRICTED_MODE=false
- VALIDATE_SIGNATURES=true - VALIDATE_SIGNATURES=true
- HTTPS=true - HTTPS=true
- DATABASE_URL=postgres://pg_user:pg_pass@pg_host:pg_port/pg_database - SLED_PATH=/mnt/sled/db-0.34
- PRETTY_LOG=false - PRETTY_LOG=false
- PUBLISH_BLOCKS=true - PUBLISH_BLOCKS=true
- API_TOKEN=somepasswordishtoken

View File

@ -1,43 +0,0 @@
#!/usr/bin/env bash
function require() {
if [ "$1" = "" ]; then
echo "input '$2' required"
print_help
exit 1
fi
}
function print_help() {
echo "deploy.sh"
echo ""
echo "Usage:"
echo " manifest.sh [repo] [tag]"
echo ""
echo "Args:"
echo " repo: The docker repository to update"
echo " tag: The git tag to be applied to the image manifest"
}
REPO=$1
TAG=$2
require "$REPO" "repo"
require "$TAG" "tag"
set -xe
sudo docker manifest create asonix/$REPO:$TAG \
-a asonix/$REPO:arm64v8-$TAG \
-a asonix/$REPO:arm32v7-$TAG \
-a asonix/$REPO:amd64-$TAG
sudo docker manifest annotate asonix/$REPO:$TAG \
asonix/$REPO:arm64v8-$TAG --os linux --arch arm64 --variant v8
sudo docker manifest annotate asonix/$REPO:$TAG \
asonix/$REPO:arm32v7-$TAG --os linux --arch arm --variant v7
sudo docker manifest annotate asonix/$REPO:$TAG \
asonix/$REPO:amd64-$TAG --os linux --arch amd64
sudo docker manifest push asonix/$REPO:$TAG --purge

61
flake.lock Normal file
View File

@ -0,0 +1,61 @@
{
"nodes": {
"flake-utils": {
"inputs": {
"systems": "systems"
},
"locked": {
"lastModified": 1687171271,
"narHash": "sha256-BJlq+ozK2B1sJDQXS3tzJM5a+oVZmi1q0FlBK/Xqv7M=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "abfb11bd1aec8ced1c9bb9adfe68018230f4fb3c",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1687412861,
"narHash": "sha256-Z/g0wbL68C+mSGerYS2quv9FXQ1RRP082cAC0Bh4vcs=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "e603dc5f061ca1d8a19b3ede6a8cf9c9fcba6cdc",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"flake-utils": "flake-utils",
"nixpkgs": "nixpkgs"
}
},
"systems": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

34
flake.nix Normal file
View File

@ -0,0 +1,34 @@
{
description = "relay";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
flake-utils.url = "github:numtide/flake-utils";
};
outputs = { self, nixpkgs, flake-utils }:
flake-utils.lib.eachDefaultSystem (system:
let
pkgs = import nixpkgs {
inherit system;
};
in
{
packages = rec {
relay = pkgs.callPackage ./relay.nix { };
default = relay;
};
apps = rec {
dev = flake-utils.lib.mkApp { drv = self.packages.${system}.pict-rs-proxy; };
default = dev;
};
devShell = with pkgs; mkShell {
nativeBuildInputs = [ cargo cargo-outdated cargo-zigbuild clippy gcc protobuf rust-analyzer rustc rustfmt ];
RUST_SRC_PATH = "${pkgs.rust.packages.stable.rustPlatform.rustLibSrc}";
};
});
}

25
relay.nix Normal file
View File

@ -0,0 +1,25 @@
{ lib
, nixosTests
, protobuf
, rustPlatform
}:
rustPlatform.buildRustPackage {
pname = "relay";
version = "0.3.85";
src = ./.;
cargoLock.lockFile = ./Cargo.lock;
PROTOC = "${protobuf}/bin/protoc";
PROTOC_INCLUDE = "${protobuf}/include";
nativeBuildInputs = [ ];
passthru.tests = { inherit (nixosTests) relay; };
meta = with lib; {
description = "A simple image hosting service";
homepage = "https://git.asonix.dog/asonix/relay";
license = with licenses; [ agpl3Plus ];
};
}

View File

@ -1,4 +1,6 @@
use activitystreams::iri_string::types::IriString; use activitystreams::iri_string::types::IriString;
use std::collections::{BTreeMap, BTreeSet};
use time::OffsetDateTime;
pub mod client; pub mod client;
pub mod routes; pub mod routes;
@ -22,3 +24,9 @@ pub(crate) struct BlockedDomains {
pub(crate) struct ConnectedActors { pub(crate) struct ConnectedActors {
pub(crate) connected_actors: Vec<IriString>, pub(crate) connected_actors: Vec<IriString>,
} }
#[derive(serde::Deserialize, serde::Serialize)]
pub(crate) struct LastSeen {
pub(crate) last_seen: BTreeMap<OffsetDateTime, BTreeSet<String>>,
pub(crate) never: Vec<String>,
}

View File

@ -1,5 +1,5 @@
use crate::{ use crate::{
admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains}, admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains, LastSeen},
collector::Snapshot, collector::Snapshot,
config::{AdminUrlKind, Config}, config::{AdminUrlKind, Config},
error::{Error, ErrorKind}, error::{Error, ErrorKind},
@ -55,6 +55,10 @@ pub(crate) async fn stats(client: &Client, config: &Config) -> Result<Snapshot,
get_results(client, config, AdminUrlKind::Stats).await get_results(client, config, AdminUrlKind::Stats).await
} }
pub(crate) async fn last_seen(client: &Client, config: &Config) -> Result<LastSeen, Error> {
get_results(client, config, AdminUrlKind::LastSeen).await
}
async fn get_results<T: DeserializeOwned>( async fn get_results<T: DeserializeOwned>(
client: &Client, client: &Client,
config: &Config, config: &Config,

View File

@ -1,5 +1,5 @@
use crate::{ use crate::{
admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains}, admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains, LastSeen},
collector::{MemoryCollector, Snapshot}, collector::{MemoryCollector, Snapshot},
error::Error, error::Error,
extractors::Admin, extractors::Admin,
@ -8,6 +8,8 @@ use actix_web::{
web::{Data, Json}, web::{Data, Json},
HttpResponse, HttpResponse,
}; };
use std::collections::{BTreeMap, BTreeSet};
use time::OffsetDateTime;
pub(crate) async fn allow( pub(crate) async fn allow(
admin: Admin, admin: Admin,
@ -69,3 +71,20 @@ pub(crate) async fn stats(
) -> Result<Json<Snapshot>, Error> { ) -> Result<Json<Snapshot>, Error> {
Ok(Json(collector.snapshot())) Ok(Json(collector.snapshot()))
} }
pub(crate) async fn last_seen(admin: Admin) -> Result<Json<LastSeen>, Error> {
let nodes = admin.db_ref().last_seen().await?;
let mut last_seen: BTreeMap<OffsetDateTime, BTreeSet<String>> = BTreeMap::new();
let mut never = Vec::new();
for (domain, datetime) in nodes {
if let Some(datetime) = datetime {
last_seen.entry(datetime).or_default().insert(domain);
} else {
never.push(domain);
}
}
Ok(Json(LastSeen { last_seen, never }))
}

View File

@ -34,11 +34,13 @@ pub struct PublicKey {
#[serde(rename_all = "PascalCase")] #[serde(rename_all = "PascalCase")]
pub enum ValidTypes { pub enum ValidTypes {
Accept, Accept,
Add,
Announce, Announce,
Create, Create,
Delete, Delete,
Follow, Follow,
Reject, Reject,
Remove,
Undo, Undo,
Update, Update,
} }

View File

@ -17,11 +17,22 @@ pub(crate) struct Args {
#[arg(short, long, help = "Get statistics from the server")] #[arg(short, long, help = "Get statistics from the server")]
stats: bool, stats: bool,
#[arg(
short,
long,
help = "List domains by when they were last succesfully contacted"
)]
contacted: bool,
} }
impl Args { impl Args {
pub(crate) fn any(&self) -> bool { pub(crate) fn any(&self) -> bool {
!self.blocks.is_empty() || !self.allowed.is_empty() || self.list || self.stats !self.blocks.is_empty()
|| !self.allowed.is_empty()
|| self.list
|| self.stats
|| self.contacted
} }
pub(crate) fn new() -> Self { pub(crate) fn new() -> Self {
@ -47,4 +58,8 @@ impl Args {
pub(crate) fn stats(&self) -> bool { pub(crate) fn stats(&self) -> bool {
self.stats self.stats
} }
pub(crate) fn contacted(&self) -> bool {
self.contacted
}
} }

View File

@ -5,7 +5,8 @@ fn git_info() {
if let Ok(output) = Command::new("git").args(["rev-parse", "HEAD"]).output() { if let Ok(output) = Command::new("git").args(["rev-parse", "HEAD"]).output() {
if output.status.success() { if output.status.success() {
let git_hash = String::from_utf8_lossy(&output.stdout); let git_hash = String::from_utf8_lossy(&output.stdout);
println!("cargo:rustc-env=GIT_HASH={}", git_hash); println!("cargo:rustc-env=GIT_HASH={git_hash}");
println!("cargo:rustc-env=GIT_SHORT_HASH={}", &git_hash[..8])
} }
} }
@ -15,7 +16,7 @@ fn git_info() {
{ {
if output.status.success() { if output.status.success() {
let git_branch = String::from_utf8_lossy(&output.stdout); let git_branch = String::from_utf8_lossy(&output.stdout);
println!("cargo:rustc-env=GIT_BRANCH={}", git_branch); println!("cargo:rustc-env=GIT_BRANCH={git_branch}");
} }
} }
} }
@ -23,7 +24,7 @@ fn git_info() {
fn version_info() -> Result<(), anyhow::Error> { fn version_info() -> Result<(), anyhow::Error> {
let cargo_toml = Path::new(&std::env::var("CARGO_MANIFEST_DIR")?).join("Cargo.toml"); let cargo_toml = Path::new(&std::env::var("CARGO_MANIFEST_DIR")?).join("Cargo.toml");
let mut file = File::open(&cargo_toml)?; let mut file = File::open(cargo_toml)?;
let mut cargo_data = String::new(); let mut cargo_data = String::new();
file.read_to_string(&mut cargo_data)?; file.read_to_string(&mut cargo_data)?;
@ -31,11 +32,11 @@ fn version_info() -> Result<(), anyhow::Error> {
let data: toml::Value = toml::from_str(&cargo_data)?; let data: toml::Value = toml::from_str(&cargo_data)?;
if let Some(version) = data["package"]["version"].as_str() { if let Some(version) = data["package"]["version"].as_str() {
println!("cargo:rustc-env=PKG_VERSION={}", version); println!("cargo:rustc-env=PKG_VERSION={version}");
} }
if let Some(name) = data["package"]["name"].as_str() { if let Some(name) = data["package"]["name"].as_str() {
println!("cargo:rustc-env=PKG_NAME={}", name); println!("cargo:rustc-env=PKG_NAME={name}");
} }
Ok(()) Ok(())

View File

@ -40,11 +40,11 @@ impl std::fmt::Display for Counter {
let labels = self let labels = self
.labels .labels
.iter() .iter()
.map(|(k, v)| format!("{}: {}", k, v)) .map(|(k, v)| format!("{k}: {v}"))
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join(", "); .join(", ");
write!(f, "{} - {}", labels, self.value) write!(f, "{labels} - {}", self.value)
} }
} }
@ -59,11 +59,11 @@ impl std::fmt::Display for Gauge {
let labels = self let labels = self
.labels .labels
.iter() .iter()
.map(|(k, v)| format!("{}: {}", k, v)) .map(|(k, v)| format!("{k}: {v}"))
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join(", "); .join(", ");
write!(f, "{} - {}", labels, self.value) write!(f, "{labels} - {}", self.value)
} }
} }
@ -78,7 +78,7 @@ impl std::fmt::Display for Histogram {
let labels = self let labels = self
.labels .labels
.iter() .iter()
.map(|(k, v)| format!("{}: {}", k, v)) .map(|(k, v)| format!("{k}: {v}"))
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join(", "); .join(", ");
@ -87,15 +87,15 @@ impl std::fmt::Display for Histogram {
.iter() .iter()
.map(|(k, v)| { .map(|(k, v)| {
if let Some(v) = v { if let Some(v) = v {
format!("{}: {:.6}", k, v) format!("{k}: {v:.6}")
} else { } else {
format!("{}: None,", k) format!("{k}: None,")
} }
}) })
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join(", "); .join(", ");
write!(f, "{} - {}", labels, value) write!(f, "{labels} - {value}")
} }
} }
@ -172,18 +172,18 @@ impl Snapshot {
continue; continue;
} }
println!("\t{}", key); println!("\t{key}");
for counter in counters { for counter in counters {
println!("\t\t{}", counter); println!("\t\t{counter}");
} }
} }
for (key, counters) in merging { for (key, counters) in merging {
println!("\t{}", key); println!("\t{key}");
for (_, counter) in counters { for (_, counter) in counters {
if let Some(counter) = counter.merge() { if let Some(counter) = counter.merge() {
println!("\t\t{}", counter); println!("\t\t{counter}");
} }
} }
} }
@ -192,10 +192,10 @@ impl Snapshot {
if !self.gauges.is_empty() { if !self.gauges.is_empty() {
println!("Gauges"); println!("Gauges");
for (key, gauges) in self.gauges { for (key, gauges) in self.gauges {
println!("\t{}", key); println!("\t{key}");
for gauge in gauges { for gauge in gauges {
println!("\t\t{}", gauge); println!("\t\t{gauge}");
} }
} }
} }
@ -203,10 +203,10 @@ impl Snapshot {
if !self.histograms.is_empty() { if !self.histograms.is_empty() {
println!("Histograms"); println!("Histograms");
for (key, histograms) in self.histograms { for (key, histograms) in self.histograms {
println!("\t{}", key); println!("\t{key}");
for histogram in histograms { for histogram in histograms {
println!("\t\t{}", histogram); println!("\t\t{histogram}");
} }
} }
} }
@ -216,7 +216,6 @@ impl Snapshot {
fn key_to_parts(key: &Key) -> (String, Vec<(String, String)>) { fn key_to_parts(key: &Key) -> (String, Vec<(String, String)>) {
let labels = key let labels = key
.labels() .labels()
.into_iter()
.map(|label| (label.key().to_string(), label.value().to_string())) .map(|label| (label.key().to_string(), label.value().to_string()))
.collect(); .collect();
let name = key.name().to_string(); let name = key.name().to_string();
@ -348,10 +347,6 @@ impl MemoryCollector {
} }
} }
pub(crate) fn install(&self) -> Result<(), SetRecorderError> {
metrics::set_boxed_recorder(Box::new(self.clone()))
}
pub(crate) fn snapshot(&self) -> Snapshot { pub(crate) fn snapshot(&self) -> Snapshot {
self.inner.snapshot() self.inner.snapshot()
} }
@ -364,6 +359,10 @@ impl MemoryCollector {
let mut d = self.inner.descriptions.write().unwrap(); let mut d = self.inner.descriptions.write().unwrap();
d.entry(key.as_str().to_owned()).or_insert(description); d.entry(key.as_str().to_owned()).or_insert(description);
} }
pub(crate) fn install(&self) -> Result<(), SetRecorderError> {
metrics::set_boxed_recorder(Box::new(self.clone()))
}
} }
impl Recorder for MemoryCollector { impl Recorder for MemoryCollector {

View File

@ -1,22 +1,24 @@
use crate::{ use crate::{
data::{ActorCache, State},
error::Error, error::Error,
extractors::{AdminConfig, XApiToken}, extractors::{AdminConfig, XApiToken},
middleware::MyVerify,
requests::Requests,
}; };
use activitystreams::{ use activitystreams::{
iri, iri,
iri_string::{ iri_string::{
format::ToDedicatedString,
resolve::FixedBaseResolver, resolve::FixedBaseResolver,
types::{IriAbsoluteString, IriFragmentStr, IriRelativeStr, IriString}, types::{IriAbsoluteString, IriFragmentStr, IriRelativeStr, IriString},
}, },
}; };
use config::Environment; use config::Environment;
use http_signature_normalization_actix::prelude::{VerifyDigest, VerifySignature}; use http_signature_normalization_actix::prelude::VerifyDigest;
use rsa::sha2::{Digest, Sha256};
use rustls::{Certificate, PrivateKey}; use rustls::{Certificate, PrivateKey};
use sha2::{Digest, Sha256}; use std::{
use std::{io::BufReader, net::IpAddr, path::PathBuf}; io::BufReader,
net::{IpAddr, SocketAddr},
path::PathBuf,
};
use uuid::Uuid; use uuid::Uuid;
#[derive(Clone, Debug, serde::Deserialize)] #[derive(Clone, Debug, serde::Deserialize)]
@ -31,6 +33,7 @@ pub(crate) struct ParsedConfig {
publish_blocks: bool, publish_blocks: bool,
sled_path: PathBuf, sled_path: PathBuf,
source_repo: IriString, source_repo: IriString,
repository_commit_base: String,
opentelemetry_url: Option<IriString>, opentelemetry_url: Option<IriString>,
telegram_token: Option<String>, telegram_token: Option<String>,
telegram_admin_handle: Option<String>, telegram_admin_handle: Option<String>,
@ -40,6 +43,9 @@ pub(crate) struct ParsedConfig {
footer_blurb: Option<String>, footer_blurb: Option<String>,
local_domains: Option<String>, local_domains: Option<String>,
local_blurb: Option<String>, local_blurb: Option<String>,
prometheus_addr: Option<IpAddr>,
prometheus_port: Option<u16>,
client_pool_size: usize,
} }
#[derive(Clone)] #[derive(Clone)]
@ -62,6 +68,8 @@ pub struct Config {
footer_blurb: Option<String>, footer_blurb: Option<String>,
local_domains: Vec<String>, local_domains: Vec<String>,
local_blurb: Option<String>, local_blurb: Option<String>,
prometheus_config: Option<PrometheusConfig>,
client_pool_size: usize,
} }
#[derive(Clone)] #[derive(Clone)]
@ -70,6 +78,12 @@ struct TlsConfig {
cert: PathBuf, cert: PathBuf,
} }
#[derive(Clone, Debug)]
struct PrometheusConfig {
addr: IpAddr,
port: u16,
}
#[derive(Debug)] #[derive(Debug)]
pub enum UrlKind { pub enum UrlKind {
Activity, Activity,
@ -94,6 +108,7 @@ pub enum AdminUrlKind {
Blocked, Blocked,
Connected, Connected,
Stats, Stats,
LastSeen,
} }
impl std::fmt::Debug for Config { impl std::fmt::Debug for Config {
@ -121,6 +136,8 @@ impl std::fmt::Debug for Config {
.field("footer_blurb", &self.footer_blurb) .field("footer_blurb", &self.footer_blurb)
.field("local_domains", &self.local_domains) .field("local_domains", &self.local_domains)
.field("local_blurb", &self.local_blurb) .field("local_blurb", &self.local_blurb)
.field("prometheus_config", &self.prometheus_config)
.field("client_pool_size", &self.client_pool_size)
.finish() .finish()
} }
} }
@ -138,6 +155,7 @@ impl Config {
.set_default("publish_blocks", false)? .set_default("publish_blocks", false)?
.set_default("sled_path", "./sled/db-0-34")? .set_default("sled_path", "./sled/db-0-34")?
.set_default("source_repo", "https://git.asonix.dog/asonix/relay")? .set_default("source_repo", "https://git.asonix.dog/asonix/relay")?
.set_default("repository_commit_base", "/src/commit/")?
.set_default("opentelemetry_url", None as Option<&str>)? .set_default("opentelemetry_url", None as Option<&str>)?
.set_default("telegram_token", None as Option<&str>)? .set_default("telegram_token", None as Option<&str>)?
.set_default("telegram_admin_handle", None as Option<&str>)? .set_default("telegram_admin_handle", None as Option<&str>)?
@ -147,13 +165,16 @@ impl Config {
.set_default("footer_blurb", None as Option<&str>)? .set_default("footer_blurb", None as Option<&str>)?
.set_default("local_domains", None as Option<&str>)? .set_default("local_domains", None as Option<&str>)?
.set_default("local_blurb", None as Option<&str>)? .set_default("local_blurb", None as Option<&str>)?
.set_default("prometheus_addr", None as Option<&str>)?
.set_default("prometheus_port", None as Option<u16>)?
.set_default("client_pool_size", 20u64)?
.add_source(Environment::default()) .add_source(Environment::default())
.build()?; .build()?;
let config: ParsedConfig = config.try_deserialize()?; let config: ParsedConfig = config.try_deserialize()?;
let scheme = if config.https { "https" } else { "http" }; let scheme = if config.https { "https" } else { "http" };
let base_uri = iri!(format!("{}://{}", scheme, config.hostname)).into_absolute(); let base_uri = iri!(format!("{scheme}://{}", config.hostname)).into_absolute();
let tls = match (config.tls_key, config.tls_cert) { let tls = match (config.tls_key, config.tls_cert) {
(Some(key), Some(cert)) => Some(TlsConfig { key, cert }), (Some(key), Some(cert)) => Some(TlsConfig { key, cert }),
@ -175,6 +196,29 @@ impl Config {
.map(|d| d.to_string()) .map(|d| d.to_string())
.collect(); .collect();
let prometheus_config = match (config.prometheus_addr, config.prometheus_port) {
(Some(addr), Some(port)) => Some(PrometheusConfig { addr, port }),
(Some(_), None) => {
tracing::warn!("PROMETHEUS_ADDR is set but PROMETHEUS_PORT is not set, not building Prometheus config");
None
}
(None, Some(_)) => {
tracing::warn!("PROMETHEUS_PORT is set but PROMETHEUS_ADDR is not set, not building Prometheus config");
None
}
(None, None) => None,
};
let source_url = match Self::git_hash() {
Some(hash) => format!(
"{}{}{hash}",
config.source_repo, config.repository_commit_base
)
.parse()
.expect("constructed source URL is valid"),
None => config.source_repo.clone(),
};
Ok(Config { Ok(Config {
hostname: config.hostname, hostname: config.hostname,
addr: config.addr, addr: config.addr,
@ -185,7 +229,7 @@ impl Config {
publish_blocks: config.publish_blocks, publish_blocks: config.publish_blocks,
base_uri, base_uri,
sled_path: config.sled_path, sled_path: config.sled_path,
source_repo: config.source_repo, source_repo: source_url,
opentelemetry_url: config.opentelemetry_url, opentelemetry_url: config.opentelemetry_url,
telegram_token: config.telegram_token, telegram_token: config.telegram_token,
telegram_admin_handle: config.telegram_admin_handle, telegram_admin_handle: config.telegram_admin_handle,
@ -194,9 +238,17 @@ impl Config {
footer_blurb: config.footer_blurb, footer_blurb: config.footer_blurb,
local_domains, local_domains,
local_blurb: config.local_blurb, local_blurb: config.local_blurb,
prometheus_config,
client_pool_size: config.client_pool_size,
}) })
} }
pub(crate) fn prometheus_bind_address(&self) -> Option<SocketAddr> {
let config = self.prometheus_config.as_ref()?;
Some((config.addr, config.port).into())
}
pub(crate) fn open_keys(&self) -> Result<Option<(Vec<Certificate>, PrivateKey)>, Error> { pub(crate) fn open_keys(&self) -> Result<Option<(Vec<Certificate>, PrivateKey)>, Error> {
let tls = if let Some(tls) = &self.tls { let tls = if let Some(tls) = &self.tls {
tls tls
@ -276,19 +328,6 @@ impl Config {
} }
} }
pub(crate) fn signature_middleware(
&self,
requests: Requests,
actors: ActorCache,
state: State,
) -> VerifySignature<MyVerify> {
if self.validate_signatures {
VerifySignature::new(MyVerify(requests, actors, state), Default::default())
} else {
VerifySignature::new(MyVerify(requests, actors, state), Default::default()).optional()
}
}
pub(crate) fn x_api_token(&self) -> Option<XApiToken> { pub(crate) fn x_api_token(&self) -> Option<XApiToken> {
self.api_token.clone().map(XApiToken::new) self.api_token.clone().map(XApiToken::new)
} }
@ -298,7 +337,7 @@ impl Config {
match AdminConfig::build(api_token) { match AdminConfig::build(api_token) {
Ok(conf) => Some(actix_web::web::Data::new(conf)), Ok(conf) => Some(actix_web::web::Data::new(conf)),
Err(e) => { Err(e) => {
tracing::error!("Error creating admin config: {}", e); tracing::error!("Error creating admin config: {e}");
None None
} }
} }
@ -337,7 +376,7 @@ impl Config {
pub(crate) fn software_version() -> String { pub(crate) fn software_version() -> String {
if let Some(git) = Self::git_version() { if let Some(git) = Self::git_version() {
return format!("v{}-{}", Self::version(), git); return format!("v{}-{git}", Self::version());
} }
format!("v{}", Self::version()) format!("v{}", Self::version())
@ -345,9 +384,9 @@ impl Config {
fn git_version() -> Option<String> { fn git_version() -> Option<String> {
let branch = Self::git_branch()?; let branch = Self::git_branch()?;
let hash = Self::git_hash()?; let hash = Self::git_short_hash()?;
Some(format!("{}-{}", branch, hash)) Some(format!("{branch}-{hash}"))
} }
fn name() -> &'static str { fn name() -> &'static str {
@ -366,6 +405,10 @@ impl Config {
option_env!("GIT_HASH") option_env!("GIT_HASH")
} }
fn git_short_hash() -> Option<&'static str> {
option_env!("GIT_SHORT_HASH")
}
pub(crate) fn user_agent(&self) -> String { pub(crate) fn user_agent(&self) -> String {
format!( format!(
"{} ({}/{}; +{})", "{} ({}/{}; +{})",
@ -376,6 +419,10 @@ impl Config {
) )
} }
pub(crate) fn client_pool_size(&self) -> usize {
self.client_pool_size
}
pub(crate) fn source_code(&self) -> &IriString { pub(crate) fn source_code(&self) -> &IriString {
&self.source_repo &self.source_repo
} }
@ -395,37 +442,44 @@ impl Config {
self.do_generate_url(kind).expect("Generated valid IRI") self.do_generate_url(kind).expect("Generated valid IRI")
} }
#[tracing::instrument(level = "debug", skip_all, fields(base_uri = tracing::field::debug(&self.base_uri), kind = tracing::field::debug(&kind)))]
fn do_generate_url(&self, kind: UrlKind) -> Result<IriString, Error> { fn do_generate_url(&self, kind: UrlKind) -> Result<IriString, Error> {
let iri = match kind { let iri = match kind {
UrlKind::Activity => FixedBaseResolver::new(self.base_uri.as_ref()).try_resolve( UrlKind::Activity => FixedBaseResolver::new(self.base_uri.as_ref())
IriRelativeStr::new(&format!("activity/{}", Uuid::new_v4()))?.as_ref(), .resolve(IriRelativeStr::new(&format!("activity/{}", Uuid::new_v4()))?.as_ref())
)?, .try_to_dedicated_string()?,
UrlKind::Actor => FixedBaseResolver::new(self.base_uri.as_ref()) UrlKind::Actor => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("actor")?.as_ref())?, .resolve(IriRelativeStr::new("actor")?.as_ref())
.try_to_dedicated_string()?,
UrlKind::Followers => FixedBaseResolver::new(self.base_uri.as_ref()) UrlKind::Followers => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("followers")?.as_ref())?, .resolve(IriRelativeStr::new("followers")?.as_ref())
.try_to_dedicated_string()?,
UrlKind::Following => FixedBaseResolver::new(self.base_uri.as_ref()) UrlKind::Following => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("following")?.as_ref())?, .resolve(IriRelativeStr::new("following")?.as_ref())
.try_to_dedicated_string()?,
UrlKind::Inbox => FixedBaseResolver::new(self.base_uri.as_ref()) UrlKind::Inbox => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("inbox")?.as_ref())?, .resolve(IriRelativeStr::new("inbox")?.as_ref())
.try_to_dedicated_string()?,
UrlKind::Index => self.base_uri.clone().into(), UrlKind::Index => self.base_uri.clone().into(),
UrlKind::MainKey => { UrlKind::MainKey => {
let actor = IriRelativeStr::new("actor")?; let actor = IriRelativeStr::new("actor")?;
let fragment = IriFragmentStr::new("main-key")?; let fragment = IriFragmentStr::new("main-key")?;
let mut resolved = let mut resolved = FixedBaseResolver::new(self.base_uri.as_ref())
FixedBaseResolver::new(self.base_uri.as_ref()).try_resolve(actor.as_ref())?; .resolve(actor.as_ref())
.try_to_dedicated_string()?;
resolved.set_fragment(Some(fragment)); resolved.set_fragment(Some(fragment));
resolved resolved
} }
UrlKind::Media(uuid) => FixedBaseResolver::new(self.base_uri.as_ref()) UrlKind::Media(uuid) => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new(&format!("media/{}", uuid))?.as_ref())?, .resolve(IriRelativeStr::new(&format!("media/{uuid}"))?.as_ref())
.try_to_dedicated_string()?,
UrlKind::NodeInfo => FixedBaseResolver::new(self.base_uri.as_ref()) UrlKind::NodeInfo => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("nodeinfo/2.0.json")?.as_ref())?, .resolve(IriRelativeStr::new("nodeinfo/2.0.json")?.as_ref())
.try_to_dedicated_string()?,
UrlKind::Outbox => FixedBaseResolver::new(self.base_uri.as_ref()) UrlKind::Outbox => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("outbox")?.as_ref())?, .resolve(IriRelativeStr::new("outbox")?.as_ref())
.try_to_dedicated_string()?,
}; };
Ok(iri) Ok(iri)
@ -437,25 +491,22 @@ impl Config {
} }
fn do_generate_admin_url(&self, kind: AdminUrlKind) -> Result<IriString, Error> { fn do_generate_admin_url(&self, kind: AdminUrlKind) -> Result<IriString, Error> {
let iri = match kind { let path = match kind {
AdminUrlKind::Allow => FixedBaseResolver::new(self.base_uri.as_ref()) AdminUrlKind::Allow => "api/v1/admin/allow",
.try_resolve(IriRelativeStr::new("api/v1/admin/allow")?.as_ref())?, AdminUrlKind::Disallow => "api/v1/admin/disallow",
AdminUrlKind::Disallow => FixedBaseResolver::new(self.base_uri.as_ref()) AdminUrlKind::Block => "api/v1/admin/block",
.try_resolve(IriRelativeStr::new("api/v1/admin/disallow")?.as_ref())?, AdminUrlKind::Unblock => "api/v1/admin/unblock",
AdminUrlKind::Block => FixedBaseResolver::new(self.base_uri.as_ref()) AdminUrlKind::Allowed => "api/v1/admin/allowed",
.try_resolve(IriRelativeStr::new("api/v1/admin/block")?.as_ref())?, AdminUrlKind::Blocked => "api/v1/admin/blocked",
AdminUrlKind::Unblock => FixedBaseResolver::new(self.base_uri.as_ref()) AdminUrlKind::Connected => "api/v1/admin/connected",
.try_resolve(IriRelativeStr::new("api/v1/admin/unblock")?.as_ref())?, AdminUrlKind::Stats => "api/v1/admin/stats",
AdminUrlKind::Allowed => FixedBaseResolver::new(self.base_uri.as_ref()) AdminUrlKind::LastSeen => "api/v1/admin/last_seen",
.try_resolve(IriRelativeStr::new("api/v1/admin/allowed")?.as_ref())?,
AdminUrlKind::Blocked => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/blocked")?.as_ref())?,
AdminUrlKind::Connected => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/connected")?.as_ref())?,
AdminUrlKind::Stats => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/stats")?.as_ref())?,
}; };
let iri = FixedBaseResolver::new(self.base_uri.as_ref())
.resolve(IriRelativeStr::new(path)?.as_ref())
.try_to_dedicated_string()?;
Ok(iri) Ok(iri)
} }
} }

View File

@ -1,9 +1,11 @@
mod actor; mod actor;
mod last_online;
mod media; mod media;
mod node; mod node;
mod state; mod state;
pub(crate) use actor::ActorCache; pub(crate) use actor::ActorCache;
pub(crate) use last_online::LastOnline;
pub(crate) use media::MediaCache; pub(crate) use media::MediaCache;
pub(crate) use node::{Node, NodeCache}; pub(crate) use node::{Node, NodeCache};
pub(crate) use state::State; pub(crate) use state::State;

View File

@ -37,7 +37,7 @@ impl ActorCache {
ActorCache { db } ActorCache { db }
} }
#[tracing::instrument(level = "debug" name = "Get Actor", skip_all, fields(id = id.to_string().as_str(), requests))] #[tracing::instrument(level = "debug" name = "Get Actor", skip_all, fields(id = id.to_string().as_str()))]
pub(crate) async fn get( pub(crate) async fn get(
&self, &self,
id: &IriString, id: &IriString,
@ -56,12 +56,8 @@ impl ActorCache {
#[tracing::instrument(level = "debug", name = "Add Connection", skip(self))] #[tracing::instrument(level = "debug", name = "Add Connection", skip(self))]
pub(crate) async fn add_connection(&self, actor: Actor) -> Result<(), Error> { pub(crate) async fn add_connection(&self, actor: Actor) -> Result<(), Error> {
let add_connection = self.db.add_connection(actor.id.clone()); self.db.add_connection(actor.id.clone()).await?;
let save_actor = self.db.save_actor(actor); self.db.save_actor(actor).await
tokio::try_join!(add_connection, save_actor)?;
Ok(())
} }
#[tracing::instrument(level = "debug", name = "Remove Connection", skip(self))] #[tracing::instrument(level = "debug", name = "Remove Connection", skip(self))]
@ -69,13 +65,13 @@ impl ActorCache {
self.db.remove_connection(actor.id.clone()).await self.db.remove_connection(actor.id.clone()).await
} }
#[tracing::instrument(level = "debug", name = "Fetch remote actor", skip_all, fields(id = id.to_string().as_str(), requests))] #[tracing::instrument(level = "debug", name = "Fetch remote actor", skip_all, fields(id = id.to_string().as_str()))]
pub(crate) async fn get_no_cache( pub(crate) async fn get_no_cache(
&self, &self,
id: &IriString, id: &IriString,
requests: &Requests, requests: &Requests,
) -> Result<Actor, Error> { ) -> Result<Actor, Error> {
let accepted_actor = requests.fetch::<AcceptedActors>(id.as_str()).await?; let accepted_actor = requests.fetch::<AcceptedActors>(id).await?;
let input_authority = id.authority_components().ok_or(ErrorKind::MissingDomain)?; let input_authority = id.authority_components().ok_or(ErrorKind::MissingDomain)?;
let accepted_actor_id = accepted_actor let accepted_actor_id = accepted_actor

28
src/data/last_online.rs Normal file
View File

@ -0,0 +1,28 @@
use activitystreams::iri_string::types::IriStr;
use std::{collections::HashMap, sync::Mutex};
use time::OffsetDateTime;
pub(crate) struct LastOnline {
domains: Mutex<HashMap<String, OffsetDateTime>>,
}
impl LastOnline {
pub(crate) fn mark_seen(&self, iri: &IriStr) {
if let Some(authority) = iri.authority_str() {
self.domains
.lock()
.unwrap()
.insert(authority.to_string(), OffsetDateTime::now_utc());
}
}
pub(crate) fn take(&self) -> HashMap<String, OffsetDateTime> {
std::mem::take(&mut *self.domains.lock().unwrap())
}
pub(crate) fn empty() -> Self {
Self {
domains: Mutex::new(HashMap::default()),
}
}
}

View File

@ -36,11 +36,9 @@ impl NodeCache {
#[tracing::instrument(level = "debug", name = "Get nodes", skip(self))] #[tracing::instrument(level = "debug", name = "Get nodes", skip(self))]
pub(crate) async fn nodes(&self) -> Result<Vec<Node>, Error> { pub(crate) async fn nodes(&self) -> Result<Vec<Node>, Error> {
let infos = self.db.connected_info(); let infos = self.db.connected_info().await?;
let instances = self.db.connected_instance(); let instances = self.db.connected_instance().await?;
let contacts = self.db.connected_contact(); let contacts = self.db.connected_contact().await?;
let (infos, instances, contacts) = tokio::try_join!(infos, instances, contacts)?;
let vec = self let vec = self
.db .db
@ -184,7 +182,7 @@ impl Node {
let authority = url.authority_str().ok_or(ErrorKind::MissingDomain)?; let authority = url.authority_str().ok_or(ErrorKind::MissingDomain)?;
let scheme = url.scheme_str(); let scheme = url.scheme_str();
let base = iri!(format!("{}://{}", scheme, authority)); let base = iri!(format!("{scheme}://{authority}"));
Ok(Node { Ok(Node {
base, base,

View File

@ -10,8 +10,9 @@ use actix_web::web;
use lru::LruCache; use lru::LruCache;
use rand::thread_rng; use rand::thread_rng;
use rsa::{RsaPrivateKey, RsaPublicKey}; use rsa::{RsaPrivateKey, RsaPublicKey};
use std::sync::Arc; use std::sync::{Arc, RwLock};
use tokio::sync::RwLock;
use super::LastOnline;
#[derive(Clone)] #[derive(Clone)]
pub struct State { pub struct State {
@ -20,6 +21,7 @@ pub struct State {
object_cache: Arc<RwLock<LruCache<IriString, IriString>>>, object_cache: Arc<RwLock<LruCache<IriString, IriString>>>,
node_cache: NodeCache, node_cache: NodeCache,
breakers: Breakers, breakers: Breakers,
pub(crate) last_online: Arc<LastOnline>,
pub(crate) db: Db, pub(crate) db: Db,
} }
@ -44,6 +46,8 @@ impl State {
self.private_key.clone(), self.private_key.clone(),
config.user_agent(), config.user_agent(),
self.breakers.clone(), self.breakers.clone(),
self.last_online.clone(),
config.client_pool_size(),
) )
} }
@ -78,12 +82,12 @@ impl State {
.collect()) .collect())
} }
pub(crate) async fn is_cached(&self, object_id: &IriString) -> bool { pub(crate) fn is_cached(&self, object_id: &IriString) -> bool {
self.object_cache.read().await.contains(object_id) self.object_cache.read().unwrap().contains(object_id)
} }
pub(crate) async fn cache(&self, object_id: IriString, actor_id: IriString) { pub(crate) fn cache(&self, object_id: IriString, actor_id: IriString) {
self.object_cache.write().await.put(object_id, actor_id); self.object_cache.write().unwrap().put(object_id, actor_id);
} }
#[tracing::instrument(level = "debug", name = "Building state", skip_all)] #[tracing::instrument(level = "debug", name = "Building state", skip_all)]
@ -115,6 +119,7 @@ impl State {
node_cache: NodeCache::new(db.clone()), node_cache: NodeCache::new(db.clone()),
breakers: Breakers::default(), breakers: Breakers::default(),
db, db,
last_online: Arc::new(LastOnline::empty()),
}; };
Ok(state) Ok(state)

157
src/db.rs
View File

@ -7,8 +7,16 @@ use rsa::{
pkcs8::{DecodePrivateKey, EncodePrivateKey}, pkcs8::{DecodePrivateKey, EncodePrivateKey},
RsaPrivateKey, RsaPrivateKey,
}; };
use sled::Tree; use sled::{Batch, Tree};
use std::{collections::HashMap, sync::Arc, time::SystemTime}; use std::{
collections::{BTreeMap, HashMap},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::SystemTime,
};
use time::OffsetDateTime;
use uuid::Uuid; use uuid::Uuid;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -17,6 +25,8 @@ pub(crate) struct Db {
} }
struct Inner { struct Inner {
healthz: Tree,
healthz_counter: Arc<AtomicU64>,
actor_id_actor: Tree, actor_id_actor: Tree,
public_key_id_actor_id: Tree, public_key_id_actor_id: Tree,
connected_actor_ids: Tree, connected_actor_ids: Tree,
@ -28,6 +38,7 @@ struct Inner {
actor_id_info: Tree, actor_id_info: Tree,
actor_id_instance: Tree, actor_id_instance: Tree,
actor_id_contact: Tree, actor_id_contact: Tree,
last_seen: Tree,
restricted_mode: bool, restricted_mode: bool,
} }
@ -236,6 +247,8 @@ impl Db {
fn build_inner(restricted_mode: bool, db: sled::Db) -> Result<Self, Error> { fn build_inner(restricted_mode: bool, db: sled::Db) -> Result<Self, Error> {
Ok(Db { Ok(Db {
inner: Arc::new(Inner { inner: Arc::new(Inner {
healthz: db.open_tree("healthz")?,
healthz_counter: Arc::new(AtomicU64::new(0)),
actor_id_actor: db.open_tree("actor-id-actor")?, actor_id_actor: db.open_tree("actor-id-actor")?,
public_key_id_actor_id: db.open_tree("public-key-id-actor-id")?, public_key_id_actor_id: db.open_tree("public-key-id-actor-id")?,
connected_actor_ids: db.open_tree("connected-actor-ids")?, connected_actor_ids: db.open_tree("connected-actor-ids")?,
@ -247,6 +260,7 @@ impl Db {
actor_id_info: db.open_tree("actor-id-info")?, actor_id_info: db.open_tree("actor-id-info")?,
actor_id_instance: db.open_tree("actor-id-instance")?, actor_id_instance: db.open_tree("actor-id-instance")?,
actor_id_contact: db.open_tree("actor-id-contact")?, actor_id_contact: db.open_tree("actor-id-contact")?,
last_seen: db.open_tree("last-seen")?,
restricted_mode, restricted_mode,
}), }),
}) })
@ -254,7 +268,7 @@ impl Db {
async fn unblock<T>( async fn unblock<T>(
&self, &self,
f: impl Fn(&Inner) -> Result<T, Error> + Send + 'static, f: impl FnOnce(&Inner) -> Result<T, Error> + Send + 'static,
) -> Result<T, Error> ) -> Result<T, Error>
where where
T: Send + 'static, T: Send + 'static,
@ -266,6 +280,63 @@ impl Db {
Ok(t) Ok(t)
} }
pub(crate) async fn check_health(&self) -> Result<(), Error> {
let next = self.inner.healthz_counter.fetch_add(1, Ordering::Relaxed);
self.unblock(move |inner| {
inner
.healthz
.insert("healthz", &next.to_be_bytes()[..])
.map_err(Error::from)
})
.await?;
self.inner.healthz.flush_async().await?;
self.unblock(move |inner| inner.healthz.get("healthz").map_err(Error::from))
.await?;
Ok(())
}
pub(crate) async fn mark_last_seen(
&self,
nodes: HashMap<String, OffsetDateTime>,
) -> Result<(), Error> {
let mut batch = Batch::default();
for (domain, datetime) in nodes {
let datetime_string = serde_json::to_vec(&datetime)?;
batch.insert(domain.as_bytes(), datetime_string);
}
self.unblock(move |inner| inner.last_seen.apply_batch(batch).map_err(Error::from))
.await
}
pub(crate) async fn last_seen(
&self,
) -> Result<BTreeMap<String, Option<OffsetDateTime>>, Error> {
self.unblock(|inner| {
let mut map = BTreeMap::new();
for iri in inner.connected() {
let Some(authority_str) = iri.authority_str() else {
continue;
};
if let Some(datetime) = inner.last_seen.get(authority_str)? {
map.insert(
authority_str.to_string(),
Some(serde_json::from_slice(&datetime)?),
);
} else {
map.insert(authority_str.to_string(), None);
}
}
Ok(map)
})
.await
}
pub(crate) async fn connected_ids(&self) -> Result<Vec<IriString>, Error> { pub(crate) async fn connected_ids(&self) -> Result<Vec<IriString>, Error> {
self.unblock(|inner| Ok(inner.connected().collect())).await self.unblock(|inner| Ok(inner.connected().collect())).await
} }
@ -285,12 +356,12 @@ impl Db {
pub(crate) async fn info(&self, actor_id: IriString) -> Result<Option<Info>, Error> { pub(crate) async fn info(&self, actor_id: IriString) -> Result<Option<Info>, Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
if let Some(ivec) = inner.actor_id_info.get(actor_id.as_str().as_bytes())? { inner
let info = serde_json::from_slice(&ivec)?; .actor_id_info
Ok(Some(info)) .get(actor_id.as_str().as_bytes())?
} else { .map(|ivec| serde_json::from_slice(&ivec))
Ok(None) .transpose()
} .map_err(Error::from)
}) })
.await .await
} }
@ -319,12 +390,12 @@ impl Db {
pub(crate) async fn instance(&self, actor_id: IriString) -> Result<Option<Instance>, Error> { pub(crate) async fn instance(&self, actor_id: IriString) -> Result<Option<Instance>, Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
if let Some(ivec) = inner.actor_id_instance.get(actor_id.as_str().as_bytes())? { inner
let instance = serde_json::from_slice(&ivec)?; .actor_id_instance
Ok(Some(instance)) .get(actor_id.as_str().as_bytes())?
} else { .map(|ivec| serde_json::from_slice(&ivec))
Ok(None) .transpose()
} .map_err(Error::from)
}) })
.await .await
} }
@ -353,12 +424,12 @@ impl Db {
pub(crate) async fn contact(&self, actor_id: IriString) -> Result<Option<Contact>, Error> { pub(crate) async fn contact(&self, actor_id: IriString) -> Result<Option<Contact>, Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
if let Some(ivec) = inner.actor_id_contact.get(actor_id.as_str().as_bytes())? { inner
let contact = serde_json::from_slice(&ivec)?; .actor_id_contact
Ok(Some(contact)) .get(actor_id.as_str().as_bytes())?
} else { .map(|ivec| serde_json::from_slice(&ivec))
Ok(None) .transpose()
} .map_err(Error::from)
}) })
.await .await
} }
@ -383,22 +454,20 @@ impl Db {
pub(crate) async fn media_id(&self, url: IriString) -> Result<Option<Uuid>, Error> { pub(crate) async fn media_id(&self, url: IriString) -> Result<Option<Uuid>, Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
if let Some(ivec) = inner.media_url_media_id.get(url.as_str().as_bytes())? { Ok(inner
Ok(uuid_from_ivec(ivec)) .media_url_media_id
} else { .get(url.as_str().as_bytes())?
Ok(None) .and_then(uuid_from_ivec))
}
}) })
.await .await
} }
pub(crate) async fn media_url(&self, id: Uuid) -> Result<Option<IriString>, Error> { pub(crate) async fn media_url(&self, id: Uuid) -> Result<Option<IriString>, Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
if let Some(ivec) = inner.media_id_media_url.get(id.as_bytes())? { Ok(inner
Ok(url_from_ivec(ivec)) .media_id_media_url
} else { .get(id.as_bytes())?
Ok(None) .and_then(url_from_ivec))
}
}) })
.await .await
} }
@ -419,7 +488,7 @@ impl Db {
pub(crate) async fn is_connected(&self, base_id: IriString) -> Result<bool, Error> { pub(crate) async fn is_connected(&self, base_id: IriString) -> Result<bool, Error> {
let scheme = base_id.scheme_str(); let scheme = base_id.scheme_str();
let authority = base_id.authority_str().ok_or(ErrorKind::MissingDomain)?; let authority = base_id.authority_str().ok_or(ErrorKind::MissingDomain)?;
let prefix = format!("{}://{}", scheme, authority); let prefix = format!("{scheme}://{authority}");
self.unblock(move |inner| { self.unblock(move |inner| {
let connected = inner let connected = inner
@ -438,26 +507,22 @@ impl Db {
public_key_id: IriString, public_key_id: IriString,
) -> Result<Option<IriString>, Error> { ) -> Result<Option<IriString>, Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
if let Some(ivec) = inner Ok(inner
.public_key_id_actor_id .public_key_id_actor_id
.get(public_key_id.as_str().as_bytes())? .get(public_key_id.as_str().as_bytes())?
{ .and_then(url_from_ivec))
Ok(url_from_ivec(ivec))
} else {
Ok(None)
}
}) })
.await .await
} }
pub(crate) async fn actor(&self, actor_id: IriString) -> Result<Option<Actor>, Error> { pub(crate) async fn actor(&self, actor_id: IriString) -> Result<Option<Actor>, Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
if let Some(ivec) = inner.actor_id_actor.get(actor_id.as_str().as_bytes())? { inner
let actor = serde_json::from_slice(&ivec)?; .actor_id_actor
Ok(Some(actor)) .get(actor_id.as_str().as_bytes())?
} else { .map(|ivec| serde_json::from_slice(&ivec))
Ok(None) .transpose()
} .map_err(Error::from)
}) })
.await .await
} }
@ -479,7 +544,7 @@ impl Db {
} }
pub(crate) async fn remove_connection(&self, actor_id: IriString) -> Result<(), Error> { pub(crate) async fn remove_connection(&self, actor_id: IriString) -> Result<(), Error> {
tracing::debug!("Removing Connection: {}", actor_id); tracing::debug!("Removing Connection: {actor_id}");
self.unblock(move |inner| { self.unblock(move |inner| {
inner inner
.connected_actor_ids .connected_actor_ids
@ -491,7 +556,7 @@ impl Db {
} }
pub(crate) async fn add_connection(&self, actor_id: IriString) -> Result<(), Error> { pub(crate) async fn add_connection(&self, actor_id: IriString) -> Result<(), Error> {
tracing::debug!("Adding Connection: {}", actor_id); tracing::debug!("Adding Connection: {actor_id}");
self.unblock(move |inner| { self.unblock(move |inner| {
inner inner
.connected_actor_ids .connected_actor_ids

View File

@ -10,7 +10,7 @@ use std::{convert::Infallible, fmt::Debug, io};
use tracing_error::SpanTrace; use tracing_error::SpanTrace;
pub(crate) struct Error { pub(crate) struct Error {
context: SpanTrace, context: String,
kind: ErrorKind, kind: ErrorKind,
} }
@ -26,6 +26,14 @@ impl Error {
pub(crate) fn is_bad_request(&self) -> bool { pub(crate) fn is_bad_request(&self) -> bool {
matches!(self.kind, ErrorKind::Status(_, StatusCode::BAD_REQUEST)) matches!(self.kind, ErrorKind::Status(_, StatusCode::BAD_REQUEST))
} }
pub(crate) fn is_gone(&self) -> bool {
matches!(self.kind, ErrorKind::Status(_, StatusCode::GONE))
}
pub(crate) fn is_malformed_json(&self) -> bool {
matches!(self.kind, ErrorKind::Json(_))
}
} }
impl std::fmt::Debug for Error { impl std::fmt::Debug for Error {
@ -53,7 +61,7 @@ where
{ {
fn from(error: T) -> Self { fn from(error: T) -> Self {
Error { Error {
context: SpanTrace::capture(), context: SpanTrace::capture().to_string(),
kind: error.into(), kind: error.into(),
} }
} }
@ -77,10 +85,7 @@ pub(crate) enum ErrorKind {
ParseIri(#[from] activitystreams::iri_string::validate::Error), ParseIri(#[from] activitystreams::iri_string::validate::Error),
#[error("Couldn't normalize IRI, {0}")] #[error("Couldn't normalize IRI, {0}")]
NormalizeIri( NormalizeIri(#[from] std::collections::TryReserveError),
#[from]
activitystreams::iri_string::task::Error<activitystreams::iri_string::normalize::Error>,
),
#[error("Couldn't perform IO, {0}")] #[error("Couldn't perform IO, {0}")]
Io(#[from] io::Error), Io(#[from] io::Error),
@ -98,13 +103,13 @@ pub(crate) enum ErrorKind {
PrepareSign(#[from] PrepareSignError), PrepareSign(#[from] PrepareSignError),
#[error("Couldn't sign digest")] #[error("Couldn't sign digest")]
Signature(#[from] signature::Error), Signature(#[from] rsa::signature::Error),
#[error("Couldn't read signature")] #[error("Couldn't read signature")]
ReadSignature(signature::Error), ReadSignature(rsa::signature::Error),
#[error("Couldn't verify signature")] #[error("Couldn't verify signature")]
VerifySignature(signature::Error), VerifySignature(rsa::signature::Error),
#[error("Couldn't parse the signature header")] #[error("Couldn't parse the signature header")]
HeaderValidation(#[from] actix_web::http::header::InvalidHeaderValue), HeaderValidation(#[from] actix_web::http::header::InvalidHeaderValue),
@ -125,7 +130,7 @@ pub(crate) enum ErrorKind {
BadActor(String, String), BadActor(String, String),
#[error("Signature verification is required, but no signature was given")] #[error("Signature verification is required, but no signature was given")]
NoSignature(String), NoSignature(Option<String>),
#[error("Wrong ActivityPub kind, {0}")] #[error("Wrong ActivityPub kind, {0}")]
Kind(String), Kind(String),
@ -196,7 +201,8 @@ impl ResponseError for Error {
ErrorKind::Kind(_) ErrorKind::Kind(_)
| ErrorKind::MissingKind | ErrorKind::MissingKind
| ErrorKind::MissingId | ErrorKind::MissingId
| ErrorKind::ObjectCount => StatusCode::BAD_REQUEST, | ErrorKind::ObjectCount
| ErrorKind::NoSignature(_) => StatusCode::BAD_REQUEST,
_ => StatusCode::INTERNAL_SERVER_ERROR, _ => StatusCode::INTERNAL_SERVER_ERROR,
} }
} }

View File

@ -80,7 +80,7 @@ impl Admin {
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
#[error("Failed authentication")] #[error("Failed authentication")]
pub(crate) struct Error { pub(crate) struct Error {
context: SpanTrace, context: String,
#[source] #[source]
kind: ErrorKind, kind: ErrorKind,
} }
@ -88,49 +88,49 @@ pub(crate) struct Error {
impl Error { impl Error {
fn invalid() -> Self { fn invalid() -> Self {
Error { Error {
context: SpanTrace::capture(), context: SpanTrace::capture().to_string(),
kind: ErrorKind::Invalid, kind: ErrorKind::Invalid,
} }
} }
fn missing_config() -> Self { fn missing_config() -> Self {
Error { Error {
context: SpanTrace::capture(), context: SpanTrace::capture().to_string(),
kind: ErrorKind::MissingConfig, kind: ErrorKind::MissingConfig,
} }
} }
fn missing_db() -> Self { fn missing_db() -> Self {
Error { Error {
context: SpanTrace::capture(), context: SpanTrace::capture().to_string(),
kind: ErrorKind::MissingDb, kind: ErrorKind::MissingDb,
} }
} }
fn bcrypt_verify(e: BcryptError) -> Self { fn bcrypt_verify(e: BcryptError) -> Self {
Error { Error {
context: SpanTrace::capture(), context: SpanTrace::capture().to_string(),
kind: ErrorKind::BCryptVerify(e), kind: ErrorKind::BCryptVerify(e),
} }
} }
fn bcrypt_hash(e: BcryptError) -> Self { fn bcrypt_hash(e: BcryptError) -> Self {
Error { Error {
context: SpanTrace::capture(), context: SpanTrace::capture().to_string(),
kind: ErrorKind::BCryptHash(e), kind: ErrorKind::BCryptHash(e),
} }
} }
fn parse_header(e: ParseError) -> Self { fn parse_header(e: ParseError) -> Self {
Error { Error {
context: SpanTrace::capture(), context: SpanTrace::capture().to_string(),
kind: ErrorKind::ParseHeader(e), kind: ErrorKind::ParseHeader(e),
} }
} }
fn canceled(_: BlockingError) -> Self { fn canceled(_: BlockingError) -> Self {
Error { Error {
context: SpanTrace::capture(), context: SpanTrace::capture().to_string(),
kind: ErrorKind::Canceled, kind: ErrorKind::Canceled,
} }
} }

View File

@ -5,6 +5,7 @@ mod deliver_many;
mod instance; mod instance;
mod nodeinfo; mod nodeinfo;
mod process_listeners; mod process_listeners;
mod record_last_online;
pub(crate) use self::{ pub(crate) use self::{
contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany, instance::QueryInstance, contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany, instance::QueryInstance,
@ -15,14 +16,14 @@ use crate::{
config::Config, config::Config,
data::{ActorCache, MediaCache, NodeCache, State}, data::{ActorCache, MediaCache, NodeCache, State},
error::{Error, ErrorKind}, error::{Error, ErrorKind},
jobs::process_listeners::Listeners, jobs::{process_listeners::Listeners, record_last_online::RecordLastOnline},
requests::Requests, requests::Requests,
}; };
use background_jobs::{ use background_jobs::{
memory_storage::{ActixTimer, Storage}, memory_storage::{ActixTimer, Storage},
Job, Manager, QueueHandle, WorkerConfig, Job, QueueHandle, WorkerConfig,
}; };
use std::{convert::TryFrom, num::NonZeroUsize, time::Duration}; use std::time::Duration;
fn debug_object(activity: &serde_json::Value) -> &serde_json::Value { fn debug_object(activity: &serde_json::Value) -> &serde_json::Value {
let mut object = &activity["object"]["type"]; let mut object = &activity["object"]["type"];
@ -43,11 +44,8 @@ pub(crate) fn create_workers(
actors: ActorCache, actors: ActorCache,
media: MediaCache, media: MediaCache,
config: Config, config: Config,
) -> (Manager, JobServer) { ) -> JobServer {
let parallelism = std::thread::available_parallelism() let queue_handle = WorkerConfig::new(Storage::new(ActixTimer), move |queue_handle| {
.unwrap_or_else(|_| NonZeroUsize::try_from(1).expect("nonzero"));
let shared = WorkerConfig::new_managed(Storage::new(ActixTimer), move |queue_handle| {
JobState::new( JobState::new(
state.clone(), state.clone(),
actors.clone(), actors.clone(),
@ -62,6 +60,7 @@ pub(crate) fn create_workers(
.register::<QueryInstance>() .register::<QueryInstance>()
.register::<Listeners>() .register::<Listeners>()
.register::<QueryContact>() .register::<QueryContact>()
.register::<RecordLastOnline>()
.register::<apub::Announce>() .register::<apub::Announce>()
.register::<apub::Follow>() .register::<apub::Follow>()
.register::<apub::Forward>() .register::<apub::Forward>()
@ -70,13 +69,12 @@ pub(crate) fn create_workers(
.set_worker_count("maintenance", 2) .set_worker_count("maintenance", 2)
.set_worker_count("apub", 2) .set_worker_count("apub", 2)
.set_worker_count("deliver", 8) .set_worker_count("deliver", 8)
.start_with_threads(parallelism); .start();
shared.every(Duration::from_secs(60 * 5), Listeners); queue_handle.every(Duration::from_secs(60 * 5), Listeners);
queue_handle.every(Duration::from_secs(60 * 10), RecordLastOnline);
let job_server = JobServer::new(shared.queue_handle().clone()); JobServer::new(queue_handle)
(shared, job_server)
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]

View File

@ -36,13 +36,13 @@ async fn get_inboxes(
state.inboxes_without(&actor.inbox, &authority).await state.inboxes_without(&actor.inbox, &authority).await
} }
fn prepare_activity<T, U, V, Kind>( fn prepare_activity<T, U, V>(
mut t: T, mut t: T,
id: impl TryInto<IriString, Error = U>, id: impl TryInto<IriString, Error = U>,
to: impl TryInto<IriString, Error = V>, to: impl TryInto<IriString, Error = V>,
) -> Result<T, Error> ) -> Result<T, Error>
where where
T: ObjectExt<Kind> + BaseExt<Kind>, T: ObjectExt + BaseExt,
Error: From<U> + From<V>, Error: From<U> + From<V>,
{ {
t.set_id(id.try_into()?) t.set_id(id.try_into()?)

View File

@ -42,7 +42,7 @@ impl Announce {
.queue(DeliverMany::new(inboxes, announce)?) .queue(DeliverMany::new(inboxes, announce)?)
.await?; .await?;
state.state.cache(self.object_id, activity_id).await; state.state.cache(self.object_id, activity_id);
Ok(()) Ok(())
} }
} }

View File

@ -42,7 +42,7 @@ impl QueryContact {
let contact = match state let contact = match state
.requests .requests
.fetch::<AcceptedActors>(self.contact_id.as_str()) .fetch::<AcceptedActors>(&self.contact_id)
.await .await
{ {
Ok(contact) => contact, Ok(contact) => contact,

View File

@ -35,7 +35,7 @@ impl Deliver {
#[tracing::instrument(name = "Deliver", skip(state))] #[tracing::instrument(name = "Deliver", skip(state))]
async fn permform(self, state: JobState) -> Result<(), Error> { async fn permform(self, state: JobState) -> Result<(), Error> {
if let Err(e) = state.requests.deliver(self.to, &self.data).await { if let Err(e) = state.requests.deliver(&self.to, &self.data).await {
if e.is_breaker() { if e.is_breaker() {
tracing::debug!("Not trying due to failed breaker"); tracing::debug!("Not trying due to failed breaker");
return Ok(()); return Ok(());

File diff suppressed because one or more lines are too long

View File

@ -39,11 +39,11 @@ impl QueryNodeinfo {
.authority_str() .authority_str()
.ok_or(ErrorKind::MissingDomain)?; .ok_or(ErrorKind::MissingDomain)?;
let scheme = self.actor_id.scheme_str(); let scheme = self.actor_id.scheme_str();
let well_known_uri = iri!(format!("{}://{}/.well-known/nodeinfo", scheme, authority)); let well_known_uri = iri!(format!("{scheme}://{authority}/.well-known/nodeinfo"));
let well_known = match state let well_known = match state
.requests .requests
.fetch_json::<WellKnown>(well_known_uri.as_str()) .fetch_json::<WellKnown>(&well_known_uri)
.await .await
{ {
Ok(well_known) => well_known, Ok(well_known) => well_known,
@ -55,7 +55,7 @@ impl QueryNodeinfo {
}; };
let href = if let Some(link) = well_known.links.into_iter().find(|l| l.rel.is_supported()) { let href = if let Some(link) = well_known.links.into_iter().find(|l| l.rel.is_supported()) {
link.href iri!(&link.href)
} else { } else {
return Ok(()); return Ok(());
}; };
@ -168,7 +168,7 @@ impl<'de> serde::de::Visitor<'de> for SupportedVersionVisitor {
type Value = SupportedVersion; type Value = SupportedVersion;
fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "a string starting with '{}'", SUPPORTED_VERSIONS) write!(f, "a string starting with '{SUPPORTED_VERSIONS}'")
} }
fn visit_str<E>(self, s: &str) -> Result<Self::Value, E> fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
@ -187,7 +187,7 @@ impl<'de> serde::de::Visitor<'de> for SupportedNodeinfoVisitor {
type Value = SupportedNodeinfo; type Value = SupportedNodeinfo;
fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "a string starting with '{}'", SUPPORTED_NODEINFO) write!(f, "a string starting with '{SUPPORTED_NODEINFO}'")
} }
fn visit_str<E>(self, s: &str) -> Result<Self::Value, E> fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>

View File

@ -0,0 +1,28 @@
use crate::{error::Error, jobs::JobState};
use background_jobs::{ActixJob, Backoff};
use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct RecordLastOnline;
impl RecordLastOnline {
#[tracing::instrument(skip(state))]
async fn perform(self, state: JobState) -> Result<(), Error> {
let nodes = state.state.last_online.take();
state.state.db.mark_last_seen(nodes).await
}
}
impl ActixJob for RecordLastOnline {
type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "relay::jobs::RecordLastOnline";
const QUEUE: &'static str = "maintenance";
const BACKOFF: Backoff = Backoff::Linear(1);
fn run(self, state: Self::State) -> Self::Future {
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
}
}

View File

@ -2,10 +2,14 @@
#![allow(clippy::needless_borrow)] #![allow(clippy::needless_borrow)]
use activitystreams::iri_string::types::IriString; use activitystreams::iri_string::types::IriString;
use actix_rt::task::JoinHandle;
use actix_web::{middleware::Compress, web, App, HttpServer}; use actix_web::{middleware::Compress, web, App, HttpServer};
use collector::MemoryCollector; use collector::MemoryCollector;
#[cfg(feature = "console")] #[cfg(feature = "console")]
use console_subscriber::ConsoleLayer; use console_subscriber::ConsoleLayer;
use http_signature_normalization_actix::middleware::VerifySignature;
use metrics_exporter_prometheus::PrometheusBuilder;
use metrics_util::layers::FanoutBuilder;
use opentelemetry::{sdk::Resource, KeyValue}; use opentelemetry::{sdk::Resource, KeyValue};
use opentelemetry_otlp::WithExportConfig; use opentelemetry_otlp::WithExportConfig;
use rustls::ServerConfig; use rustls::ServerConfig;
@ -35,8 +39,8 @@ use self::{
data::{ActorCache, MediaCache, State}, data::{ActorCache, MediaCache, State},
db::Db, db::Db,
jobs::create_workers, jobs::create_workers,
middleware::{DebugPayload, RelayResolver, Timings}, middleware::{DebugPayload, MyVerify, RelayResolver, Timings},
routes::{actor, inbox, index, nodeinfo, nodeinfo_meta, statics}, routes::{actor, healthz, inbox, index, nodeinfo, nodeinfo_meta, statics},
}; };
fn init_subscriber( fn init_subscriber(
@ -94,19 +98,35 @@ fn init_subscriber(
Ok(()) Ok(())
} }
fn main() -> Result<(), anyhow::Error> { #[actix_rt::main]
async fn main() -> Result<(), anyhow::Error> {
dotenv::dotenv().ok(); dotenv::dotenv().ok();
let config = Config::build()?; let config = Config::build()?;
init_subscriber(Config::software_name(), config.opentelemetry_url())?; init_subscriber(Config::software_name(), config.opentelemetry_url())?;
let collector = MemoryCollector::new();
collector.install()?;
let args = Args::new(); let args = Args::new();
if args.any() { if args.any() {
return client_main(config, args); return client_main(config, args).await?;
}
let collector = MemoryCollector::new();
if let Some(bind_addr) = config.prometheus_bind_address() {
let (recorder, exporter) = PrometheusBuilder::new()
.with_http_listener(bind_addr)
.build()?;
actix_rt::spawn(exporter);
let recorder = FanoutBuilder::default()
.add_recorder(recorder)
.add_recorder(collector.clone())
.build();
metrics::set_boxed_recorder(Box::new(recorder))?;
} else {
collector.install()?;
} }
tracing::warn!("Opening DB"); tracing::warn!("Opening DB");
@ -116,20 +136,19 @@ fn main() -> Result<(), anyhow::Error> {
let actors = ActorCache::new(db.clone()); let actors = ActorCache::new(db.clone());
let media = MediaCache::new(db.clone()); let media = MediaCache::new(db.clone());
server_main(db, actors, media, collector, config)?; server_main(db, actors, media, collector, config).await??;
tracing::warn!("Application exit"); tracing::warn!("Application exit");
Ok(()) Ok(())
} }
#[actix_rt::main] fn client_main(config: Config, args: Args) -> JoinHandle<Result<(), anyhow::Error>> {
async fn client_main(config: Config, args: Args) -> Result<(), anyhow::Error> { actix_rt::spawn(do_client_main(config, args))
actix_rt::spawn(do_client_main(config, args)).await?
} }
async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error> { async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error> {
let client = requests::build_client(&config.user_agent()); let client = requests::build_client(&config.user_agent(), config.client_pool_size());
if !args.blocks().is_empty() || !args.allowed().is_empty() { if !args.blocks().is_empty() || !args.allowed().is_empty() {
if args.undo() { if args.undo() {
@ -142,6 +161,39 @@ async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error>
println!("Updated lists"); println!("Updated lists");
} }
if args.contacted() {
let last_seen = admin::client::last_seen(&client, &config).await?;
let mut report = String::from("Contacted:");
if !last_seen.never.is_empty() {
report += "\nNever seen:\n";
}
for domain in last_seen.never {
report += "\t";
report += &domain;
report += "\n";
}
if !last_seen.last_seen.is_empty() {
report += "\nSeen:\n";
}
for (datetime, domains) in last_seen.last_seen {
for domain in domains {
report += "\t";
report += &datetime.to_string();
report += " - ";
report += &domain;
report += "\n";
}
}
report += "\n";
println!("{report}");
}
if args.list() { if args.list() {
let (blocked, allowed, connected) = tokio::try_join!( let (blocked, allowed, connected) = tokio::try_join!(
admin::client::blocked(&client, &config), admin::client::blocked(&client, &config),
@ -174,15 +226,14 @@ async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error>
Ok(()) Ok(())
} }
#[actix_rt::main] fn server_main(
async fn server_main(
db: Db, db: Db,
actors: ActorCache, actors: ActorCache,
media: MediaCache, media: MediaCache,
collector: MemoryCollector, collector: MemoryCollector,
config: Config, config: Config,
) -> Result<(), anyhow::Error> { ) -> JoinHandle<Result<(), anyhow::Error>> {
actix_rt::spawn(do_server_main(db, actors, media, collector, config)).await? actix_rt::spawn(do_server_main(db, actors, media, collector, config))
} }
async fn do_server_main( async fn do_server_main(
@ -195,10 +246,6 @@ async fn do_server_main(
tracing::warn!("Creating state"); tracing::warn!("Creating state");
let state = State::build(db.clone()).await?; let state = State::build(db.clone()).await?;
tracing::warn!("Creating workers");
let (manager, job_server) =
create_workers(state.clone(), actors.clone(), media.clone(), config.clone());
if let Some((token, admin_handle)) = config.telegram_info() { if let Some((token, admin_handle)) = config.telegram_info() {
tracing::warn!("Creating telegram handler"); tracing::warn!("Creating telegram handler");
telegram::start(admin_handle.to_owned(), db.clone(), token); telegram::start(admin_handle.to_owned(), db.clone(), token);
@ -208,13 +255,18 @@ async fn do_server_main(
let bind_address = config.bind_address(); let bind_address = config.bind_address();
let server = HttpServer::new(move || { let server = HttpServer::new(move || {
let requests = state.requests(&config);
let job_server =
create_workers(state.clone(), actors.clone(), media.clone(), config.clone());
let app = App::new() let app = App::new()
.app_data(web::Data::new(db.clone())) .app_data(web::Data::new(db.clone()))
.app_data(web::Data::new(state.clone())) .app_data(web::Data::new(state.clone()))
.app_data(web::Data::new(state.requests(&config))) .app_data(web::Data::new(requests.clone()))
.app_data(web::Data::new(actors.clone())) .app_data(web::Data::new(actors.clone()))
.app_data(web::Data::new(config.clone())) .app_data(web::Data::new(config.clone()))
.app_data(web::Data::new(job_server.clone())) .app_data(web::Data::new(job_server))
.app_data(web::Data::new(media.clone())) .app_data(web::Data::new(media.clone()))
.app_data(web::Data::new(collector.clone())); .app_data(web::Data::new(collector.clone()));
@ -227,15 +279,15 @@ async fn do_server_main(
app.wrap(Compress::default()) app.wrap(Compress::default())
.wrap(TracingLogger::default()) .wrap(TracingLogger::default())
.wrap(Timings) .wrap(Timings)
.route("/healthz", web::get().to(healthz))
.service(web::resource("/").route(web::get().to(index))) .service(web::resource("/").route(web::get().to(index)))
.service(web::resource("/media/{path}").route(web::get().to(routes::media))) .service(web::resource("/media/{path}").route(web::get().to(routes::media)))
.service( .service(
web::resource("/inbox") web::resource("/inbox")
.wrap(config.digest_middleware()) .wrap(config.digest_middleware())
.wrap(config.signature_middleware( .wrap(VerifySignature::new(
state.requests(&config), MyVerify(requests, actors.clone(), state.clone()),
actors.clone(), Default::default(),
state.clone(),
)) ))
.wrap(DebugPayload(config.debug())) .wrap(DebugPayload(config.debug()))
.route(web::post().to(inbox)), .route(web::post().to(inbox)),
@ -258,7 +310,8 @@ async fn do_server_main(
.route("/allowed", web::get().to(admin::routes::allowed)) .route("/allowed", web::get().to(admin::routes::allowed))
.route("/blocked", web::get().to(admin::routes::blocked)) .route("/blocked", web::get().to(admin::routes::blocked))
.route("/connected", web::get().to(admin::routes::connected)) .route("/connected", web::get().to(admin::routes::connected))
.route("/stats", web::get().to(admin::routes::stats)), .route("/stats", web::get().to(admin::routes::stats))
.route("/last_seen", web::get().to(admin::routes::last_seen)),
), ),
) )
}); });
@ -282,10 +335,6 @@ async fn do_server_main(
tracing::warn!("Server closed"); tracing::warn!("Server closed");
drop(manager);
tracing::warn!("Main complete");
Ok(()) Ok(())
} }

View File

@ -6,10 +6,12 @@ use crate::{
}; };
use activitystreams::{base::BaseExt, iri, iri_string::types::IriString}; use activitystreams::{base::BaseExt, iri, iri_string::types::IriString};
use actix_web::web; use actix_web::web;
use base64::{engine::general_purpose::STANDARD, Engine};
use http_signature_normalization_actix::{prelude::*, verify::DeprecatedAlgorithm}; use http_signature_normalization_actix::{prelude::*, verify::DeprecatedAlgorithm};
use rsa::{pkcs1v15::VerifyingKey, pkcs8::DecodePublicKey, RsaPublicKey}; use rsa::{
use sha2::{Digest, Sha256}; pkcs1v15::Signature, pkcs1v15::VerifyingKey, pkcs8::DecodePublicKey, sha2::Sha256,
use signature::{DigestVerifier, Signature}; signature::Verifier, RsaPublicKey,
};
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -65,11 +67,17 @@ impl MyVerify {
actor_id actor_id
} else { } else {
self.0 match self.0.fetch::<PublicKeyResponse>(&public_key_id).await {
.fetch::<PublicKeyResponse>(public_key_id.as_str()) Ok(res) => res.actor_id().ok_or(ErrorKind::MissingId),
.await? Err(e) => {
.actor_id() if e.is_gone() {
.ok_or(ErrorKind::MissingId)? tracing::warn!("Actor gone: {public_key_id}");
return Ok(false);
} else {
return Err(e);
}
}
}?
}; };
// Previously we verified the sig from an actor's local cache // Previously we verified the sig from an actor's local cache
@ -117,13 +125,13 @@ async fn do_verify(
let span = tracing::Span::current(); let span = tracing::Span::current();
web::block(move || { web::block(move || {
span.in_scope(|| { span.in_scope(|| {
let decoded = base64::decode(signature)?; let decoded = STANDARD.decode(signature)?;
let signature = Signature::from_bytes(&decoded).map_err(ErrorKind::ReadSignature)?; let signature =
let hashed = Sha256::new_with_prefix(signing_string.as_bytes()); Signature::try_from(decoded.as_slice()).map_err(ErrorKind::ReadSignature)?;
let verifying_key = VerifyingKey::new_with_prefix(public_key); let verifying_key = VerifyingKey::<Sha256>::new(public_key);
verifying_key verifying_key
.verify_digest(hashed, &signature) .verify(signing_string.as_bytes(), &signature)
.map_err(ErrorKind::VerifySignature)?; .map_err(ErrorKind::VerifySignature)?;
Ok(()) as Result<(), Error> Ok(()) as Result<(), Error>
@ -159,20 +167,20 @@ mod tests {
use crate::apub::AcceptedActors; use crate::apub::AcceptedActors;
use rsa::{pkcs8::DecodePublicKey, RsaPublicKey}; use rsa::{pkcs8::DecodePublicKey, RsaPublicKey};
const ASONIX_DOG_ACTOR: &'static str = r#"{"@context":["https://www.w3.org/ns/activitystreams","https://w3id.org/security/v1",{"manuallyApprovesFollowers":"as:manuallyApprovesFollowers","toot":"http://joinmastodon.org/ns#","featured":{"@id":"toot:featured","@type":"@id"},"featuredTags":{"@id":"toot:featuredTags","@type":"@id"},"alsoKnownAs":{"@id":"as:alsoKnownAs","@type":"@id"},"movedTo":{"@id":"as:movedTo","@type":"@id"},"schema":"http://schema.org#","PropertyValue":"schema:PropertyValue","value":"schema:value","discoverable":"toot:discoverable","Device":"toot:Device","Ed25519Signature":"toot:Ed25519Signature","Ed25519Key":"toot:Ed25519Key","Curve25519Key":"toot:Curve25519Key","EncryptedMessage":"toot:EncryptedMessage","publicKeyBase64":"toot:publicKeyBase64","deviceId":"toot:deviceId","claim":{"@type":"@id","@id":"toot:claim"},"fingerprintKey":{"@type":"@id","@id":"toot:fingerprintKey"},"identityKey":{"@type":"@id","@id":"toot:identityKey"},"devices":{"@type":"@id","@id":"toot:devices"},"messageFranking":"toot:messageFranking","messageType":"toot:messageType","cipherText":"toot:cipherText","suspended":"toot:suspended"}],"id":"https://masto.asonix.dog/actor","type":"Application","inbox":"https://masto.asonix.dog/actor/inbox","outbox":"https://masto.asonix.dog/actor/outbox","preferredUsername":"masto.asonix.dog","url":"https://masto.asonix.dog/about/more?instance_actor=true","manuallyApprovesFollowers":true,"publicKey":{"id":"https://masto.asonix.dog/actor#main-key","owner":"https://masto.asonix.dog/actor","publicKeyPem":"-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAx8zXS0QNg9YGUBsxAOBH\nJaxIn7i6t+Z4UOpSFDVa2kP0NvQgIJsq3wzRqvaiuncRWpkyFk1fTakiRGD32xnY\nt+juuAaIBlU8eswKyANFqhcLAvFHmT3rA1848M4/YM19djvlL/PR9T53tPNHU+el\nS9MlsG3o6Zsj8YaUJtCI8RgEuJoROLHUb/V9a3oMQ7CfuIoSvF3VEz3/dRT09RW6\n0wQX7yhka9WlKuayWLWmTcB9lAIX6neBk+qKc8VSEsO7mHkzB8mRgVcS2uYZl1eA\nD8/jTT+SlpeFNDZk0Oh35GNFoOxh9qjRw3NGxu7jJCVBehDODzasOv4xDxKAhONa\njQIDAQAB\n-----END PUBLIC KEY-----\n"},"endpoints":{"sharedInbox":"https://masto.asonix.dog/inbox"}}"#; const ASONIX_DOG_ACTOR: &str = r#"{"@context":["https://www.w3.org/ns/activitystreams","https://w3id.org/security/v1",{"manuallyApprovesFollowers":"as:manuallyApprovesFollowers","toot":"http://joinmastodon.org/ns#","featured":{"@id":"toot:featured","@type":"@id"},"featuredTags":{"@id":"toot:featuredTags","@type":"@id"},"alsoKnownAs":{"@id":"as:alsoKnownAs","@type":"@id"},"movedTo":{"@id":"as:movedTo","@type":"@id"},"schema":"http://schema.org#","PropertyValue":"schema:PropertyValue","value":"schema:value","discoverable":"toot:discoverable","Device":"toot:Device","Ed25519Signature":"toot:Ed25519Signature","Ed25519Key":"toot:Ed25519Key","Curve25519Key":"toot:Curve25519Key","EncryptedMessage":"toot:EncryptedMessage","publicKeyBase64":"toot:publicKeyBase64","deviceId":"toot:deviceId","claim":{"@type":"@id","@id":"toot:claim"},"fingerprintKey":{"@type":"@id","@id":"toot:fingerprintKey"},"identityKey":{"@type":"@id","@id":"toot:identityKey"},"devices":{"@type":"@id","@id":"toot:devices"},"messageFranking":"toot:messageFranking","messageType":"toot:messageType","cipherText":"toot:cipherText","suspended":"toot:suspended"}],"id":"https://masto.asonix.dog/actor","type":"Application","inbox":"https://masto.asonix.dog/actor/inbox","outbox":"https://masto.asonix.dog/actor/outbox","preferredUsername":"masto.asonix.dog","url":"https://masto.asonix.dog/about/more?instance_actor=true","manuallyApprovesFollowers":true,"publicKey":{"id":"https://masto.asonix.dog/actor#main-key","owner":"https://masto.asonix.dog/actor","publicKeyPem":"-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAx8zXS0QNg9YGUBsxAOBH\nJaxIn7i6t+Z4UOpSFDVa2kP0NvQgIJsq3wzRqvaiuncRWpkyFk1fTakiRGD32xnY\nt+juuAaIBlU8eswKyANFqhcLAvFHmT3rA1848M4/YM19djvlL/PR9T53tPNHU+el\nS9MlsG3o6Zsj8YaUJtCI8RgEuJoROLHUb/V9a3oMQ7CfuIoSvF3VEz3/dRT09RW6\n0wQX7yhka9WlKuayWLWmTcB9lAIX6neBk+qKc8VSEsO7mHkzB8mRgVcS2uYZl1eA\nD8/jTT+SlpeFNDZk0Oh35GNFoOxh9qjRw3NGxu7jJCVBehDODzasOv4xDxKAhONa\njQIDAQAB\n-----END PUBLIC KEY-----\n"},"endpoints":{"sharedInbox":"https://masto.asonix.dog/inbox"}}"#;
const KARJALAZET_RELAY: &'static str = r#"{"@context":["https://www.w3.org/ns/activitystreams","https://pleroma.karjalazet.se/schemas/litepub-0.1.jsonld",{"@language":"und"}],"alsoKnownAs":[],"attachment":[],"capabilities":{},"discoverable":false,"endpoints":{"oauthAuthorizationEndpoint":"https://pleroma.karjalazet.se/oauth/authorize","oauthRegistrationEndpoint":"https://pleroma.karjalazet.se/api/v1/apps","oauthTokenEndpoint":"https://pleroma.karjalazet.se/oauth/token","sharedInbox":"https://pleroma.karjalazet.se/inbox","uploadMedia":"https://pleroma.karjalazet.se/api/ap/upload_media"},"featured":"https://pleroma.karjalazet.se/relay/collections/featured","followers":"https://pleroma.karjalazet.se/relay/followers","following":"https://pleroma.karjalazet.se/relay/following","id":"https://pleroma.karjalazet.se/relay","inbox":"https://pleroma.karjalazet.se/relay/inbox","manuallyApprovesFollowers":false,"name":null,"outbox":"https://pleroma.karjalazet.se/relay/outbox","preferredUsername":"relay","publicKey":{"id":"https://pleroma.karjalazet.se/relay#main-key","owner":"https://pleroma.karjalazet.se/relay","publicKeyPem":"-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAucoyCht6QpEzUPdQWP/J\nJYxObSH3MCcXBnG4d0OX78QshloeAHhl78EZ5c8I0ePmIjDg2NFK3/pG0EvSrHe2\nIZHnHaN5emgCb2ifNya5W572yfQXo1tUQy+ZXtbTUA7BWbr4LuCvd+HUavMwbx72\neraSZTiQj//ObwpbXFoZO5I/+e5avGmVnfmr/y2cG95hqFDtI3438RgZyBjY5kJM\nY1MLWoY9itGSfYmBtxRj3umlC2bPuBB+hHUJi6TvP7NO6zuUZ66m4ETyuBDi8iP6\ngnUp3Q4+1/I3nDUmhjt7OXckUcX3r5M4UHD3VVUFG0aZw6WWMEAxlyFf/07fCkhR\nBwIDAQAB\n-----END PUBLIC KEY-----\n\n"},"summary":"","tag":[],"type":"Person","url":"https://pleroma.karjalazet.se/relay"}"#; const KARJALAZET_RELAY: &str = r#"{"@context":["https://www.w3.org/ns/activitystreams","https://pleroma.karjalazet.se/schemas/litepub-0.1.jsonld",{"@language":"und"}],"alsoKnownAs":[],"attachment":[],"capabilities":{},"discoverable":false,"endpoints":{"oauthAuthorizationEndpoint":"https://pleroma.karjalazet.se/oauth/authorize","oauthRegistrationEndpoint":"https://pleroma.karjalazet.se/api/v1/apps","oauthTokenEndpoint":"https://pleroma.karjalazet.se/oauth/token","sharedInbox":"https://pleroma.karjalazet.se/inbox","uploadMedia":"https://pleroma.karjalazet.se/api/ap/upload_media"},"featured":"https://pleroma.karjalazet.se/relay/collections/featured","followers":"https://pleroma.karjalazet.se/relay/followers","following":"https://pleroma.karjalazet.se/relay/following","id":"https://pleroma.karjalazet.se/relay","inbox":"https://pleroma.karjalazet.se/relay/inbox","manuallyApprovesFollowers":false,"name":null,"outbox":"https://pleroma.karjalazet.se/relay/outbox","preferredUsername":"relay","publicKey":{"id":"https://pleroma.karjalazet.se/relay#main-key","owner":"https://pleroma.karjalazet.se/relay","publicKeyPem":"-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAucoyCht6QpEzUPdQWP/J\nJYxObSH3MCcXBnG4d0OX78QshloeAHhl78EZ5c8I0ePmIjDg2NFK3/pG0EvSrHe2\nIZHnHaN5emgCb2ifNya5W572yfQXo1tUQy+ZXtbTUA7BWbr4LuCvd+HUavMwbx72\neraSZTiQj//ObwpbXFoZO5I/+e5avGmVnfmr/y2cG95hqFDtI3438RgZyBjY5kJM\nY1MLWoY9itGSfYmBtxRj3umlC2bPuBB+hHUJi6TvP7NO6zuUZ66m4ETyuBDi8iP6\ngnUp3Q4+1/I3nDUmhjt7OXckUcX3r5M4UHD3VVUFG0aZw6WWMEAxlyFf/07fCkhR\nBwIDAQAB\n-----END PUBLIC KEY-----\n\n"},"summary":"","tag":[],"type":"Person","url":"https://pleroma.karjalazet.se/relay"}"#;
const ASONIX_DOG_KEY: &'static str = "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAx8zXS0QNg9YGUBsxAOBH\nJaxIn7i6t+Z4UOpSFDVa2kP0NvQgIJsq3wzRqvaiuncRWpkyFk1fTakiRGD32xnY\nt+juuAaIBlU8eswKyANFqhcLAvFHmT3rA1848M4/YM19djvlL/PR9T53tPNHU+el\nS9MlsG3o6Zsj8YaUJtCI8RgEuJoROLHUb/V9a3oMQ7CfuIoSvF3VEz3/dRT09RW6\n0wQX7yhka9WlKuayWLWmTcB9lAIX6neBk+qKc8VSEsO7mHkzB8mRgVcS2uYZl1eA\nD8/jTT+SlpeFNDZk0Oh35GNFoOxh9qjRw3NGxu7jJCVBehDODzasOv4xDxKAhONa\njQIDAQAB\n-----END PUBLIC KEY-----\n"; const ASONIX_DOG_KEY: &str = "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAx8zXS0QNg9YGUBsxAOBH\nJaxIn7i6t+Z4UOpSFDVa2kP0NvQgIJsq3wzRqvaiuncRWpkyFk1fTakiRGD32xnY\nt+juuAaIBlU8eswKyANFqhcLAvFHmT3rA1848M4/YM19djvlL/PR9T53tPNHU+el\nS9MlsG3o6Zsj8YaUJtCI8RgEuJoROLHUb/V9a3oMQ7CfuIoSvF3VEz3/dRT09RW6\n0wQX7yhka9WlKuayWLWmTcB9lAIX6neBk+qKc8VSEsO7mHkzB8mRgVcS2uYZl1eA\nD8/jTT+SlpeFNDZk0Oh35GNFoOxh9qjRw3NGxu7jJCVBehDODzasOv4xDxKAhONa\njQIDAQAB\n-----END PUBLIC KEY-----\n";
const KARJALAZET_KEY: &'static str = "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAucoyCht6QpEzUPdQWP/J\nJYxObSH3MCcXBnG4d0OX78QshloeAHhl78EZ5c8I0ePmIjDg2NFK3/pG0EvSrHe2\nIZHnHaN5emgCb2ifNya5W572yfQXo1tUQy+ZXtbTUA7BWbr4LuCvd+HUavMwbx72\neraSZTiQj//ObwpbXFoZO5I/+e5avGmVnfmr/y2cG95hqFDtI3438RgZyBjY5kJM\nY1MLWoY9itGSfYmBtxRj3umlC2bPuBB+hHUJi6TvP7NO6zuUZ66m4ETyuBDi8iP6\ngnUp3Q4+1/I3nDUmhjt7OXckUcX3r5M4UHD3VVUFG0aZw6WWMEAxlyFf/07fCkhR\nBwIDAQAB\n-----END PUBLIC KEY-----\n\n"; const KARJALAZET_KEY: &str = "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAucoyCht6QpEzUPdQWP/J\nJYxObSH3MCcXBnG4d0OX78QshloeAHhl78EZ5c8I0ePmIjDg2NFK3/pG0EvSrHe2\nIZHnHaN5emgCb2ifNya5W572yfQXo1tUQy+ZXtbTUA7BWbr4LuCvd+HUavMwbx72\neraSZTiQj//ObwpbXFoZO5I/+e5avGmVnfmr/y2cG95hqFDtI3438RgZyBjY5kJM\nY1MLWoY9itGSfYmBtxRj3umlC2bPuBB+hHUJi6TvP7NO6zuUZ66m4ETyuBDi8iP6\ngnUp3Q4+1/I3nDUmhjt7OXckUcX3r5M4UHD3VVUFG0aZw6WWMEAxlyFf/07fCkhR\nBwIDAQAB\n-----END PUBLIC KEY-----\n\n";
#[test] #[test]
fn handles_masto_keys() { fn handles_masto_keys() {
println!("{}", ASONIX_DOG_KEY); println!("{ASONIX_DOG_KEY}");
let _ = RsaPublicKey::from_public_key_pem(ASONIX_DOG_KEY.trim()).unwrap(); let _ = RsaPublicKey::from_public_key_pem(ASONIX_DOG_KEY.trim()).unwrap();
} }
#[test] #[test]
fn handles_pleromo_keys() { fn handles_pleromo_keys() {
println!("{}", KARJALAZET_KEY); println!("{KARJALAZET_KEY}");
let _ = RsaPublicKey::from_public_key_pem(KARJALAZET_KEY.trim()).unwrap(); let _ = RsaPublicKey::from_public_key_pem(KARJALAZET_KEY.trim()).unwrap();
} }

View File

@ -1,20 +1,22 @@
use crate::error::{Error, ErrorKind}; use crate::{
data::LastOnline,
error::{Error, ErrorKind},
};
use activitystreams::iri_string::types::IriString; use activitystreams::iri_string::types::IriString;
use actix_web::http::header::Date; use actix_web::http::header::Date;
use awc::{error::SendRequestError, Client, ClientResponse}; use awc::{error::SendRequestError, Client, ClientResponse, Connector};
use base64::{engine::general_purpose::STANDARD, Engine};
use dashmap::DashMap; use dashmap::DashMap;
use http_signature_normalization_actix::prelude::*; use http_signature_normalization_actix::prelude::*;
use rand::thread_rng; use rand::thread_rng;
use rsa::{pkcs1v15::SigningKey, RsaPrivateKey}; use rsa::{
use sha2::{Digest, Sha256}; pkcs1v15::SigningKey,
use signature::RandomizedSigner; sha2::{Digest, Sha256},
signature::{RandomizedSigner, SignatureEncoding},
RsaPrivateKey,
};
use std::{ use std::{
cell::RefCell, sync::Arc,
rc::Rc,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::{Duration, SystemTime}, time::{Duration, SystemTime},
}; };
use tracing_awc::Tracing; use tracing_awc::Tracing;
@ -54,7 +56,7 @@ impl Breakers {
if let Some(mut breaker) = self.inner.get_mut(authority) { if let Some(mut breaker) = self.inner.get_mut(authority) {
breaker.fail(); breaker.fail();
if !breaker.should_try() { if !breaker.should_try() {
tracing::warn!("Failed breaker for {}", authority); tracing::warn!("Failed breaker for {authority}");
} }
false false
} else { } else {
@ -138,20 +140,20 @@ impl Default for Breaker {
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct Requests { pub(crate) struct Requests {
client: Rc<RefCell<Client>>, pool_size: usize,
consecutive_errors: Rc<AtomicUsize>, client: Client,
error_limit: usize,
key_id: String, key_id: String,
user_agent: String, user_agent: String,
private_key: RsaPrivateKey, private_key: RsaPrivateKey,
config: Config, config: Config,
breakers: Breakers, breakers: Breakers,
last_online: Arc<LastOnline>,
} }
impl std::fmt::Debug for Requests { impl std::fmt::Debug for Requests {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Requests") f.debug_struct("Requests")
.field("error_limit", &self.error_limit) .field("pool_size", &self.pool_size)
.field("key_id", &self.key_id) .field("key_id", &self.key_id)
.field("user_agent", &self.user_agent) .field("user_agent", &self.user_agent)
.field("config", &self.config) .field("config", &self.config)
@ -160,12 +162,25 @@ impl std::fmt::Debug for Requests {
} }
} }
pub(crate) fn build_client(user_agent: &str) -> Client { thread_local! {
Client::builder() static CLIENT: std::cell::OnceCell<Client> = std::cell::OnceCell::new();
.wrap(Tracing) }
.add_default_header(("User-Agent", user_agent.to_string()))
.timeout(Duration::from_secs(15)) pub(crate) fn build_client(user_agent: &str, pool_size: usize) -> Client {
.finish() CLIENT.with(|client| {
client
.get_or_init(|| {
let connector = Connector::new().limit(pool_size);
Client::builder()
.connector(connector)
.wrap(Tracing)
.add_default_header(("User-Agent", user_agent.to_string()))
.timeout(Duration::from_secs(15))
.finish()
})
.clone()
})
} }
impl Requests { impl Requests {
@ -174,16 +189,18 @@ impl Requests {
private_key: RsaPrivateKey, private_key: RsaPrivateKey,
user_agent: String, user_agent: String,
breakers: Breakers, breakers: Breakers,
last_online: Arc<LastOnline>,
pool_size: usize,
) -> Self { ) -> Self {
Requests { Requests {
client: Rc::new(RefCell::new(build_client(&user_agent))), pool_size,
consecutive_errors: Rc::new(AtomicUsize::new(0)), client: build_client(&user_agent, pool_size),
error_limit: 3,
key_id, key_id,
user_agent, user_agent,
private_key, private_key,
config: Config::default().mastodon_compat(), config: Config::default().mastodon_compat(),
breakers, breakers,
last_online,
} }
} }
@ -191,41 +208,25 @@ impl Requests {
self.breakers.succeed(iri); self.breakers.succeed(iri);
} }
fn count_err(&self) {
let count = self.consecutive_errors.fetch_add(1, Ordering::Relaxed);
if count + 1 >= self.error_limit {
tracing::warn!("{} consecutive errors, rebuilding http client", count + 1);
*self.client.borrow_mut() = build_client(&self.user_agent);
self.reset_err();
}
}
fn reset_err(&self) {
self.consecutive_errors.swap(0, Ordering::Relaxed);
}
async fn check_response( async fn check_response(
&self, &self,
parsed_url: &IriString, parsed_url: &IriString,
res: Result<ClientResponse, SendRequestError>, res: Result<ClientResponse, SendRequestError>,
) -> Result<ClientResponse, Error> { ) -> Result<ClientResponse, Error> {
if res.is_err() { if res.is_err() {
self.count_err();
self.breakers.fail(&parsed_url); self.breakers.fail(&parsed_url);
} }
let mut res = let mut res =
res.map_err(|e| ErrorKind::SendRequest(parsed_url.to_string(), e.to_string()))?; res.map_err(|e| ErrorKind::SendRequest(parsed_url.to_string(), e.to_string()))?;
self.reset_err(); if res.status().is_server_error() {
if !res.status().is_success() {
self.breakers.fail(&parsed_url); self.breakers.fail(&parsed_url);
if let Ok(bytes) = res.body().await { if let Ok(bytes) = res.body().await {
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) { if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
if !s.is_empty() { if !s.is_empty() {
tracing::warn!("Response from {}, {}", parsed_url, s); tracing::warn!("Response from {parsed_url}, {s}");
} }
} }
} }
@ -233,58 +234,55 @@ impl Requests {
return Err(ErrorKind::Status(parsed_url.to_string(), res.status()).into()); return Err(ErrorKind::Status(parsed_url.to_string(), res.status()).into());
} }
self.last_online.mark_seen(&parsed_url);
self.breakers.succeed(&parsed_url); self.breakers.succeed(&parsed_url);
Ok(res) Ok(res)
} }
#[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))] #[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))]
pub(crate) async fn fetch_json<T>(&self, url: &str) -> Result<T, Error> pub(crate) async fn fetch_json<T>(&self, url: &IriString) -> Result<T, Error>
where where
T: serde::de::DeserializeOwned, T: serde::de::DeserializeOwned,
{ {
self.do_fetch(url, "application/json").await self.do_fetch(url, "application/json").await
} }
#[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))]
pub(crate) async fn fetch_json_msky<T>(&self, url: &IriString) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
let mut res = self
.do_deliver(
url,
&serde_json::json!({}),
"application/json",
"application/json",
)
.await?;
let body = res
.body()
.await
.map_err(|e| ErrorKind::ReceiveResponse(url.to_string(), e.to_string()))?;
Ok(serde_json::from_slice(body.as_ref())?)
}
#[tracing::instrument(name = "Fetch Activity+Json", skip(self), fields(signing_string))] #[tracing::instrument(name = "Fetch Activity+Json", skip(self), fields(signing_string))]
pub(crate) async fn fetch<T>(&self, url: &str) -> Result<T, Error> pub(crate) async fn fetch<T>(&self, url: &IriString) -> Result<T, Error>
where where
T: serde::de::DeserializeOwned, T: serde::de::DeserializeOwned,
{ {
self.do_fetch(url, "application/activity+json").await self.do_fetch(url, "application/activity+json").await
} }
async fn do_fetch<T>(&self, url: &str, accept: &str) -> Result<T, Error> async fn do_fetch<T>(&self, url: &IriString, accept: &str) -> Result<T, Error>
where where
T: serde::de::DeserializeOwned, T: serde::de::DeserializeOwned,
{ {
let parsed_url = url.parse::<IriString>()?; let mut res = self.do_fetch_response(url, accept).await?;
if !self.breakers.should_try(&parsed_url) {
return Err(ErrorKind::Breaker.into());
}
let signer = self.signer();
let span = tracing::Span::current();
let client: Client = self.client.borrow().clone();
let res = client
.get(url)
.insert_header(("Accept", accept))
.insert_header(Date(SystemTime::now().into()))
.signature(
self.config.clone(),
self.key_id.clone(),
move |signing_string| {
span.record("signing_string", signing_string);
span.in_scope(|| signer.sign(signing_string))
},
)
.await?
.send()
.await;
let mut res = self.check_response(&parsed_url, res).await?;
let body = res let body = res
.body() .body()
@ -295,18 +293,26 @@ impl Requests {
} }
#[tracing::instrument(name = "Fetch response", skip(self), fields(signing_string))] #[tracing::instrument(name = "Fetch response", skip(self), fields(signing_string))]
pub(crate) async fn fetch_response(&self, url: IriString) -> Result<ClientResponse, Error> { pub(crate) async fn fetch_response(&self, url: &IriString) -> Result<ClientResponse, Error> {
if !self.breakers.should_try(&url) { self.do_fetch_response(url, "*/*").await
}
pub(crate) async fn do_fetch_response(
&self,
url: &IriString,
accept: &str,
) -> Result<ClientResponse, Error> {
if !self.breakers.should_try(url) {
return Err(ErrorKind::Breaker.into()); return Err(ErrorKind::Breaker.into());
} }
let signer = self.signer(); let signer = self.signer();
let span = tracing::Span::current(); let span = tracing::Span::current();
let client: Client = self.client.borrow().clone(); let res = self
let res = client .client
.get(url.as_str()) .get(url.as_str())
.insert_header(("Accept", "*/*")) .insert_header(("Accept", accept))
.insert_header(Date(SystemTime::now().into())) .insert_header(Date(SystemTime::now().into()))
.no_decompress() .no_decompress()
.signature( .signature(
@ -321,7 +327,7 @@ impl Requests {
.send() .send()
.await; .await;
let res = self.check_response(&url, res).await?; let res = self.check_response(url, res).await?;
Ok(res) Ok(res)
} }
@ -331,7 +337,27 @@ impl Requests {
skip_all, skip_all,
fields(inbox = inbox.to_string().as_str(), signing_string) fields(inbox = inbox.to_string().as_str(), signing_string)
)] )]
pub(crate) async fn deliver<T>(&self, inbox: IriString, item: &T) -> Result<(), Error> pub(crate) async fn deliver<T>(&self, inbox: &IriString, item: &T) -> Result<(), Error>
where
T: serde::ser::Serialize + std::fmt::Debug,
{
self.do_deliver(
inbox,
item,
"application/activity+json",
"application/activity+json",
)
.await?;
Ok(())
}
async fn do_deliver<T>(
&self,
inbox: &IriString,
item: &T,
content_type: &str,
accept: &str,
) -> Result<ClientResponse, Error>
where where
T: serde::ser::Serialize + std::fmt::Debug, T: serde::ser::Serialize + std::fmt::Debug,
{ {
@ -343,11 +369,11 @@ impl Requests {
let span = tracing::Span::current(); let span = tracing::Span::current();
let item_string = serde_json::to_string(item)?; let item_string = serde_json::to_string(item)?;
let client: Client = self.client.borrow().clone(); let (req, body) = self
let (req, body) = client .client
.post(inbox.as_str()) .post(inbox.as_str())
.insert_header(("Accept", "application/activity+json")) .insert_header(("Accept", accept))
.insert_header(("Content-Type", "application/activity+json")) .insert_header(("Content-Type", content_type))
.insert_header(Date(SystemTime::now().into())) .insert_header(Date(SystemTime::now().into()))
.signature_with_digest( .signature_with_digest(
self.config.clone(), self.config.clone(),
@ -364,9 +390,9 @@ impl Requests {
let res = req.send_body(body).await; let res = req.send_body(body).await;
self.check_response(&inbox, res).await?; let res = self.check_response(inbox, res).await?;
Ok(()) Ok(res)
} }
fn signer(&self) -> Signer { fn signer(&self) -> Signer {
@ -382,8 +408,9 @@ struct Signer {
impl Signer { impl Signer {
fn sign(&self, signing_string: &str) -> Result<String, Error> { fn sign(&self, signing_string: &str) -> Result<String, Error> {
let signing_key = SigningKey::<Sha256>::new_with_prefix(self.private_key.clone()); let signing_key = SigningKey::<Sha256>::new(self.private_key.clone());
let signature = signing_key.try_sign_with_rng(thread_rng(), signing_string.as_bytes())?; let signature =
Ok(base64::encode(signature.as_ref())) signing_key.try_sign_with_rng(&mut thread_rng(), signing_string.as_bytes())?;
Ok(STANDARD.encode(signature.to_bytes().as_ref()))
} }
} }

View File

@ -1,4 +1,5 @@
mod actor; mod actor;
mod healthz;
mod inbox; mod inbox;
mod index; mod index;
mod media; mod media;
@ -7,6 +8,7 @@ mod statics;
pub(crate) use self::{ pub(crate) use self::{
actor::route as actor, actor::route as actor,
healthz::route as healthz,
inbox::route as inbox, inbox::route as inbox,
index::route as index, index::route as index,
media::route as media, media::route as media,

7
src/routes/healthz.rs Normal file
View File

@ -0,0 +1,7 @@
use crate::{data::State, error::Error};
use actix_web::{web, HttpResponse};
pub(crate) async fn route(state: web::Data<State>) -> Result<HttpResponse, Error> {
state.db.check_health().await?;
Ok(HttpResponse::Ok().finish())
}

View File

@ -16,7 +16,8 @@ use activitystreams::{
use actix_web::{web, HttpResponse}; use actix_web::{web, HttpResponse};
use http_signature_normalization_actix::prelude::{DigestVerified, SignatureVerified}; use http_signature_normalization_actix::prelude::{DigestVerified, SignatureVerified};
#[tracing::instrument(name = "Inbox", skip_all)] #[tracing::instrument(name = "Inbox", skip_all, fields(id = tracing::field::debug(&input.id_unchecked()), kind = tracing::field::debug(&input.kind())))]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn route( pub(crate) async fn route(
state: web::Data<State>, state: web::Data<State>,
actors: web::Data<ActorCache>, actors: web::Data<ActorCache>,
@ -24,22 +25,48 @@ pub(crate) async fn route(
client: web::Data<Requests>, client: web::Data<Requests>,
jobs: web::Data<JobServer>, jobs: web::Data<JobServer>,
input: web::Json<AcceptedActivities>, input: web::Json<AcceptedActivities>,
verified: Option<(SignatureVerified, DigestVerified)>, digest_verified: Option<DigestVerified>,
signature_verified: Option<SignatureVerified>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let input = input.into_inner(); let input = input.into_inner();
let actor = actors let kind = input.kind().ok_or(ErrorKind::MissingKind)?;
.get(
input.actor()?.as_single_id().ok_or(ErrorKind::MissingId)?,
&client,
)
.await?
.into_inner();
let is_allowed = state.db.is_allowed(actor.id.clone()); if digest_verified.is_some() && signature_verified.is_none() && *kind == ValidTypes::Delete {
let is_connected = state.db.is_connected(actor.id.clone()); return Ok(accepted(serde_json::json!({})));
} else if config.validate_signatures()
&& (digest_verified.is_none() || signature_verified.is_none())
{
return Err(ErrorKind::NoSignature(None).into());
}
let (is_allowed, is_connected) = tokio::try_join!(is_allowed, is_connected)?; let actor_id = if input.id_unchecked().is_some() {
input.actor()?.as_single_id().ok_or(ErrorKind::MissingId)?
} else {
input
.actor_unchecked()
.as_single_id()
.ok_or(ErrorKind::MissingId)?
};
let actor = actors.get(actor_id, &client).await?.into_inner();
if let Some(verified) = signature_verified {
if actor.public_key_id.as_str() != verified.key_id() {
tracing::error!("Actor signed with wrong key");
return Err(ErrorKind::BadActor(
actor.public_key_id.to_string(),
verified.key_id().to_owned(),
)
.into());
}
} else if config.validate_signatures() {
tracing::error!("This case should never be reachable, since I handle signature checks earlier in the flow. If you see this in a log it means I did it wrong");
return Err(ErrorKind::NoSignature(Some(actor.public_key_id.to_string())).into());
}
let is_allowed = state.db.is_allowed(actor.id.clone()).await?;
let is_connected = state.db.is_connected(actor.id.clone()).await?;
if !is_allowed { if !is_allowed {
return Err(ErrorKind::NotAllowed(actor.id.to_string()).into()); return Err(ErrorKind::NotAllowed(actor.id.to_string()).into());
@ -49,29 +76,16 @@ pub(crate) async fn route(
return Err(ErrorKind::NotSubscribed(actor.id.to_string()).into()); return Err(ErrorKind::NotSubscribed(actor.id.to_string()).into());
} }
if config.validate_signatures() && verified.is_none() { match kind {
return Err(ErrorKind::NoSignature(actor.public_key_id.to_string()).into());
} else if config.validate_signatures() {
if let Some((verified, _)) = verified {
if actor.public_key_id.as_str() != verified.key_id() {
tracing::error!("Bad actor, more info: {:?}", input);
return Err(ErrorKind::BadActor(
actor.public_key_id.to_string(),
verified.key_id().to_owned(),
)
.into());
}
}
}
match input.kind().ok_or(ErrorKind::MissingKind)? {
ValidTypes::Accept => handle_accept(&config, input).await?, ValidTypes::Accept => handle_accept(&config, input).await?,
ValidTypes::Reject => handle_reject(&config, &jobs, input, actor).await?, ValidTypes::Reject => handle_reject(&config, &jobs, input, actor).await?,
ValidTypes::Announce | ValidTypes::Create => { ValidTypes::Announce | ValidTypes::Create => {
handle_announce(&state, &jobs, input, actor).await? handle_announce(&state, &jobs, input, actor).await?
} }
ValidTypes::Follow => handle_follow(&config, &jobs, input, actor).await?, ValidTypes::Follow => handle_follow(&config, &jobs, input, actor).await?,
ValidTypes::Delete | ValidTypes::Update => handle_forward(&jobs, input, actor).await?, ValidTypes::Add | ValidTypes::Delete | ValidTypes::Remove | ValidTypes::Update => {
handle_forward(&jobs, input, actor).await?
}
ValidTypes::Undo => handle_undo(&config, &jobs, input, actor, is_connected).await?, ValidTypes::Undo => handle_undo(&config, &jobs, input, actor, is_connected).await?,
}; };
@ -203,7 +217,7 @@ async fn handle_announce(
.as_single_id() .as_single_id()
.ok_or(ErrorKind::MissingId)?; .ok_or(ErrorKind::MissingId)?;
if state.is_cached(object_id).await { if state.is_cached(object_id) {
return Err(ErrorKind::Duplicate.into()); return Err(ErrorKind::Duplicate.into());
} }

View File

@ -14,8 +14,11 @@ const MINIFY_CONFIG: minify_html::Cfg = minify_html::Cfg {
keep_html_and_head_opening_tags: false, keep_html_and_head_opening_tags: false,
keep_spaces_between_attributes: true, keep_spaces_between_attributes: true,
keep_comments: false, keep_comments: false,
minify_js: true,
minify_css: true, minify_css: true,
minify_css_level_1: true,
minify_css_level_2: false,
minify_css_level_3: false,
minify_js: true,
remove_bangs: true, remove_bangs: true,
remove_processing_instructions: true, remove_processing_instructions: true,
}; };
@ -71,7 +74,7 @@ pub(crate) async fn route(
let mut buf = BufWriter::new(Vec::new()); let mut buf = BufWriter::new(Vec::new());
crate::templates::index(&mut buf, &local, &nodes, &config)?; crate::templates::index_html(&mut buf, &local, &nodes, &config)?;
let html = buf.into_inner().map_err(|e| { let html = buf.into_inner().map_err(|e| {
tracing::error!("Error rendering template, {}", e.error()); tracing::error!("Error rendering template, {}", e.error());
ErrorKind::FlushBuffer ErrorKind::FlushBuffer

View File

@ -11,7 +11,7 @@ pub(crate) async fn route(
let uuid = uuid.into_inner(); let uuid = uuid.into_inner();
if let Some(url) = media.get_url(uuid).await? { if let Some(url) = media.get_url(uuid).await? {
let res = requests.fetch_response(url).await?; let res = requests.fetch_response(&url).await?;
let mut response = HttpResponse::build(res.status()); let mut response = HttpResponse::build(res.status());

View File

@ -24,18 +24,18 @@ struct Links {
links: Vec<Link>, links: Vec<Link>,
} }
#[tracing::instrument(name = "NodeInfo")] #[tracing::instrument(name = "NodeInfo", skip_all)]
pub(crate) async fn route( pub(crate) async fn route(
config: web::Data<Config>, config: web::Data<Config>,
state: web::Data<State>, state: web::Data<State>,
) -> web::Json<NodeInfo> { ) -> web::Json<NodeInfo> {
let (inboxes, blocks) = tokio::join!(state.db.inboxes(), async { let inboxes = state.db.inboxes().await;
if config.publish_blocks() {
Some(state.db.blocks().await.unwrap_or_default()) let blocks = if config.publish_blocks() {
} else { Some(state.db.blocks().await.unwrap_or_default())
None } else {
} None
}); };
let peers = inboxes let peers = inboxes
.unwrap_or_default() .unwrap_or_default()
@ -44,6 +44,8 @@ pub(crate) async fn route(
.map(|s| s.to_owned()) .map(|s| s.to_owned())
.collect(); .collect();
let open_registrations = !config.restricted_mode();
web::Json(NodeInfo { web::Json(NodeInfo {
version: NodeInfoVersion, version: NodeInfoVersion,
software: Software { software: Software {
@ -55,7 +57,7 @@ pub(crate) async fn route(
inbound: vec![], inbound: vec![],
outbound: vec![], outbound: vec![],
}, },
open_registrations: false, open_registrations,
usage: Usage { usage: Usage {
users: Users { users: Users {
total: 1, total: 1,

View File

@ -5,7 +5,7 @@ use actix_web::{
}; };
#[allow(clippy::async_yields_async)] #[allow(clippy::async_yields_async)]
#[tracing::instrument(name = "Statistics")] #[tracing::instrument(name = "Statics")]
pub(crate) async fn route(filename: web::Path<String>) -> HttpResponse { pub(crate) async fn route(filename: web::Path<String>) -> HttpResponse {
if let Some(data) = StaticFile::get(&filename.into_inner()) { if let Some(data) = StaticFile::get(&filename.into_inner()) {
HttpResponse::Ok() HttpResponse::Ok()

View File

@ -89,19 +89,19 @@ async fn answer(bot: Bot, msg: Message, cmd: Command, db: Db) -> ResponseResult<
.await?; .await?;
} }
Command::Block { domain } if db.add_blocks(vec![domain.clone()]).await.is_ok() => { Command::Block { domain } if db.add_blocks(vec![domain.clone()]).await.is_ok() => {
bot.send_message(msg.chat.id, format!("{} has been blocked", domain)) bot.send_message(msg.chat.id, format!("{domain} has been blocked"))
.await?; .await?;
} }
Command::Unblock { domain } if db.remove_blocks(vec![domain.clone()]).await.is_ok() => { Command::Unblock { domain } if db.remove_blocks(vec![domain.clone()]).await.is_ok() => {
bot.send_message(msg.chat.id, format!("{} has been unblocked", domain)) bot.send_message(msg.chat.id, format!("{domain} has been unblocked"))
.await?; .await?;
} }
Command::Allow { domain } if db.add_allows(vec![domain.clone()]).await.is_ok() => { Command::Allow { domain } if db.add_allows(vec![domain.clone()]).await.is_ok() => {
bot.send_message(msg.chat.id, format!("{} has been allowed", domain)) bot.send_message(msg.chat.id, format!("{domain} has been allowed"))
.await?; .await?;
} }
Command::Disallow { domain } if db.remove_allows(vec![domain.clone()]).await.is_ok() => { Command::Disallow { domain } if db.remove_allows(vec![domain.clone()]).await.is_ok() => {
bot.send_message(msg.chat.id, format!("{} has been disallowed", domain)) bot.send_message(msg.chat.id, format!("{domain} has been disallowed"))
.await?; .await?;
} }
Command::ListAllowed => { Command::ListAllowed => {

View File

@ -0,0 +1,15 @@
[Unit]
Description=Activitypub Relay
Documentation=https://git.asonix.dog/asonix/relay
Wants=network.target
After=network.target
[Install]
WantedBy=multi-user.target
[Service]
Type=simple
EnvironmentFile=/etc/systemd/system/example-relay.service.env
ExecStart=/path/to/relay
Restart=always

View File

@ -0,0 +1,19 @@
HOSTNAME='relay.example.com'
ADDR='0.0.0.0'
PORT='8080'
RESTRICTED_MODE='true'
VALIDATE_SIGNATURES='true'
HTTPS='true'
PRETTY_LOG='false'
PUBLISH_BLOCKS='true'
DEBUG='false'
SLED_PATH='/opt/sled'
TELEGRAM_ADMIN_HANDLE='myhandle'
RUST_BACKTRACE='full'
FOOTER_BLURB='Contact <a href="https://masto.example.com/@example">@example</a> for inquiries.'
LOCAL_DOMAINS='masto.example.com'
LOCAL_BLURB='<p>An ActivityPub relay for servers. Currently running somewhere. Let me know if you want to join!</p>'
OPENTELEMETRY_URL='http://otel.example.com:4317'
API_TOKEN='blahblahblahblahblahblahblah'
TELEGRAM_TOKEN='blahblahblahblahblahblahblah'

View File

@ -0,0 +1,11 @@
[Unit]
Description=Activitypub Relay Socket
Before=multi-user.target
After=network.target
[Socket]
Service=example-relay.service
ListenStream=8080
[Install]
WantedBy=sockets.target

View File

@ -6,7 +6,7 @@
<div class="admin"> <div class="admin">
<div class="left"> <div class="left">
<figure class="avatar"> <figure class="avatar">
<img src="@contact.avatar" alt="@contact.display_name's avatar"> <img loading="lazy" src="@contact.avatar" alt="@contact.display_name's avatar">
</figure> </figure>
</div> </div>
<div class="right"> <div class="right">

View File

@ -1,7 +1,7 @@
@use crate::{ @use crate::{
config::{Config, UrlKind}, config::{Config, UrlKind},
data::Node, data::Node,
templates::{info, instance, statics::index_css}, templates::{info_html, instance_html, statics::index_css},
}; };
@(local: &[Node], nodes: &[Node], config: &Config) @(local: &[Node], nodes: &[Node], config: &Config)
@ -39,13 +39,13 @@ templates::{info, instance, statics::index_css},
@for node in local { @for node in local {
@if let Some(inst) = node.instance.as_ref() { @if let Some(inst) = node.instance.as_ref() {
<li> <li>
@:instance(inst, node.info.as_ref().map(|info| { info.software.as_ref() }), node.contact.as_ref(), @:instance_html(inst, node.info.as_ref().map(|info| { info.software.as_ref() }), node.contact.as_ref(),
&node.base) &node.base)
</li> </li>
} else { } else {
@if let Some(inf) = node.info.as_ref() { @if let Some(inf) = node.info.as_ref() {
<li> <li>
@:info(inf, &node.base) @:info_html(inf, &node.base)
</li> </li>
} }
} }
@ -94,13 +94,13 @@ templates::{info, instance, statics::index_css},
@for node in nodes { @for node in nodes {
@if let Some(inst) = node.instance.as_ref() { @if let Some(inst) = node.instance.as_ref() {
<li> <li>
@:instance(inst, node.info.as_ref().map(|info| { info.software.as_ref() }), node.contact.as_ref(), @:instance_html(inst, node.info.as_ref().map(|info| { info.software.as_ref() }), node.contact.as_ref(),
&node.base) &node.base)
</li> </li>
} else { } else {
@if let Some(inf) = node.info.as_ref() { @if let Some(inf) = node.info.as_ref() {
<li> <li>
@:info(inf, &node.base) @:info_html(inf, &node.base)
</li> </li>
} }
} }

View File

@ -1,4 +1,4 @@
@use crate::{db::{Contact, Instance}, templates::admin}; @use crate::{db::{Contact, Instance}, templates::admin_html};
@use activitystreams::iri_string::types::IriString; @use activitystreams::iri_string::types::IriString;
@(instance: &Instance, software: Option<&str>, contact: Option<&Contact>, base: &IriString) @(instance: &Instance, software: Option<&str>, contact: Option<&Contact>, base: &IriString)
@ -33,7 +33,7 @@
</div> </div>
@if let Some(contact) = contact { @if let Some(contact) = contact {
<h5 class="instance-admin">Administré par:</h5> <h5 class="instance-admin">Administré par:</h5>
@:admin(contact, base) @:admin_html(contact, base)
} }
} }
</section> </section>