Compare commits

...

72 Commits

Author SHA1 Message Date
c5112cb9bb Merge tag 'v0.3.82' into max 2023-03-05 17:16:33 +01:00
asonix
d644e83733 Bump version 2023-02-25 15:14:24 -06:00
asonix
ae91aa8fa7 Update bcrypt 2023-02-25 15:06:18 -06:00
asonix
73c016d418 Update deps 2023-02-25 15:04:30 -06:00
asonix
a1ea5d676c Rework misskey fetch to reuse deliver plumbing
Only count server errors towards failed breakers
2023-02-25 15:02:16 -06:00
perillamint
667d586160 Send dummy JSON when trying Misskey API endpoint
From Misskey 13, Misskey expects valid JSON (does not care its content
though) in POST body. To workaround this, send empty JSON object when
requesting Misskey API endpoint
2023-02-25 14:34:38 -06:00
perillamint
4a7775b56d Misskey metadata support
This commit implements misskey metadata support and corresponding test
for From<MskyMeta> implementation

Also, it is good to note that, Misskey does not return 404 but 200 OK
when they poked at nonexistant endpoint, so the implementation should
handle for invalid json case
2023-02-25 14:34:22 -06:00
asonix
9b809913ad Add note about JSON-LD problems 2023-02-11 18:16:06 -06:00
asonix
a952b528df Use transpose in a couple places 2023-02-05 21:09:47 -06:00
asonix
b5138fc16d Bump version 2023-01-29 13:23:11 -06:00
asonix
0e9b88a7ae Bump deps 2023-01-29 13:21:53 -06:00
asonix
f9cad61049 Add healthcheck for db, new clippy lints 2023-01-29 13:21:36 -06:00
Tealk
96547230bc
update Pleroma text
Signed-off-by: Tealk <tealk@rollenspiel.monster>
2023-01-28 23:45:31 +01:00
asonix
c11ff17192 Bump version 2023-01-23 08:58:07 -06:00
asonix
e93dd2da56 Update teloxide 2023-01-23 08:57:16 -06:00
asonix
34dc1a2281 Update rsa 2023-01-23 08:56:18 -06:00
asonix
9cdebeae4c Update base64, ructe 2023-01-23 08:38:55 -06:00
asonix
662620be46 Only show open_registrations: false when restricted mode is enabled 2023-01-23 08:29:32 -06:00
b94f792a19 Merge tag 'v0.3.79' into max 2023-01-19 21:14:20 +01:00
asonix
5488acb59d Fix docker volume mount in readme 2023-01-03 15:17:56 -06:00
asonix
4998cd3a56 Bump version 2023-01-02 12:43:51 -06:00
asonix
f0a8862922 Don't prometheus exporter for relay client 2023-01-02 12:43:32 -06:00
asonix
b6a10c4e65 Apply patch from perillamint on github
Document REPOSITORY_COMMIT_BASE envvar
2023-01-01 10:29:28 -06:00
asonix
3a14242a91 Apply patch from perillamint on github
Accept REPOSITORY_COMMIT_BASE envvar to build repository url
2023-01-01 10:28:52 -06:00
asonix
f5fed2fce1 Apply patch from perillamint on github
One missing bit Debug implementation for source_url
2023-01-01 10:19:11 -06:00
asonix
5faeaf6371 Revert "Apply patch from perillamint on github"
This reverts commit f291b24269.
2023-01-01 10:01:39 -06:00
asonix
f291b24269 Apply patch from perillamint on github
Show repository URL with commit reference
2023-01-01 09:47:21 -06:00
asonix
5f5c34640f Apply patch from perillamint on github
use git hash to describe version number
2023-01-01 09:46:44 -06:00
asonix
d4e51a1afa Add scrape endpoint to .env 2022-12-26 11:20:02 -06:00
asonix
fafba69258 Add optional prometheus scrape endpoint 2022-12-26 10:57:16 -06:00
asonix
07b961c28f Bump deps 2022-12-21 16:59:19 -06:00
asonix
30dd16a889 Bump version 2022-12-21 16:58:17 -06:00
asonix
88b0383084 Keep track of when servers were last seen 2022-12-21 16:51:17 -06:00
asonix
b49eeaf822 Bump version 2022-12-19 22:25:27 -06:00
asonix
943f679a69 Allow activities without IDs, fetch actor unchecked 2022-12-19 22:24:58 -06:00
asonix
37b2afe344 Bump version 2022-12-19 21:46:27 -06:00
asonix
4e5fabce5f Also debug Kind in inbox 2022-12-19 21:45:52 -06:00
asonix
689d85befb Bump version 2022-12-19 21:07:25 -06:00
asonix
40eb12258d Record id in inbox route 2022-12-19 21:05:53 -06:00
asonix
efcec29d7b Remove unused docker-related files 2022-12-19 16:32:16 -06:00
asonix
62a886d0bf Bump version 2022-12-19 16:31:51 -06:00
asonix
163e480076 Update deps 2022-12-19 16:30:48 -06:00
asonix
675fddcfeb Support Remove activity, forward verbatim 2022-12-19 16:08:39 -06:00
asonix
359ec68aa0 Add example systemd configuration 2022-12-19 15:52:47 -06:00
asonix
565a94d756 clippy 2022-12-19 12:23:06 -06:00
asonix
815c18b899 Update version number in various places 2022-12-19 12:17:08 -06:00
asonix
fbcbf141dd Bump version 2022-12-19 11:46:49 -06:00
asonix
cf7a25f935 Consider NoSignature a BadRequest 2022-12-19 11:44:50 -06:00
asonix
b56bddccb4 Allow Signature to be missing if kind is Delete, return early without additional processing 2022-12-19 11:39:30 -06:00
asonix
886c7d0ac6 Apply patch from perallamint on github
Temporary fix: allow signing bypass for 410 gone actors
DIRTY FIX: implement sigcheck_bypass for 410'ing actors
2022-12-19 09:44:04 -06:00
asonix
178d23bcbd Bump deps 2022-12-14 20:17:14 -06:00
asonix
549eb47202 Bump version 2022-12-13 23:41:06 -06:00
asonix
5968cb8953 bump deps 2022-12-13 23:40:53 -06:00
asonix
c5e254dad6 Update deps 2022-12-13 23:37:09 -06:00
asonix
430ebec810 Improve tracing, immediately stringify spantrace, remove join macros 2022-12-13 23:36:40 -06:00
asonix
c15f591bc8 Add punctuation to readme 2022-12-13 10:56:25 -06:00
asonix
5d69eaf2ab Add note about Add activity to README 2022-12-13 10:46:58 -06:00
asonix
43b70f88a7 Apply patch from perallamint on github
clippy: unnecessary lifetime annotation on static strings

Since string literal constant already has static lifetime, it is not
necessary to explicitly annotate it with 'static.
2022-12-13 10:39:25 -06:00
asonix
a0dc2363f6 Add support for Add activity - forward verbatim 2022-12-13 10:35:16 -06:00
asonix
9d68ccd834 Update deps 2022-12-12 11:06:23 -06:00
asonix
a8b8325557 Update deps 2022-12-12 10:56:53 -06:00
asonix
6082def854 Bump version 2022-12-09 18:04:15 -06:00
asonix
31021e80e4 Bump deps 2022-12-09 18:03:36 -06:00
asonix
f4db90b699 Use sync RwLock for lru access 2022-12-09 17:47:45 -06:00
asonix
d834537300 Bump http-signature-normalization-actix 2022-12-08 21:15:43 -06:00
asonix
c18760d57f Bump version 2022-12-08 15:14:12 -06:00
asonix
8575439d88 Bump deps 2022-12-08 15:14:04 -06:00
asonix
c543e8b4eb Bump version 2022-12-06 18:53:36 -06:00
asonix
a0fbf9d236 Bump activitystreams again 2022-12-06 18:53:19 -06:00
asonix
b9dba28207 Bump activitystreams 2022-12-06 18:21:55 -06:00
asonix
b5dc3e7c08 Wrap whole main in actix_rt, fixes opentelemetry 2022-12-06 17:55:02 -06:00
asonix
4d8e1a7241 Enable lazy loading for images 2022-11-23 13:32:27 -06:00
52 changed files with 1968 additions and 1408 deletions

2
.env
View File

@ -9,3 +9,5 @@ FOOTER_BLURB="Opéré par <a href=\"https://mastodon.xolus.net/@max\">@max</a>"
LOCAL_DOMAINS="xolus.net"
LOCAL_BLURB="<p>Relais ActivityPub francophone</p>"
# OPENTELEMETRY_URL=http://localhost:4317
PROMETHEUS_ADDR=127.0.0.1
PROMETHEUS_PORT=9000

1000
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,7 +1,7 @@
[package]
name = "ap-relay"
description = "A simple activitypub relay"
version = "0.3.66"
version = "0.3.82"
authors = ["asonix <asonix@asonix.dog>"]
license = "AGPL-3.0"
readme = "README.md"
@ -29,20 +29,23 @@ actix-web = { version = "4.0.1", default-features = false, features = [
"compress-gzip",
] }
actix-webfinger = "0.4.0"
activitystreams = "0.7.0-alpha.19"
activitystreams-ext = "0.1.0-alpha.2"
activitystreams = "0.7.0-alpha.21"
activitystreams-ext = "0.1.0-alpha.3"
ammonia = "3.1.0"
awc = { version = "3.0.0", default-features = false, features = ["rustls"] }
bcrypt = "0.13"
base64 = "0.13"
bcrypt = "0.14"
base64 = "0.21"
clap = { version = "4.0.0", features = ["derive"] }
config = "0.13.0"
console-subscriber = { version = "0.1", optional = true }
dashmap = "5.1.0"
dotenv = "0.15.0"
futures-util = "0.3.17"
lru = "0.8.0"
lru = "0.9.0"
metrics = "0.20.1"
metrics-exporter-prometheus = { version = "0.11.0", default-features = false, features = [
"http-listener",
] }
metrics-util = "0.14.0"
mime = "0.3.16"
minify-html = "0.10.0"
@ -51,21 +54,20 @@ opentelemetry-otlp = "0.11"
pin-project-lite = "0.2.9"
quanta = "0.10.1"
rand = "0.8"
rsa = "0.7"
rsa-magic-public-key = "0.6.0"
rsa = { version = "0.8", features = ["sha2"] }
rsa-magic-public-key = "0.7.0"
rustls = "0.20.7"
rustls-pemfile = "1.0.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = { version = "0.10", features = ["oid"] }
signature = "1.6.4"
sled = "0.34.7"
teloxide = { version = "0.11.1", default-features = false, features = [
teloxide = { version = "0.12.0", default-features = false, features = [
"ctrlc_handler",
"macros",
"rustls",
] }
thiserror = "1.0"
time = { version = "0.3.17", features = ["serde"] }
tracing = "0.1"
tracing-awc = "0.1.6"
tracing-error = "0.2"
@ -86,18 +88,18 @@ default-features = false
features = ["background-jobs-actix", "error-logging"]
[dependencies.http-signature-normalization-actix]
version = "0.6.0"
version = "0.8.0"
default-features = false
features = ["client", "server", "sha-2"]
[dependencies.tracing-actix-web]
version = "0.6.1"
version = "0.7.0"
[build-dependencies]
anyhow = "1.0"
dotenv = "0.15.0"
ructe = { version = "0.15.0", features = ["sass", "mime03"] }
toml = "0.5.8"
ructe = { version = "0.16.0", features = ["sass", "mime03"] }
toml = "0.7.0"
[profile.dev.package.rsa]
opt-level = 3

View File

@ -6,11 +6,11 @@ _A simple and efficient activitypub relay_
If running docker, you can start the relay with the following command:
```
$ sudo docker run --rm -it \
-v "./:/mnt/" \
-v "$(pwd):/mnt/" \
-e ADDR=0.0.0.0 \
-e SLED_PATH=/mnt/sled/db-0.34 \
-p 8080:8080 \
asonix/relay:0.3.52
asonix/relay:0.3.78
```
This will launch the relay with the database stored in "./sled/db-0.34" and listening on port 8080
#### Cargo
@ -103,6 +103,8 @@ TLS_CERT=/path/to/cert
FOOTER_BLURB="Contact <a href=\"https://masto.asonix.dog/@asonix\">@asonix</a> for inquiries"
LOCAL_DOMAINS=masto.asonix.dog
LOCAL_BLURB="<p>Welcome to my cool relay where I have cool relay things happening. I hope you enjoy your stay!</p>"
PROMETHEUS_ADDR=0.0.0.0
PROMETHEUS_PORT=9000
```
#### Descriptions
@ -128,6 +130,8 @@ Where to store the on-disk database of connected servers. This defaults to `./sl
The log level to print. Available levels are `ERROR`, `WARN`, `INFO`, `DEBUG`, and `TRACE`. You can also specify module paths to enable some logs but not others, such as `RUST_LOG=warn,tracing_actix_web=info,relay=info`. This defaults to `warn`
##### `SOURCE_REPO`
The URL to the source code for the relay. This defaults to `https://git.asonix.dog/asonix/relay`, but should be changed if you're running a fork hosted elsewhere.
##### `REPOSITORY_COMMIT_BASE`
The base path of the repository commit hash reference. For example, `/src/commit/` for Gitea, `/tree/` for GitLab.
##### `API_TOKEN`
The Secret token used to access the admin APIs. This must be set for the commandline to function
##### `OPENTELEMETRY_URL`
@ -146,6 +150,10 @@ Optional - Add custom notes in the footer of the page
Optional - domains of mastodon servers run by the same admin as the relay
##### `LOCAL_BLURB`
Optional - description for the relay
##### `PROMETHEUS_ADDR`
Optional - Address to bind to for serving the prometheus scrape endpoint
##### `PROMETHEUS_PORT`
Optional - Port to bind to for serving the prometheus scrape endpoint
### Subscribing
Mastodon admins can subscribe to this relay by adding the `/inbox` route to their relay settings.
@ -165,10 +173,16 @@ example, if the server is `https://relay.my.tld`, the correct URL would be
- Follow Public, become a listener of the relay
- Undo Follow {self-actor}, stop listening on the relay, an Undo Follow will be sent back
- Undo Follow Public, stop listening on the relay
- Delete {anything}, the Delete {anything} is relayed verbatim to listening servers
- Delete {anything}, the Delete {anything} is relayed verbatim to listening servers.
Note that this activity will likely be rejected by the listening servers unless it has been
signed with a JSON-LD signature
- Update {anything}, the Update {anything} is relayed verbatim to listening servers
- Update {anything}, the Update {anything} is relayed verbatim to listening servers.
Note that this activity will likely be rejected by the listening servers unless it has been
signed with a JSON-LD signature
- Add {anything}, the Add {anything} is relayed verbatim to listening servers.
Note that this activity will likely be rejected by the listening servers unless it has been
signed with a JSON-LD signature
- Remove {anything}, the Remove {anything} is relayed verbatim to listening servers.
Note that this activity will likely be rejected by the listening servers unless it has been
signed with a JSON-LD signature
@ -176,6 +190,9 @@ example, if the server is `https://relay.my.tld`, the correct URL would be
- Webfinger
- NodeInfo
### Known issues
Pleroma and Akkoma do not support validating JSON-LD signatures, meaning many activities such as Delete, Update, Add, and Remove will be rejected with a message similar to `WARN: Response from https://example.com/inbox, "Invalid HTTP Signature"`. This is normal and not an issue with the relay.
### Contributing
Feel free to open issues for anything you find an issue with. Please note that any contributed code will be licensed under the AGPLv3.

View File

@ -1,41 +0,0 @@
ARG REPO_ARCH=amd64
# cross-build environment
FROM asonix/rust-builder:$REPO_ARCH-latest AS builder
ARG TAG=main
ARG BINARY=relay
ARG PROJECT=relay
ARG GIT_REPOSITORY=https://git.asonix.dog/asonix/$PROJECT
ENV \
BINARY=${BINARY}
ADD \
--chown=build:build \
$GIT_REPOSITORY/archive/$TAG.tar.gz \
/opt/build/repo.tar.gz
RUN \
tar zxf repo.tar.gz
WORKDIR /opt/build/$PROJECT
RUN \
build
# production environment
FROM asonix/rust-runner:$REPO_ARCH-latest
ARG BINARY=relay
ENV \
BINARY=${BINARY}
COPY \
--from=builder \
/opt/build/binary \
/usr/bin/${BINARY}
ENTRYPOINT ["/sbin/tini", "--"]
CMD /usr/bin/${BINARY}

View File

@ -1,37 +0,0 @@
#!/usr/bin/env bash
function require() {
if [ "$1" = "" ]; then
echo "input '$2' required"
print_help
exit 1
fi
}
function print_help() {
echo "deploy.sh"
echo ""
echo "Usage:"
echo " deploy.sh [repo] [tag] [arch]"
echo ""
echo "Args:"
echo " repo: The docker repository to publish the image"
echo " tag: The tag applied to the docker image"
echo " arch: The architecuture of the doker image"
}
REPO=$1
TAG=$2
ARCH=$3
require "$REPO" repo
require "$TAG" tag
require "$ARCH" arch
sudo docker build \
--pull \
--build-arg TAG=$TAG \
--build-arg REPO_ARCH=$ARCH \
-t $REPO:$ARCH-$TAG \
-f Dockerfile \
.

View File

@ -1,87 +0,0 @@
#!/usr/bin/env bash
function require() {
if [ "$1" = "" ]; then
echo "input '$2' required"
print_help
exit 1
fi
}
function print_help() {
echo "deploy.sh"
echo ""
echo "Usage:"
echo " deploy.sh [tag] [branch] [push]"
echo ""
echo "Args:"
echo " tag: The git tag to be applied to the repository and docker build"
echo " branch: The git branch to use for tagging and publishing"
echo " push: Whether or not to push the image"
echo ""
echo "Examples:"
echo " ./deploy.sh v0.3.0-alpha.13 main true"
echo " ./deploy.sh v0.3.0-alpha.13-shell-out asonix/shell-out false"
}
function build_image() {
tag=$1
arch=$2
push=$3
./build-image.sh asonix/relay $tag $arch
sudo docker tag asonix/relay:$arch-$tag asonix/relay:$arch-latest
if [ "$push" == "true" ]; then
sudo docker push asonix/relay:$arch-$tag
sudo docker push asonix/relay:$arch-latest
fi
}
# Creating the new tag
new_tag="$1"
branch="$2"
push=$3
require "$new_tag" "tag"
require "$branch" "branch"
require "$push" "push"
if ! sudo docker run --rm -it arm64v8/alpine:3.11 /bin/sh -c 'echo "docker is configured correctly"'
then
echo "docker is not configured to run on qemu-emulated architectures, fixing will require sudo"
sudo docker run --rm --privileged multiarch/qemu-user-static --reset -p yes
fi
set -xe
git checkout $branch
# Changing the docker-compose prod
sed -i "s/asonix\/relay:.*/asonix\/relay:$new_tag/" docker-compose.yml
git add ../prod/docker-compose.yml
# The commit
git commit -m"Version $new_tag"
git tag $new_tag
# Push
git push origin $new_tag
git push
# Build for arm64v8, arm32v7 and amd64
build_image $new_tag arm64v8 $push
build_image $new_tag arm32v7 $push
build_image $new_tag amd64 $push
# Build for other archs
# TODO
if [ "$push" == "true" ]; then
./manifest.sh relay $new_tag
./manifest.sh relay latest
# pushd ../../
# cargo publish
# popd
fi

View File

@ -2,7 +2,7 @@ version: '3.3'
services:
relay:
image: asonix/relay:v0.3.8
image: asonix/relay:v0.3.73
ports:
- "8079:8079"
restart: always
@ -14,6 +14,7 @@ services:
- RESTRICTED_MODE=false
- VALIDATE_SIGNATURES=true
- HTTPS=true
- DATABASE_URL=postgres://pg_user:pg_pass@pg_host:pg_port/pg_database
- SLED_PATH=/mnt/sled/db-0.34
- PRETTY_LOG=false
- PUBLISH_BLOCKS=true
- API_TOKEN=somepasswordishtoken

View File

@ -1,43 +0,0 @@
#!/usr/bin/env bash
function require() {
if [ "$1" = "" ]; then
echo "input '$2' required"
print_help
exit 1
fi
}
function print_help() {
echo "deploy.sh"
echo ""
echo "Usage:"
echo " manifest.sh [repo] [tag]"
echo ""
echo "Args:"
echo " repo: The docker repository to update"
echo " tag: The git tag to be applied to the image manifest"
}
REPO=$1
TAG=$2
require "$REPO" "repo"
require "$TAG" "tag"
set -xe
sudo docker manifest create asonix/$REPO:$TAG \
-a asonix/$REPO:arm64v8-$TAG \
-a asonix/$REPO:arm32v7-$TAG \
-a asonix/$REPO:amd64-$TAG
sudo docker manifest annotate asonix/$REPO:$TAG \
asonix/$REPO:arm64v8-$TAG --os linux --arch arm64 --variant v8
sudo docker manifest annotate asonix/$REPO:$TAG \
asonix/$REPO:arm32v7-$TAG --os linux --arch arm --variant v7
sudo docker manifest annotate asonix/$REPO:$TAG \
asonix/$REPO:amd64-$TAG --os linux --arch amd64
sudo docker manifest push asonix/$REPO:$TAG --purge

View File

@ -1,4 +1,6 @@
use activitystreams::iri_string::types::IriString;
use std::collections::{BTreeMap, BTreeSet};
use time::OffsetDateTime;
pub mod client;
pub mod routes;
@ -22,3 +24,9 @@ pub(crate) struct BlockedDomains {
pub(crate) struct ConnectedActors {
pub(crate) connected_actors: Vec<IriString>,
}
#[derive(serde::Deserialize, serde::Serialize)]
pub(crate) struct LastSeen {
pub(crate) last_seen: BTreeMap<OffsetDateTime, BTreeSet<String>>,
pub(crate) never: Vec<String>,
}

View File

@ -1,5 +1,5 @@
use crate::{
admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains},
admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains, LastSeen},
collector::Snapshot,
config::{AdminUrlKind, Config},
error::{Error, ErrorKind},
@ -55,6 +55,10 @@ pub(crate) async fn stats(client: &Client, config: &Config) -> Result<Snapshot,
get_results(client, config, AdminUrlKind::Stats).await
}
pub(crate) async fn last_seen(client: &Client, config: &Config) -> Result<LastSeen, Error> {
get_results(client, config, AdminUrlKind::LastSeen).await
}
async fn get_results<T: DeserializeOwned>(
client: &Client,
config: &Config,

View File

@ -1,5 +1,5 @@
use crate::{
admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains},
admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains, LastSeen},
collector::{MemoryCollector, Snapshot},
error::Error,
extractors::Admin,
@ -8,6 +8,8 @@ use actix_web::{
web::{Data, Json},
HttpResponse,
};
use std::collections::{BTreeMap, BTreeSet};
use time::OffsetDateTime;
pub(crate) async fn allow(
admin: Admin,
@ -69,3 +71,20 @@ pub(crate) async fn stats(
) -> Result<Json<Snapshot>, Error> {
Ok(Json(collector.snapshot()))
}
pub(crate) async fn last_seen(admin: Admin) -> Result<Json<LastSeen>, Error> {
let nodes = admin.db_ref().last_seen().await?;
let mut last_seen: BTreeMap<OffsetDateTime, BTreeSet<String>> = BTreeMap::new();
let mut never = Vec::new();
for (domain, datetime) in nodes {
if let Some(datetime) = datetime {
last_seen.entry(datetime).or_default().insert(domain);
} else {
never.push(domain);
}
}
Ok(Json(LastSeen { last_seen, never }))
}

View File

@ -34,11 +34,13 @@ pub struct PublicKey {
#[serde(rename_all = "PascalCase")]
pub enum ValidTypes {
Accept,
Add,
Announce,
Create,
Delete,
Follow,
Reject,
Remove,
Undo,
Update,
}

View File

@ -17,11 +17,22 @@ pub(crate) struct Args {
#[arg(short, long, help = "Get statistics from the server")]
stats: bool,
#[arg(
short,
long,
help = "List domains by when they were last succesfully contacted"
)]
contacted: bool,
}
impl Args {
pub(crate) fn any(&self) -> bool {
!self.blocks.is_empty() || !self.allowed.is_empty() || self.list || self.stats
!self.blocks.is_empty()
|| !self.allowed.is_empty()
|| self.list
|| self.stats
|| self.contacted
}
pub(crate) fn new() -> Self {
@ -47,4 +58,8 @@ impl Args {
pub(crate) fn stats(&self) -> bool {
self.stats
}
pub(crate) fn contacted(&self) -> bool {
self.contacted
}
}

View File

@ -5,7 +5,8 @@ fn git_info() {
if let Ok(output) = Command::new("git").args(["rev-parse", "HEAD"]).output() {
if output.status.success() {
let git_hash = String::from_utf8_lossy(&output.stdout);
println!("cargo:rustc-env=GIT_HASH={}", git_hash);
println!("cargo:rustc-env=GIT_HASH={git_hash}");
println!("cargo:rustc-env=GIT_SHORT_HASH={}", &git_hash[..8])
}
}
@ -15,7 +16,7 @@ fn git_info() {
{
if output.status.success() {
let git_branch = String::from_utf8_lossy(&output.stdout);
println!("cargo:rustc-env=GIT_BRANCH={}", git_branch);
println!("cargo:rustc-env=GIT_BRANCH={git_branch}");
}
}
}
@ -23,7 +24,7 @@ fn git_info() {
fn version_info() -> Result<(), anyhow::Error> {
let cargo_toml = Path::new(&std::env::var("CARGO_MANIFEST_DIR")?).join("Cargo.toml");
let mut file = File::open(&cargo_toml)?;
let mut file = File::open(cargo_toml)?;
let mut cargo_data = String::new();
file.read_to_string(&mut cargo_data)?;
@ -31,11 +32,11 @@ fn version_info() -> Result<(), anyhow::Error> {
let data: toml::Value = toml::from_str(&cargo_data)?;
if let Some(version) = data["package"]["version"].as_str() {
println!("cargo:rustc-env=PKG_VERSION={}", version);
println!("cargo:rustc-env=PKG_VERSION={version}");
}
if let Some(name) = data["package"]["name"].as_str() {
println!("cargo:rustc-env=PKG_NAME={}", name);
println!("cargo:rustc-env=PKG_NAME={name}");
}
Ok(())

View File

@ -1,414 +1,5 @@
use metrics::{Key, Recorder, SetRecorderError};
use metrics_util::{
registry::{AtomicStorage, GenerationalStorage, Recency, Registry},
MetricKindMask, Summary,
};
use quanta::Clock;
use std::{
collections::{BTreeMap, HashMap},
sync::{atomic::Ordering, Arc, RwLock},
time::Duration,
};
mod double;
mod stats;
const SECONDS: u64 = 1;
const MINUTES: u64 = 60 * SECONDS;
const HOURS: u64 = 60 * MINUTES;
const DAYS: u64 = 24 * HOURS;
type DistributionMap = BTreeMap<Vec<(String, String)>, Summary>;
#[derive(Clone)]
pub struct MemoryCollector {
inner: Arc<Inner>,
}
struct Inner {
descriptions: RwLock<HashMap<String, metrics::SharedString>>,
distributions: RwLock<HashMap<String, DistributionMap>>,
recency: Recency<Key>,
registry: Registry<Key, GenerationalStorage<AtomicStorage>>,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct Counter {
labels: BTreeMap<String, String>,
value: u64,
}
impl std::fmt::Display for Counter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let labels = self
.labels
.iter()
.map(|(k, v)| format!("{}: {}", k, v))
.collect::<Vec<_>>()
.join(", ");
write!(f, "{} - {}", labels, self.value)
}
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct Gauge {
labels: BTreeMap<String, String>,
value: f64,
}
impl std::fmt::Display for Gauge {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let labels = self
.labels
.iter()
.map(|(k, v)| format!("{}: {}", k, v))
.collect::<Vec<_>>()
.join(", ");
write!(f, "{} - {}", labels, self.value)
}
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct Histogram {
labels: BTreeMap<String, String>,
value: Vec<(f64, Option<f64>)>,
}
impl std::fmt::Display for Histogram {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let labels = self
.labels
.iter()
.map(|(k, v)| format!("{}: {}", k, v))
.collect::<Vec<_>>()
.join(", ");
let value = self
.value
.iter()
.map(|(k, v)| {
if let Some(v) = v {
format!("{}: {:.6}", k, v)
} else {
format!("{}: None,", k)
}
})
.collect::<Vec<_>>()
.join(", ");
write!(f, "{} - {}", labels, value)
}
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct Snapshot {
counters: HashMap<String, Vec<Counter>>,
gauges: HashMap<String, Vec<Gauge>>,
histograms: HashMap<String, Vec<Histogram>>,
}
const PAIRS: [((&str, &str), &str); 2] = [
(
(
"background-jobs.worker.started",
"background-jobs.worker.finished",
),
"background-jobs.worker.running",
),
(
(
"background-jobs.job.started",
"background-jobs.job.finished",
),
"background-jobs.job.running",
),
];
#[derive(Default)]
struct MergeCounter {
start: Option<Counter>,
finish: Option<Counter>,
}
impl MergeCounter {
fn merge(self) -> Option<Counter> {
match (self.start, self.finish) {
(Some(start), Some(end)) => Some(Counter {
labels: start.labels,
value: start.value.saturating_sub(end.value),
}),
(Some(only), None) => Some(only),
(None, Some(only)) => Some(Counter {
labels: only.labels,
value: 0,
}),
(None, None) => None,
}
}
}
impl Snapshot {
pub(crate) fn present(self) {
if !self.counters.is_empty() {
println!("Counters");
let mut merging = HashMap::new();
for (key, counters) in self.counters {
if let Some(((start, _), name)) = PAIRS
.iter()
.find(|((start, finish), _)| *start == key || *finish == key)
{
let entry = merging.entry(name).or_insert_with(HashMap::new);
for counter in counters {
let mut merge_counter = entry
.entry(counter.labels.clone())
.or_insert_with(MergeCounter::default);
if key == *start {
merge_counter.start = Some(counter);
} else {
merge_counter.finish = Some(counter);
}
}
continue;
}
println!("\t{}", key);
for counter in counters {
println!("\t\t{}", counter);
}
}
for (key, counters) in merging {
println!("\t{}", key);
for (_, counter) in counters {
if let Some(counter) = counter.merge() {
println!("\t\t{}", counter);
}
}
}
}
if !self.gauges.is_empty() {
println!("Gauges");
for (key, gauges) in self.gauges {
println!("\t{}", key);
for gauge in gauges {
println!("\t\t{}", gauge);
}
}
}
if !self.histograms.is_empty() {
println!("Histograms");
for (key, histograms) in self.histograms {
println!("\t{}", key);
for histogram in histograms {
println!("\t\t{}", histogram);
}
}
}
}
}
fn key_to_parts(key: &Key) -> (String, Vec<(String, String)>) {
let labels = key
.labels()
.into_iter()
.map(|label| (label.key().to_string(), label.value().to_string()))
.collect();
let name = key.name().to_string();
(name, labels)
}
impl Inner {
fn snapshot_counters(&self) -> HashMap<String, Vec<Counter>> {
let mut counters = HashMap::new();
for (key, counter) in self.registry.get_counter_handles() {
let gen = counter.get_generation();
if !self.recency.should_store_counter(&key, gen, &self.registry) {
continue;
}
let (name, labels) = key_to_parts(&key);
let value = counter.get_inner().load(Ordering::Acquire);
counters.entry(name).or_insert_with(Vec::new).push(Counter {
labels: labels.into_iter().collect(),
value,
});
}
counters
}
fn snapshot_gauges(&self) -> HashMap<String, Vec<Gauge>> {
let mut gauges = HashMap::new();
for (key, gauge) in self.registry.get_gauge_handles() {
let gen = gauge.get_generation();
if !self.recency.should_store_gauge(&key, gen, &self.registry) {
continue;
}
let (name, labels) = key_to_parts(&key);
let value = f64::from_bits(gauge.get_inner().load(Ordering::Acquire));
gauges.entry(name).or_insert_with(Vec::new).push(Gauge {
labels: labels.into_iter().collect(),
value,
})
}
gauges
}
fn snapshot_histograms(&self) -> HashMap<String, Vec<Histogram>> {
for (key, histogram) in self.registry.get_histogram_handles() {
let gen = histogram.get_generation();
let (name, labels) = key_to_parts(&key);
if !self
.recency
.should_store_histogram(&key, gen, &self.registry)
{
let mut d = self.distributions.write().unwrap();
let delete_by_name = if let Some(by_name) = d.get_mut(&name) {
by_name.remove(&labels);
by_name.is_empty()
} else {
false
};
drop(d);
if delete_by_name {
self.descriptions.write().unwrap().remove(&name);
}
continue;
}
let mut d = self.distributions.write().unwrap();
let outer_entry = d.entry(name.clone()).or_insert_with(BTreeMap::new);
let entry = outer_entry
.entry(labels)
.or_insert_with(Summary::with_defaults);
histogram.get_inner().clear_with(|samples| {
for sample in samples {
entry.add(*sample);
}
})
}
let d = self.distributions.read().unwrap().clone();
d.into_iter()
.map(|(key, value)| {
(
key,
value
.into_iter()
.map(|(labels, summary)| Histogram {
labels: labels.into_iter().collect(),
value: [0.001, 0.01, 0.05, 0.1, 0.5, 0.9, 0.99, 1.0]
.into_iter()
.map(|q| (q, summary.quantile(q)))
.collect(),
})
.collect(),
)
})
.collect()
}
fn snapshot(&self) -> Snapshot {
Snapshot {
counters: self.snapshot_counters(),
gauges: self.snapshot_gauges(),
histograms: self.snapshot_histograms(),
}
}
}
impl MemoryCollector {
pub(crate) fn new() -> Self {
MemoryCollector {
inner: Arc::new(Inner {
descriptions: Default::default(),
distributions: Default::default(),
recency: Recency::new(
Clock::new(),
MetricKindMask::ALL,
Some(Duration::from_secs(5 * DAYS)),
),
registry: Registry::new(GenerationalStorage::atomic()),
}),
}
}
pub(crate) fn install(&self) -> Result<(), SetRecorderError> {
metrics::set_boxed_recorder(Box::new(self.clone()))
}
pub(crate) fn snapshot(&self) -> Snapshot {
self.inner.snapshot()
}
fn add_description_if_missing(
&self,
key: &metrics::KeyName,
description: metrics::SharedString,
) {
let mut d = self.inner.descriptions.write().unwrap();
d.entry(key.as_str().to_owned()).or_insert(description);
}
}
impl Recorder for MemoryCollector {
fn describe_counter(
&self,
key: metrics::KeyName,
_: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.add_description_if_missing(&key, description)
}
fn describe_gauge(
&self,
key: metrics::KeyName,
_: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.add_description_if_missing(&key, description)
}
fn describe_histogram(
&self,
key: metrics::KeyName,
_: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.add_description_if_missing(&key, description)
}
fn register_counter(&self, key: &Key) -> metrics::Counter {
self.inner
.registry
.get_or_create_counter(key, |c| c.clone().into())
}
fn register_gauge(&self, key: &Key) -> metrics::Gauge {
self.inner
.registry
.get_or_create_gauge(key, |c| c.clone().into())
}
fn register_histogram(&self, key: &Key) -> metrics::Histogram {
self.inner
.registry
.get_or_create_histogram(key, |c| c.clone().into())
}
}
pub(crate) use double::DoubleRecorder;
pub(crate) use stats::{MemoryCollector, Snapshot};

133
src/collector/double.rs Normal file
View File

@ -0,0 +1,133 @@
use metrics::{CounterFn, GaugeFn, HistogramFn, Key, Recorder, SetRecorderError};
use std::sync::Arc;
#[derive(Clone)]
pub(crate) struct DoubleRecorder<R, S> {
first: R,
second: S,
}
struct DoubleCounter {
first: metrics::Counter,
second: metrics::Counter,
}
struct DoubleGauge {
first: metrics::Gauge,
second: metrics::Gauge,
}
struct DoubleHistogram {
first: metrics::Histogram,
second: metrics::Histogram,
}
impl<R, S> DoubleRecorder<R, S> {
pub(crate) fn new(first: R, second: S) -> Self {
DoubleRecorder { first, second }
}
pub(crate) fn install(self) -> Result<(), SetRecorderError>
where
R: Recorder + 'static,
S: Recorder + 'static,
{
metrics::set_boxed_recorder(Box::new(self))
}
}
impl<R, S> Recorder for DoubleRecorder<R, S>
where
R: Recorder,
S: Recorder,
{
fn describe_counter(
&self,
key: metrics::KeyName,
unit: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.first
.describe_counter(key.clone(), unit, description.clone());
self.second.describe_counter(key, unit, description);
}
fn describe_gauge(
&self,
key: metrics::KeyName,
unit: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.first
.describe_gauge(key.clone(), unit, description.clone());
self.second.describe_gauge(key, unit, description);
}
fn describe_histogram(
&self,
key: metrics::KeyName,
unit: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.first
.describe_histogram(key.clone(), unit, description.clone());
self.second.describe_histogram(key, unit, description);
}
fn register_counter(&self, key: &Key) -> metrics::Counter {
let first = self.first.register_counter(key);
let second = self.second.register_counter(key);
metrics::Counter::from_arc(Arc::new(DoubleCounter { first, second }))
}
fn register_gauge(&self, key: &Key) -> metrics::Gauge {
let first = self.first.register_gauge(key);
let second = self.second.register_gauge(key);
metrics::Gauge::from_arc(Arc::new(DoubleGauge { first, second }))
}
fn register_histogram(&self, key: &Key) -> metrics::Histogram {
let first = self.first.register_histogram(key);
let second = self.second.register_histogram(key);
metrics::Histogram::from_arc(Arc::new(DoubleHistogram { first, second }))
}
}
impl CounterFn for DoubleCounter {
fn increment(&self, value: u64) {
self.first.increment(value);
self.second.increment(value);
}
fn absolute(&self, value: u64) {
self.first.absolute(value);
self.second.absolute(value);
}
}
impl GaugeFn for DoubleGauge {
fn increment(&self, value: f64) {
self.first.increment(value);
self.second.increment(value);
}
fn decrement(&self, value: f64) {
self.first.decrement(value);
self.second.decrement(value);
}
fn set(&self, value: f64) {
self.first.set(value);
self.second.set(value);
}
}
impl HistogramFn for DoubleHistogram {
fn record(&self, value: f64) {
self.first.record(value);
self.second.record(value);
}
}

414
src/collector/stats.rs Normal file
View File

@ -0,0 +1,414 @@
use metrics::{Key, Recorder, SetRecorderError};
use metrics_util::{
registry::{AtomicStorage, GenerationalStorage, Recency, Registry},
MetricKindMask, Summary,
};
use quanta::Clock;
use std::{
collections::{BTreeMap, HashMap},
sync::{atomic::Ordering, Arc, RwLock},
time::Duration,
};
const SECONDS: u64 = 1;
const MINUTES: u64 = 60 * SECONDS;
const HOURS: u64 = 60 * MINUTES;
const DAYS: u64 = 24 * HOURS;
type DistributionMap = BTreeMap<Vec<(String, String)>, Summary>;
#[derive(Clone)]
pub struct MemoryCollector {
inner: Arc<Inner>,
}
struct Inner {
descriptions: RwLock<HashMap<String, metrics::SharedString>>,
distributions: RwLock<HashMap<String, DistributionMap>>,
recency: Recency<Key>,
registry: Registry<Key, GenerationalStorage<AtomicStorage>>,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct Counter {
labels: BTreeMap<String, String>,
value: u64,
}
impl std::fmt::Display for Counter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let labels = self
.labels
.iter()
.map(|(k, v)| format!("{k}: {v}"))
.collect::<Vec<_>>()
.join(", ");
write!(f, "{labels} - {}", self.value)
}
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct Gauge {
labels: BTreeMap<String, String>,
value: f64,
}
impl std::fmt::Display for Gauge {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let labels = self
.labels
.iter()
.map(|(k, v)| format!("{k}: {v}"))
.collect::<Vec<_>>()
.join(", ");
write!(f, "{labels} - {}", self.value)
}
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct Histogram {
labels: BTreeMap<String, String>,
value: Vec<(f64, Option<f64>)>,
}
impl std::fmt::Display for Histogram {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let labels = self
.labels
.iter()
.map(|(k, v)| format!("{k}: {v}"))
.collect::<Vec<_>>()
.join(", ");
let value = self
.value
.iter()
.map(|(k, v)| {
if let Some(v) = v {
format!("{k}: {v:.6}")
} else {
format!("{k}: None,")
}
})
.collect::<Vec<_>>()
.join(", ");
write!(f, "{labels} - {value}")
}
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct Snapshot {
counters: HashMap<String, Vec<Counter>>,
gauges: HashMap<String, Vec<Gauge>>,
histograms: HashMap<String, Vec<Histogram>>,
}
const PAIRS: [((&str, &str), &str); 2] = [
(
(
"background-jobs.worker.started",
"background-jobs.worker.finished",
),
"background-jobs.worker.running",
),
(
(
"background-jobs.job.started",
"background-jobs.job.finished",
),
"background-jobs.job.running",
),
];
#[derive(Default)]
struct MergeCounter {
start: Option<Counter>,
finish: Option<Counter>,
}
impl MergeCounter {
fn merge(self) -> Option<Counter> {
match (self.start, self.finish) {
(Some(start), Some(end)) => Some(Counter {
labels: start.labels,
value: start.value.saturating_sub(end.value),
}),
(Some(only), None) => Some(only),
(None, Some(only)) => Some(Counter {
labels: only.labels,
value: 0,
}),
(None, None) => None,
}
}
}
impl Snapshot {
pub(crate) fn present(self) {
if !self.counters.is_empty() {
println!("Counters");
let mut merging = HashMap::new();
for (key, counters) in self.counters {
if let Some(((start, _), name)) = PAIRS
.iter()
.find(|((start, finish), _)| *start == key || *finish == key)
{
let entry = merging.entry(name).or_insert_with(HashMap::new);
for counter in counters {
let mut merge_counter = entry
.entry(counter.labels.clone())
.or_insert_with(MergeCounter::default);
if key == *start {
merge_counter.start = Some(counter);
} else {
merge_counter.finish = Some(counter);
}
}
continue;
}
println!("\t{key}");
for counter in counters {
println!("\t\t{counter}");
}
}
for (key, counters) in merging {
println!("\t{key}");
for (_, counter) in counters {
if let Some(counter) = counter.merge() {
println!("\t\t{counter}");
}
}
}
}
if !self.gauges.is_empty() {
println!("Gauges");
for (key, gauges) in self.gauges {
println!("\t{key}");
for gauge in gauges {
println!("\t\t{gauge}");
}
}
}
if !self.histograms.is_empty() {
println!("Histograms");
for (key, histograms) in self.histograms {
println!("\t{key}");
for histogram in histograms {
println!("\t\t{histogram}");
}
}
}
}
}
fn key_to_parts(key: &Key) -> (String, Vec<(String, String)>) {
let labels = key
.labels()
.into_iter()
.map(|label| (label.key().to_string(), label.value().to_string()))
.collect();
let name = key.name().to_string();
(name, labels)
}
impl Inner {
fn snapshot_counters(&self) -> HashMap<String, Vec<Counter>> {
let mut counters = HashMap::new();
for (key, counter) in self.registry.get_counter_handles() {
let gen = counter.get_generation();
if !self.recency.should_store_counter(&key, gen, &self.registry) {
continue;
}
let (name, labels) = key_to_parts(&key);
let value = counter.get_inner().load(Ordering::Acquire);
counters.entry(name).or_insert_with(Vec::new).push(Counter {
labels: labels.into_iter().collect(),
value,
});
}
counters
}
fn snapshot_gauges(&self) -> HashMap<String, Vec<Gauge>> {
let mut gauges = HashMap::new();
for (key, gauge) in self.registry.get_gauge_handles() {
let gen = gauge.get_generation();
if !self.recency.should_store_gauge(&key, gen, &self.registry) {
continue;
}
let (name, labels) = key_to_parts(&key);
let value = f64::from_bits(gauge.get_inner().load(Ordering::Acquire));
gauges.entry(name).or_insert_with(Vec::new).push(Gauge {
labels: labels.into_iter().collect(),
value,
})
}
gauges
}
fn snapshot_histograms(&self) -> HashMap<String, Vec<Histogram>> {
for (key, histogram) in self.registry.get_histogram_handles() {
let gen = histogram.get_generation();
let (name, labels) = key_to_parts(&key);
if !self
.recency
.should_store_histogram(&key, gen, &self.registry)
{
let mut d = self.distributions.write().unwrap();
let delete_by_name = if let Some(by_name) = d.get_mut(&name) {
by_name.remove(&labels);
by_name.is_empty()
} else {
false
};
drop(d);
if delete_by_name {
self.descriptions.write().unwrap().remove(&name);
}
continue;
}
let mut d = self.distributions.write().unwrap();
let outer_entry = d.entry(name.clone()).or_insert_with(BTreeMap::new);
let entry = outer_entry
.entry(labels)
.or_insert_with(Summary::with_defaults);
histogram.get_inner().clear_with(|samples| {
for sample in samples {
entry.add(*sample);
}
})
}
let d = self.distributions.read().unwrap().clone();
d.into_iter()
.map(|(key, value)| {
(
key,
value
.into_iter()
.map(|(labels, summary)| Histogram {
labels: labels.into_iter().collect(),
value: [0.001, 0.01, 0.05, 0.1, 0.5, 0.9, 0.99, 1.0]
.into_iter()
.map(|q| (q, summary.quantile(q)))
.collect(),
})
.collect(),
)
})
.collect()
}
fn snapshot(&self) -> Snapshot {
Snapshot {
counters: self.snapshot_counters(),
gauges: self.snapshot_gauges(),
histograms: self.snapshot_histograms(),
}
}
}
impl MemoryCollector {
pub(crate) fn new() -> Self {
MemoryCollector {
inner: Arc::new(Inner {
descriptions: Default::default(),
distributions: Default::default(),
recency: Recency::new(
Clock::new(),
MetricKindMask::ALL,
Some(Duration::from_secs(5 * DAYS)),
),
registry: Registry::new(GenerationalStorage::atomic()),
}),
}
}
pub(crate) fn snapshot(&self) -> Snapshot {
self.inner.snapshot()
}
fn add_description_if_missing(
&self,
key: &metrics::KeyName,
description: metrics::SharedString,
) {
let mut d = self.inner.descriptions.write().unwrap();
d.entry(key.as_str().to_owned()).or_insert(description);
}
pub(crate) fn install(&self) -> Result<(), SetRecorderError> {
metrics::set_boxed_recorder(Box::new(self.clone()))
}
}
impl Recorder for MemoryCollector {
fn describe_counter(
&self,
key: metrics::KeyName,
_: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.add_description_if_missing(&key, description)
}
fn describe_gauge(
&self,
key: metrics::KeyName,
_: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.add_description_if_missing(&key, description)
}
fn describe_histogram(
&self,
key: metrics::KeyName,
_: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.add_description_if_missing(&key, description)
}
fn register_counter(&self, key: &Key) -> metrics::Counter {
self.inner
.registry
.get_or_create_counter(key, |c| c.clone().into())
}
fn register_gauge(&self, key: &Key) -> metrics::Gauge {
self.inner
.registry
.get_or_create_gauge(key, |c| c.clone().into())
}
fn register_histogram(&self, key: &Key) -> metrics::Histogram {
self.inner
.registry
.get_or_create_histogram(key, |c| c.clone().into())
}
}

View File

@ -1,22 +1,24 @@
use crate::{
data::{ActorCache, State},
error::Error,
extractors::{AdminConfig, XApiToken},
middleware::MyVerify,
requests::Requests,
};
use activitystreams::{
iri,
iri_string::{
format::ToDedicatedString,
resolve::FixedBaseResolver,
types::{IriAbsoluteString, IriFragmentStr, IriRelativeStr, IriString},
},
};
use config::Environment;
use http_signature_normalization_actix::prelude::{VerifyDigest, VerifySignature};
use http_signature_normalization_actix::prelude::VerifyDigest;
use rsa::sha2::{Digest, Sha256};
use rustls::{Certificate, PrivateKey};
use sha2::{Digest, Sha256};
use std::{io::BufReader, net::IpAddr, path::PathBuf};
use std::{
io::BufReader,
net::{IpAddr, SocketAddr},
path::PathBuf,
};
use uuid::Uuid;
#[derive(Clone, Debug, serde::Deserialize)]
@ -31,6 +33,7 @@ pub(crate) struct ParsedConfig {
publish_blocks: bool,
sled_path: PathBuf,
source_repo: IriString,
repository_commit_base: String,
opentelemetry_url: Option<IriString>,
telegram_token: Option<String>,
telegram_admin_handle: Option<String>,
@ -40,6 +43,8 @@ pub(crate) struct ParsedConfig {
footer_blurb: Option<String>,
local_domains: Option<String>,
local_blurb: Option<String>,
prometheus_addr: Option<IpAddr>,
prometheus_port: Option<u16>,
}
#[derive(Clone)]
@ -62,6 +67,7 @@ pub struct Config {
footer_blurb: Option<String>,
local_domains: Vec<String>,
local_blurb: Option<String>,
prometheus_config: Option<PrometheusConfig>,
}
#[derive(Clone)]
@ -70,6 +76,12 @@ struct TlsConfig {
cert: PathBuf,
}
#[derive(Clone, Debug)]
struct PrometheusConfig {
addr: IpAddr,
port: u16,
}
#[derive(Debug)]
pub enum UrlKind {
Activity,
@ -94,6 +106,7 @@ pub enum AdminUrlKind {
Blocked,
Connected,
Stats,
LastSeen,
}
impl std::fmt::Debug for Config {
@ -121,6 +134,7 @@ impl std::fmt::Debug for Config {
.field("footer_blurb", &self.footer_blurb)
.field("local_domains", &self.local_domains)
.field("local_blurb", &self.local_blurb)
.field("prometheus_config", &self.prometheus_config)
.finish()
}
}
@ -138,6 +152,7 @@ impl Config {
.set_default("publish_blocks", false)?
.set_default("sled_path", "./sled/db-0-34")?
.set_default("source_repo", "https://git.asonix.dog/asonix/relay")?
.set_default("repository_commit_base", "/src/commit/")?
.set_default("opentelemetry_url", None as Option<&str>)?
.set_default("telegram_token", None as Option<&str>)?
.set_default("telegram_admin_handle", None as Option<&str>)?
@ -147,13 +162,15 @@ impl Config {
.set_default("footer_blurb", None as Option<&str>)?
.set_default("local_domains", None as Option<&str>)?
.set_default("local_blurb", None as Option<&str>)?
.set_default("prometheus_addr", None as Option<&str>)?
.set_default("prometheus_port", None as Option<u16>)?
.add_source(Environment::default())
.build()?;
let config: ParsedConfig = config.try_deserialize()?;
let scheme = if config.https { "https" } else { "http" };
let base_uri = iri!(format!("{}://{}", scheme, config.hostname)).into_absolute();
let base_uri = iri!(format!("{scheme}://{}", config.hostname)).into_absolute();
let tls = match (config.tls_key, config.tls_cert) {
(Some(key), Some(cert)) => Some(TlsConfig { key, cert }),
@ -175,6 +192,29 @@ impl Config {
.map(|d| d.to_string())
.collect();
let prometheus_config = match (config.prometheus_addr, config.prometheus_port) {
(Some(addr), Some(port)) => Some(PrometheusConfig { addr, port }),
(Some(_), None) => {
tracing::warn!("PROMETHEUS_ADDR is set but PROMETHEUS_PORT is not set, not building Prometheus config");
None
}
(None, Some(_)) => {
tracing::warn!("PROMETHEUS_PORT is set but PROMETHEUS_ADDR is not set, not building Prometheus config");
None
}
(None, None) => None,
};
let source_url = match Self::git_hash() {
Some(hash) => format!(
"{}{}{hash}",
config.source_repo, config.repository_commit_base
)
.parse()
.expect("constructed source URL is valid"),
None => config.source_repo.clone(),
};
Ok(Config {
hostname: config.hostname,
addr: config.addr,
@ -185,7 +225,7 @@ impl Config {
publish_blocks: config.publish_blocks,
base_uri,
sled_path: config.sled_path,
source_repo: config.source_repo,
source_repo: source_url,
opentelemetry_url: config.opentelemetry_url,
telegram_token: config.telegram_token,
telegram_admin_handle: config.telegram_admin_handle,
@ -194,9 +234,16 @@ impl Config {
footer_blurb: config.footer_blurb,
local_domains,
local_blurb: config.local_blurb,
prometheus_config,
})
}
pub(crate) fn prometheus_bind_address(&self) -> Option<SocketAddr> {
let config = self.prometheus_config.as_ref()?;
Some((config.addr, config.port).into())
}
pub(crate) fn open_keys(&self) -> Result<Option<(Vec<Certificate>, PrivateKey)>, Error> {
let tls = if let Some(tls) = &self.tls {
tls
@ -276,19 +323,6 @@ impl Config {
}
}
pub(crate) fn signature_middleware(
&self,
requests: Requests,
actors: ActorCache,
state: State,
) -> VerifySignature<MyVerify> {
if self.validate_signatures {
VerifySignature::new(MyVerify(requests, actors, state), Default::default())
} else {
VerifySignature::new(MyVerify(requests, actors, state), Default::default()).optional()
}
}
pub(crate) fn x_api_token(&self) -> Option<XApiToken> {
self.api_token.clone().map(XApiToken::new)
}
@ -298,7 +332,7 @@ impl Config {
match AdminConfig::build(api_token) {
Ok(conf) => Some(actix_web::web::Data::new(conf)),
Err(e) => {
tracing::error!("Error creating admin config: {}", e);
tracing::error!("Error creating admin config: {e}");
None
}
}
@ -337,7 +371,7 @@ impl Config {
pub(crate) fn software_version() -> String {
if let Some(git) = Self::git_version() {
return format!("v{}-{}", Self::version(), git);
return format!("v{}-{git}", Self::version());
}
format!("v{}", Self::version())
@ -345,9 +379,9 @@ impl Config {
fn git_version() -> Option<String> {
let branch = Self::git_branch()?;
let hash = Self::git_hash()?;
let hash = Self::git_short_hash()?;
Some(format!("{}-{}", branch, hash))
Some(format!("{branch}-{hash}"))
}
fn name() -> &'static str {
@ -366,6 +400,10 @@ impl Config {
option_env!("GIT_HASH")
}
fn git_short_hash() -> Option<&'static str> {
option_env!("GIT_SHORT_HASH")
}
pub(crate) fn user_agent(&self) -> String {
format!(
"{} ({}/{}; +{})",
@ -395,37 +433,44 @@ impl Config {
self.do_generate_url(kind).expect("Generated valid IRI")
}
#[tracing::instrument(level = "debug", skip_all, fields(base_uri = tracing::field::debug(&self.base_uri), kind = tracing::field::debug(&kind)))]
fn do_generate_url(&self, kind: UrlKind) -> Result<IriString, Error> {
let iri = match kind {
UrlKind::Activity => FixedBaseResolver::new(self.base_uri.as_ref()).try_resolve(
IriRelativeStr::new(&format!("activity/{}", Uuid::new_v4()))?.as_ref(),
)?,
UrlKind::Activity => FixedBaseResolver::new(self.base_uri.as_ref())
.resolve(IriRelativeStr::new(&format!("activity/{}", Uuid::new_v4()))?.as_ref())
.try_to_dedicated_string()?,
UrlKind::Actor => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("actor")?.as_ref())?,
.resolve(IriRelativeStr::new("actor")?.as_ref())
.try_to_dedicated_string()?,
UrlKind::Followers => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("followers")?.as_ref())?,
.resolve(IriRelativeStr::new("followers")?.as_ref())
.try_to_dedicated_string()?,
UrlKind::Following => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("following")?.as_ref())?,
.resolve(IriRelativeStr::new("following")?.as_ref())
.try_to_dedicated_string()?,
UrlKind::Inbox => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("inbox")?.as_ref())?,
.resolve(IriRelativeStr::new("inbox")?.as_ref())
.try_to_dedicated_string()?,
UrlKind::Index => self.base_uri.clone().into(),
UrlKind::MainKey => {
let actor = IriRelativeStr::new("actor")?;
let fragment = IriFragmentStr::new("main-key")?;
let mut resolved =
FixedBaseResolver::new(self.base_uri.as_ref()).try_resolve(actor.as_ref())?;
let mut resolved = FixedBaseResolver::new(self.base_uri.as_ref())
.resolve(actor.as_ref())
.try_to_dedicated_string()?;
resolved.set_fragment(Some(fragment));
resolved
}
UrlKind::Media(uuid) => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new(&format!("media/{}", uuid))?.as_ref())?,
.resolve(IriRelativeStr::new(&format!("media/{uuid}"))?.as_ref())
.try_to_dedicated_string()?,
UrlKind::NodeInfo => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("nodeinfo/2.0.json")?.as_ref())?,
.resolve(IriRelativeStr::new("nodeinfo/2.0.json")?.as_ref())
.try_to_dedicated_string()?,
UrlKind::Outbox => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("outbox")?.as_ref())?,
.resolve(IriRelativeStr::new("outbox")?.as_ref())
.try_to_dedicated_string()?,
};
Ok(iri)
@ -437,25 +482,22 @@ impl Config {
}
fn do_generate_admin_url(&self, kind: AdminUrlKind) -> Result<IriString, Error> {
let iri = match kind {
AdminUrlKind::Allow => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/allow")?.as_ref())?,
AdminUrlKind::Disallow => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/disallow")?.as_ref())?,
AdminUrlKind::Block => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/block")?.as_ref())?,
AdminUrlKind::Unblock => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/unblock")?.as_ref())?,
AdminUrlKind::Allowed => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/allowed")?.as_ref())?,
AdminUrlKind::Blocked => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/blocked")?.as_ref())?,
AdminUrlKind::Connected => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/connected")?.as_ref())?,
AdminUrlKind::Stats => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/stats")?.as_ref())?,
let path = match kind {
AdminUrlKind::Allow => "api/v1/admin/allow",
AdminUrlKind::Disallow => "api/v1/admin/disallow",
AdminUrlKind::Block => "api/v1/admin/block",
AdminUrlKind::Unblock => "api/v1/admin/unblock",
AdminUrlKind::Allowed => "api/v1/admin/allowed",
AdminUrlKind::Blocked => "api/v1/admin/blocked",
AdminUrlKind::Connected => "api/v1/admin/connected",
AdminUrlKind::Stats => "api/v1/admin/stats",
AdminUrlKind::LastSeen => "api/v1/admin/last_seen",
};
let iri = FixedBaseResolver::new(self.base_uri.as_ref())
.resolve(IriRelativeStr::new(path)?.as_ref())
.try_to_dedicated_string()?;
Ok(iri)
}
}

View File

@ -1,9 +1,11 @@
mod actor;
mod last_online;
mod media;
mod node;
mod state;
pub(crate) use actor::ActorCache;
pub(crate) use last_online::LastOnline;
pub(crate) use media::MediaCache;
pub(crate) use node::{Node, NodeCache};
pub(crate) use state::State;

View File

@ -37,7 +37,7 @@ impl ActorCache {
ActorCache { db }
}
#[tracing::instrument(level = "debug" name = "Get Actor", skip_all, fields(id = id.to_string().as_str(), requests))]
#[tracing::instrument(level = "debug" name = "Get Actor", skip_all, fields(id = id.to_string().as_str()))]
pub(crate) async fn get(
&self,
id: &IriString,
@ -56,12 +56,8 @@ impl ActorCache {
#[tracing::instrument(level = "debug", name = "Add Connection", skip(self))]
pub(crate) async fn add_connection(&self, actor: Actor) -> Result<(), Error> {
let add_connection = self.db.add_connection(actor.id.clone());
let save_actor = self.db.save_actor(actor);
tokio::try_join!(add_connection, save_actor)?;
Ok(())
self.db.add_connection(actor.id.clone()).await?;
self.db.save_actor(actor).await
}
#[tracing::instrument(level = "debug", name = "Remove Connection", skip(self))]
@ -69,13 +65,13 @@ impl ActorCache {
self.db.remove_connection(actor.id.clone()).await
}
#[tracing::instrument(level = "debug", name = "Fetch remote actor", skip_all, fields(id = id.to_string().as_str(), requests))]
#[tracing::instrument(level = "debug", name = "Fetch remote actor", skip_all, fields(id = id.to_string().as_str()))]
pub(crate) async fn get_no_cache(
&self,
id: &IriString,
requests: &Requests,
) -> Result<Actor, Error> {
let accepted_actor = requests.fetch::<AcceptedActors>(id.as_str()).await?;
let accepted_actor = requests.fetch::<AcceptedActors>(id).await?;
let input_authority = id.authority_components().ok_or(ErrorKind::MissingDomain)?;
let accepted_actor_id = accepted_actor

28
src/data/last_online.rs Normal file
View File

@ -0,0 +1,28 @@
use activitystreams::iri_string::types::IriStr;
use std::{collections::HashMap, sync::Mutex};
use time::OffsetDateTime;
pub(crate) struct LastOnline {
domains: Mutex<HashMap<String, OffsetDateTime>>,
}
impl LastOnline {
pub(crate) fn mark_seen(&self, iri: &IriStr) {
if let Some(authority) = iri.authority_str() {
self.domains
.lock()
.unwrap()
.insert(authority.to_string(), OffsetDateTime::now_utc());
}
}
pub(crate) fn take(&self) -> HashMap<String, OffsetDateTime> {
std::mem::take(&mut *self.domains.lock().unwrap())
}
pub(crate) fn empty() -> Self {
Self {
domains: Mutex::new(HashMap::default()),
}
}
}

View File

@ -36,11 +36,9 @@ impl NodeCache {
#[tracing::instrument(level = "debug", name = "Get nodes", skip(self))]
pub(crate) async fn nodes(&self) -> Result<Vec<Node>, Error> {
let infos = self.db.connected_info();
let instances = self.db.connected_instance();
let contacts = self.db.connected_contact();
let (infos, instances, contacts) = tokio::try_join!(infos, instances, contacts)?;
let infos = self.db.connected_info().await?;
let instances = self.db.connected_instance().await?;
let contacts = self.db.connected_contact().await?;
let vec = self
.db
@ -184,7 +182,7 @@ impl Node {
let authority = url.authority_str().ok_or(ErrorKind::MissingDomain)?;
let scheme = url.scheme_str();
let base = iri!(format!("{}://{}", scheme, authority));
let base = iri!(format!("{scheme}://{authority}"));
Ok(Node {
base,

View File

@ -10,8 +10,9 @@ use actix_web::web;
use lru::LruCache;
use rand::thread_rng;
use rsa::{RsaPrivateKey, RsaPublicKey};
use std::sync::Arc;
use tokio::sync::RwLock;
use std::sync::{Arc, RwLock};
use super::LastOnline;
#[derive(Clone)]
pub struct State {
@ -20,6 +21,7 @@ pub struct State {
object_cache: Arc<RwLock<LruCache<IriString, IriString>>>,
node_cache: NodeCache,
breakers: Breakers,
pub(crate) last_online: Arc<LastOnline>,
pub(crate) db: Db,
}
@ -44,6 +46,7 @@ impl State {
self.private_key.clone(),
config.user_agent(),
self.breakers.clone(),
self.last_online.clone(),
)
}
@ -78,12 +81,12 @@ impl State {
.collect())
}
pub(crate) async fn is_cached(&self, object_id: &IriString) -> bool {
self.object_cache.read().await.contains(object_id)
pub(crate) fn is_cached(&self, object_id: &IriString) -> bool {
self.object_cache.read().unwrap().contains(object_id)
}
pub(crate) async fn cache(&self, object_id: IriString, actor_id: IriString) {
self.object_cache.write().await.put(object_id, actor_id);
pub(crate) fn cache(&self, object_id: IriString, actor_id: IriString) {
self.object_cache.write().unwrap().put(object_id, actor_id);
}
#[tracing::instrument(level = "debug", name = "Building state", skip_all)]
@ -115,6 +118,7 @@ impl State {
node_cache: NodeCache::new(db.clone()),
breakers: Breakers::default(),
db,
last_online: Arc::new(LastOnline::empty()),
};
Ok(state)

157
src/db.rs
View File

@ -7,8 +7,16 @@ use rsa::{
pkcs8::{DecodePrivateKey, EncodePrivateKey},
RsaPrivateKey,
};
use sled::Tree;
use std::{collections::HashMap, sync::Arc, time::SystemTime};
use sled::{Batch, Tree};
use std::{
collections::{BTreeMap, HashMap},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::SystemTime,
};
use time::OffsetDateTime;
use uuid::Uuid;
#[derive(Clone, Debug)]
@ -17,6 +25,8 @@ pub(crate) struct Db {
}
struct Inner {
healthz: Tree,
healthz_counter: Arc<AtomicU64>,
actor_id_actor: Tree,
public_key_id_actor_id: Tree,
connected_actor_ids: Tree,
@ -28,6 +38,7 @@ struct Inner {
actor_id_info: Tree,
actor_id_instance: Tree,
actor_id_contact: Tree,
last_seen: Tree,
restricted_mode: bool,
}
@ -236,6 +247,8 @@ impl Db {
fn build_inner(restricted_mode: bool, db: sled::Db) -> Result<Self, Error> {
Ok(Db {
inner: Arc::new(Inner {
healthz: db.open_tree("healthz")?,
healthz_counter: Arc::new(AtomicU64::new(0)),
actor_id_actor: db.open_tree("actor-id-actor")?,
public_key_id_actor_id: db.open_tree("public-key-id-actor-id")?,
connected_actor_ids: db.open_tree("connected-actor-ids")?,
@ -247,6 +260,7 @@ impl Db {
actor_id_info: db.open_tree("actor-id-info")?,
actor_id_instance: db.open_tree("actor-id-instance")?,
actor_id_contact: db.open_tree("actor-id-contact")?,
last_seen: db.open_tree("last-seen")?,
restricted_mode,
}),
})
@ -254,7 +268,7 @@ impl Db {
async fn unblock<T>(
&self,
f: impl Fn(&Inner) -> Result<T, Error> + Send + 'static,
f: impl FnOnce(&Inner) -> Result<T, Error> + Send + 'static,
) -> Result<T, Error>
where
T: Send + 'static,
@ -266,6 +280,63 @@ impl Db {
Ok(t)
}
pub(crate) async fn check_health(&self) -> Result<(), Error> {
let next = self.inner.healthz_counter.fetch_add(1, Ordering::Relaxed);
self.unblock(move |inner| {
inner
.healthz
.insert("healthz", &next.to_be_bytes()[..])
.map_err(Error::from)
})
.await?;
self.inner.healthz.flush_async().await?;
self.unblock(move |inner| inner.healthz.get("healthz").map_err(Error::from))
.await?;
Ok(())
}
pub(crate) async fn mark_last_seen(
&self,
nodes: HashMap<String, OffsetDateTime>,
) -> Result<(), Error> {
let mut batch = Batch::default();
for (domain, datetime) in nodes {
let datetime_string = serde_json::to_vec(&datetime)?;
batch.insert(domain.as_bytes(), datetime_string);
}
self.unblock(move |inner| inner.last_seen.apply_batch(batch).map_err(Error::from))
.await
}
pub(crate) async fn last_seen(
&self,
) -> Result<BTreeMap<String, Option<OffsetDateTime>>, Error> {
self.unblock(|inner| {
let mut map = BTreeMap::new();
for iri in inner.connected() {
let Some(authority_str) = iri.authority_str() else {
continue;
};
if let Some(datetime) = inner.last_seen.get(authority_str)? {
map.insert(
authority_str.to_string(),
Some(serde_json::from_slice(&datetime)?),
);
} else {
map.insert(authority_str.to_string(), None);
}
}
Ok(map)
})
.await
}
pub(crate) async fn connected_ids(&self) -> Result<Vec<IriString>, Error> {
self.unblock(|inner| Ok(inner.connected().collect())).await
}
@ -285,12 +356,12 @@ impl Db {
pub(crate) async fn info(&self, actor_id: IriString) -> Result<Option<Info>, Error> {
self.unblock(move |inner| {
if let Some(ivec) = inner.actor_id_info.get(actor_id.as_str().as_bytes())? {
let info = serde_json::from_slice(&ivec)?;
Ok(Some(info))
} else {
Ok(None)
}
inner
.actor_id_info
.get(actor_id.as_str().as_bytes())?
.map(|ivec| serde_json::from_slice(&ivec))
.transpose()
.map_err(Error::from)
})
.await
}
@ -319,12 +390,12 @@ impl Db {
pub(crate) async fn instance(&self, actor_id: IriString) -> Result<Option<Instance>, Error> {
self.unblock(move |inner| {
if let Some(ivec) = inner.actor_id_instance.get(actor_id.as_str().as_bytes())? {
let instance = serde_json::from_slice(&ivec)?;
Ok(Some(instance))
} else {
Ok(None)
}
inner
.actor_id_instance
.get(actor_id.as_str().as_bytes())?
.map(|ivec| serde_json::from_slice(&ivec))
.transpose()
.map_err(Error::from)
})
.await
}
@ -353,12 +424,12 @@ impl Db {
pub(crate) async fn contact(&self, actor_id: IriString) -> Result<Option<Contact>, Error> {
self.unblock(move |inner| {
if let Some(ivec) = inner.actor_id_contact.get(actor_id.as_str().as_bytes())? {
let contact = serde_json::from_slice(&ivec)?;
Ok(Some(contact))
} else {
Ok(None)
}
inner
.actor_id_contact
.get(actor_id.as_str().as_bytes())?
.map(|ivec| serde_json::from_slice(&ivec))
.transpose()
.map_err(Error::from)
})
.await
}
@ -383,22 +454,20 @@ impl Db {
pub(crate) async fn media_id(&self, url: IriString) -> Result<Option<Uuid>, Error> {
self.unblock(move |inner| {
if let Some(ivec) = inner.media_url_media_id.get(url.as_str().as_bytes())? {
Ok(uuid_from_ivec(ivec))
} else {
Ok(None)
}
Ok(inner
.media_url_media_id
.get(url.as_str().as_bytes())?
.and_then(uuid_from_ivec))
})
.await
}
pub(crate) async fn media_url(&self, id: Uuid) -> Result<Option<IriString>, Error> {
self.unblock(move |inner| {
if let Some(ivec) = inner.media_id_media_url.get(id.as_bytes())? {
Ok(url_from_ivec(ivec))
} else {
Ok(None)
}
Ok(inner
.media_id_media_url
.get(id.as_bytes())?
.and_then(url_from_ivec))
})
.await
}
@ -419,7 +488,7 @@ impl Db {
pub(crate) async fn is_connected(&self, base_id: IriString) -> Result<bool, Error> {
let scheme = base_id.scheme_str();
let authority = base_id.authority_str().ok_or(ErrorKind::MissingDomain)?;
let prefix = format!("{}://{}", scheme, authority);
let prefix = format!("{scheme}://{authority}");
self.unblock(move |inner| {
let connected = inner
@ -438,26 +507,22 @@ impl Db {
public_key_id: IriString,
) -> Result<Option<IriString>, Error> {
self.unblock(move |inner| {
if let Some(ivec) = inner
Ok(inner
.public_key_id_actor_id
.get(public_key_id.as_str().as_bytes())?
{
Ok(url_from_ivec(ivec))
} else {
Ok(None)
}
.and_then(url_from_ivec))
})
.await
}
pub(crate) async fn actor(&self, actor_id: IriString) -> Result<Option<Actor>, Error> {
self.unblock(move |inner| {
if let Some(ivec) = inner.actor_id_actor.get(actor_id.as_str().as_bytes())? {
let actor = serde_json::from_slice(&ivec)?;
Ok(Some(actor))
} else {
Ok(None)
}
inner
.actor_id_actor
.get(actor_id.as_str().as_bytes())?
.map(|ivec| serde_json::from_slice(&ivec))
.transpose()
.map_err(Error::from)
})
.await
}
@ -479,7 +544,7 @@ impl Db {
}
pub(crate) async fn remove_connection(&self, actor_id: IriString) -> Result<(), Error> {
tracing::debug!("Removing Connection: {}", actor_id);
tracing::debug!("Removing Connection: {actor_id}");
self.unblock(move |inner| {
inner
.connected_actor_ids
@ -491,7 +556,7 @@ impl Db {
}
pub(crate) async fn add_connection(&self, actor_id: IriString) -> Result<(), Error> {
tracing::debug!("Adding Connection: {}", actor_id);
tracing::debug!("Adding Connection: {actor_id}");
self.unblock(move |inner| {
inner
.connected_actor_ids

View File

@ -10,7 +10,7 @@ use std::{convert::Infallible, fmt::Debug, io};
use tracing_error::SpanTrace;
pub(crate) struct Error {
context: SpanTrace,
context: String,
kind: ErrorKind,
}
@ -26,6 +26,14 @@ impl Error {
pub(crate) fn is_bad_request(&self) -> bool {
matches!(self.kind, ErrorKind::Status(_, StatusCode::BAD_REQUEST))
}
pub(crate) fn is_gone(&self) -> bool {
matches!(self.kind, ErrorKind::Status(_, StatusCode::GONE))
}
pub(crate) fn is_malformed_json(&self) -> bool {
matches!(self.kind, ErrorKind::Json(_))
}
}
impl std::fmt::Debug for Error {
@ -53,7 +61,7 @@ where
{
fn from(error: T) -> Self {
Error {
context: SpanTrace::capture(),
context: SpanTrace::capture().to_string(),
kind: error.into(),
}
}
@ -77,10 +85,7 @@ pub(crate) enum ErrorKind {
ParseIri(#[from] activitystreams::iri_string::validate::Error),
#[error("Couldn't normalize IRI, {0}")]
NormalizeIri(
#[from]
activitystreams::iri_string::task::Error<activitystreams::iri_string::normalize::Error>,
),
NormalizeIri(#[from] std::collections::TryReserveError),
#[error("Couldn't perform IO, {0}")]
Io(#[from] io::Error),
@ -98,13 +103,13 @@ pub(crate) enum ErrorKind {
PrepareSign(#[from] PrepareSignError),
#[error("Couldn't sign digest")]
Signature(#[from] signature::Error),
Signature(#[from] rsa::signature::Error),
#[error("Couldn't read signature")]
ReadSignature(signature::Error),
ReadSignature(rsa::signature::Error),
#[error("Couldn't verify signature")]
VerifySignature(signature::Error),
VerifySignature(rsa::signature::Error),
#[error("Couldn't parse the signature header")]
HeaderValidation(#[from] actix_web::http::header::InvalidHeaderValue),
@ -125,7 +130,7 @@ pub(crate) enum ErrorKind {
BadActor(String, String),
#[error("Signature verification is required, but no signature was given")]
NoSignature(String),
NoSignature(Option<String>),
#[error("Wrong ActivityPub kind, {0}")]
Kind(String),
@ -196,7 +201,8 @@ impl ResponseError for Error {
ErrorKind::Kind(_)
| ErrorKind::MissingKind
| ErrorKind::MissingId
| ErrorKind::ObjectCount => StatusCode::BAD_REQUEST,
| ErrorKind::ObjectCount
| ErrorKind::NoSignature(_) => StatusCode::BAD_REQUEST,
_ => StatusCode::INTERNAL_SERVER_ERROR,
}
}

View File

@ -80,7 +80,7 @@ impl Admin {
#[derive(Debug, thiserror::Error)]
#[error("Failed authentication")]
pub(crate) struct Error {
context: SpanTrace,
context: String,
#[source]
kind: ErrorKind,
}
@ -88,49 +88,49 @@ pub(crate) struct Error {
impl Error {
fn invalid() -> Self {
Error {
context: SpanTrace::capture(),
context: SpanTrace::capture().to_string(),
kind: ErrorKind::Invalid,
}
}
fn missing_config() -> Self {
Error {
context: SpanTrace::capture(),
context: SpanTrace::capture().to_string(),
kind: ErrorKind::MissingConfig,
}
}
fn missing_db() -> Self {
Error {
context: SpanTrace::capture(),
context: SpanTrace::capture().to_string(),
kind: ErrorKind::MissingDb,
}
}
fn bcrypt_verify(e: BcryptError) -> Self {
Error {
context: SpanTrace::capture(),
context: SpanTrace::capture().to_string(),
kind: ErrorKind::BCryptVerify(e),
}
}
fn bcrypt_hash(e: BcryptError) -> Self {
Error {
context: SpanTrace::capture(),
context: SpanTrace::capture().to_string(),
kind: ErrorKind::BCryptHash(e),
}
}
fn parse_header(e: ParseError) -> Self {
Error {
context: SpanTrace::capture(),
context: SpanTrace::capture().to_string(),
kind: ErrorKind::ParseHeader(e),
}
}
fn canceled(_: BlockingError) -> Self {
Error {
context: SpanTrace::capture(),
context: SpanTrace::capture().to_string(),
kind: ErrorKind::Canceled,
}
}

View File

@ -5,6 +5,7 @@ mod deliver_many;
mod instance;
mod nodeinfo;
mod process_listeners;
mod record_last_online;
pub(crate) use self::{
contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany, instance::QueryInstance,
@ -15,7 +16,7 @@ use crate::{
config::Config,
data::{ActorCache, MediaCache, NodeCache, State},
error::{Error, ErrorKind},
jobs::process_listeners::Listeners,
jobs::{process_listeners::Listeners, record_last_online::RecordLastOnline},
requests::Requests,
};
use background_jobs::{
@ -62,6 +63,7 @@ pub(crate) fn create_workers(
.register::<QueryInstance>()
.register::<Listeners>()
.register::<QueryContact>()
.register::<RecordLastOnline>()
.register::<apub::Announce>()
.register::<apub::Follow>()
.register::<apub::Forward>()
@ -73,6 +75,7 @@ pub(crate) fn create_workers(
.start_with_threads(parallelism);
shared.every(Duration::from_secs(60 * 5), Listeners);
shared.every(Duration::from_secs(60 * 10), RecordLastOnline);
let job_server = JobServer::new(shared.queue_handle().clone());

View File

@ -36,13 +36,13 @@ async fn get_inboxes(
state.inboxes_without(&actor.inbox, &authority).await
}
fn prepare_activity<T, U, V, Kind>(
fn prepare_activity<T, U, V>(
mut t: T,
id: impl TryInto<IriString, Error = U>,
to: impl TryInto<IriString, Error = V>,
) -> Result<T, Error>
where
T: ObjectExt<Kind> + BaseExt<Kind>,
T: ObjectExt + BaseExt,
Error: From<U> + From<V>,
{
t.set_id(id.try_into()?)

View File

@ -42,7 +42,7 @@ impl Announce {
.queue(DeliverMany::new(inboxes, announce)?)
.await?;
state.state.cache(self.object_id, activity_id).await;
state.state.cache(self.object_id, activity_id);
Ok(())
}
}

View File

@ -42,7 +42,7 @@ impl QueryContact {
let contact = match state
.requests
.fetch::<AcceptedActors>(self.contact_id.as_str())
.fetch::<AcceptedActors>(&self.contact_id)
.await
{
Ok(contact) => contact,

View File

@ -35,7 +35,7 @@ impl Deliver {
#[tracing::instrument(name = "Deliver", skip(state))]
async fn permform(self, state: JobState) -> Result<(), Error> {
if let Err(e) = state.requests.deliver(self.to, &self.data).await {
if let Err(e) = state.requests.deliver(&self.to, &self.data).await {
if e.is_breaker() {
tracing::debug!("Not trying due to failed breaker");
return Ok(());

File diff suppressed because one or more lines are too long

View File

@ -39,11 +39,11 @@ impl QueryNodeinfo {
.authority_str()
.ok_or(ErrorKind::MissingDomain)?;
let scheme = self.actor_id.scheme_str();
let well_known_uri = iri!(format!("{}://{}/.well-known/nodeinfo", scheme, authority));
let well_known_uri = iri!(format!("{scheme}://{authority}/.well-known/nodeinfo"));
let well_known = match state
.requests
.fetch_json::<WellKnown>(well_known_uri.as_str())
.fetch_json::<WellKnown>(&well_known_uri)
.await
{
Ok(well_known) => well_known,
@ -55,7 +55,7 @@ impl QueryNodeinfo {
};
let href = if let Some(link) = well_known.links.into_iter().find(|l| l.rel.is_supported()) {
link.href
iri!(&link.href)
} else {
return Ok(());
};
@ -168,7 +168,7 @@ impl<'de> serde::de::Visitor<'de> for SupportedVersionVisitor {
type Value = SupportedVersion;
fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "a string starting with '{}'", SUPPORTED_VERSIONS)
write!(f, "a string starting with '{SUPPORTED_VERSIONS}'")
}
fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
@ -187,7 +187,7 @@ impl<'de> serde::de::Visitor<'de> for SupportedNodeinfoVisitor {
type Value = SupportedNodeinfo;
fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "a string starting with '{}'", SUPPORTED_NODEINFO)
write!(f, "a string starting with '{SUPPORTED_NODEINFO}'")
}
fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>

View File

@ -0,0 +1,28 @@
use crate::{error::Error, jobs::JobState};
use background_jobs::{ActixJob, Backoff};
use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct RecordLastOnline;
impl RecordLastOnline {
#[tracing::instrument(skip(state))]
async fn perform(self, state: JobState) -> Result<(), Error> {
let nodes = state.state.last_online.take();
state.state.db.mark_last_seen(nodes).await
}
}
impl ActixJob for RecordLastOnline {
type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "relay::jobs::RecordLastOnline";
const QUEUE: &'static str = "maintenance";
const BACKOFF: Backoff = Backoff::Linear(1);
fn run(self, state: Self::State) -> Self::Future {
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
}
}

View File

@ -2,10 +2,13 @@
#![allow(clippy::needless_borrow)]
use activitystreams::iri_string::types::IriString;
use actix_rt::task::JoinHandle;
use actix_web::{middleware::Compress, web, App, HttpServer};
use collector::MemoryCollector;
use collector::{DoubleRecorder, MemoryCollector};
#[cfg(feature = "console")]
use console_subscriber::ConsoleLayer;
use http_signature_normalization_actix::middleware::VerifySignature;
use metrics_exporter_prometheus::PrometheusBuilder;
use opentelemetry::{sdk::Resource, KeyValue};
use opentelemetry_otlp::WithExportConfig;
use rustls::ServerConfig;
@ -35,8 +38,8 @@ use self::{
data::{ActorCache, MediaCache, State},
db::Db,
jobs::create_workers,
middleware::{DebugPayload, RelayResolver, Timings},
routes::{actor, inbox, index, nodeinfo, nodeinfo_meta, statics},
middleware::{DebugPayload, MyVerify, RelayResolver, Timings},
routes::{actor, healthz, inbox, index, nodeinfo, nodeinfo_meta, statics},
};
fn init_subscriber(
@ -94,19 +97,31 @@ fn init_subscriber(
Ok(())
}
fn main() -> Result<(), anyhow::Error> {
#[actix_rt::main]
async fn main() -> Result<(), anyhow::Error> {
dotenv::dotenv().ok();
let config = Config::build()?;
init_subscriber(Config::software_name(), config.opentelemetry_url())?;
let collector = MemoryCollector::new();
collector.install()?;
let args = Args::new();
if args.any() {
return client_main(config, args);
return client_main(config, args).await?;
}
let collector = MemoryCollector::new();
if let Some(bind_addr) = config.prometheus_bind_address() {
let (recorder, exporter) = PrometheusBuilder::new()
.with_http_listener(bind_addr)
.build()?;
actix_rt::spawn(exporter);
DoubleRecorder::new(recorder, collector.clone()).install()?;
} else {
collector.install()?;
}
tracing::warn!("Opening DB");
@ -116,16 +131,15 @@ fn main() -> Result<(), anyhow::Error> {
let actors = ActorCache::new(db.clone());
let media = MediaCache::new(db.clone());
server_main(db, actors, media, collector, config)?;
server_main(db, actors, media, collector, config).await??;
tracing::warn!("Application exit");
Ok(())
}
#[actix_rt::main]
async fn client_main(config: Config, args: Args) -> Result<(), anyhow::Error> {
actix_rt::spawn(do_client_main(config, args)).await?
fn client_main(config: Config, args: Args) -> JoinHandle<Result<(), anyhow::Error>> {
actix_rt::spawn(do_client_main(config, args))
}
async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error> {
@ -142,6 +156,39 @@ async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error>
println!("Updated lists");
}
if args.contacted() {
let last_seen = admin::client::last_seen(&client, &config).await?;
let mut report = String::from("Contacted:");
if !last_seen.never.is_empty() {
report += "\nNever seen:\n";
}
for domain in last_seen.never {
report += "\t";
report += &domain;
report += "\n";
}
if !last_seen.last_seen.is_empty() {
report += "\nSeen:\n";
}
for (datetime, domains) in last_seen.last_seen {
for domain in domains {
report += "\t";
report += &datetime.to_string();
report += " - ";
report += &domain;
report += "\n";
}
}
report += "\n";
println!("{report}");
}
if args.list() {
let (blocked, allowed, connected) = tokio::try_join!(
admin::client::blocked(&client, &config),
@ -174,15 +221,14 @@ async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error>
Ok(())
}
#[actix_rt::main]
async fn server_main(
fn server_main(
db: Db,
actors: ActorCache,
media: MediaCache,
collector: MemoryCollector,
config: Config,
) -> Result<(), anyhow::Error> {
actix_rt::spawn(do_server_main(db, actors, media, collector, config)).await?
) -> JoinHandle<Result<(), anyhow::Error>> {
actix_rt::spawn(do_server_main(db, actors, media, collector, config))
}
async fn do_server_main(
@ -227,15 +273,15 @@ async fn do_server_main(
app.wrap(Compress::default())
.wrap(TracingLogger::default())
.wrap(Timings)
.route("/healthz", web::get().to(healthz))
.service(web::resource("/").route(web::get().to(index)))
.service(web::resource("/media/{path}").route(web::get().to(routes::media)))
.service(
web::resource("/inbox")
.wrap(config.digest_middleware())
.wrap(config.signature_middleware(
state.requests(&config),
actors.clone(),
state.clone(),
.wrap(VerifySignature::new(
MyVerify(state.requests(&config), actors.clone(), state.clone()),
Default::default(),
))
.wrap(DebugPayload(config.debug()))
.route(web::post().to(inbox)),
@ -258,7 +304,8 @@ async fn do_server_main(
.route("/allowed", web::get().to(admin::routes::allowed))
.route("/blocked", web::get().to(admin::routes::blocked))
.route("/connected", web::get().to(admin::routes::connected))
.route("/stats", web::get().to(admin::routes::stats)),
.route("/stats", web::get().to(admin::routes::stats))
.route("/last_seen", web::get().to(admin::routes::last_seen)),
),
)
});

View File

@ -6,10 +6,12 @@ use crate::{
};
use activitystreams::{base::BaseExt, iri, iri_string::types::IriString};
use actix_web::web;
use base64::{engine::general_purpose::STANDARD, Engine};
use http_signature_normalization_actix::{prelude::*, verify::DeprecatedAlgorithm};
use rsa::{pkcs1v15::VerifyingKey, pkcs8::DecodePublicKey, RsaPublicKey};
use sha2::{Digest, Sha256};
use signature::{DigestVerifier, Signature};
use rsa::{
pkcs1v15::Signature, pkcs1v15::VerifyingKey, pkcs8::DecodePublicKey, sha2::Sha256,
signature::Verifier, RsaPublicKey,
};
use std::{future::Future, pin::Pin};
#[derive(Clone, Debug)]
@ -65,11 +67,17 @@ impl MyVerify {
actor_id
} else {
self.0
.fetch::<PublicKeyResponse>(public_key_id.as_str())
.await?
.actor_id()
.ok_or(ErrorKind::MissingId)?
match self.0.fetch::<PublicKeyResponse>(&public_key_id).await {
Ok(res) => res.actor_id().ok_or(ErrorKind::MissingId),
Err(e) => {
if e.is_gone() {
tracing::warn!("Actor gone: {public_key_id}");
return Ok(false);
} else {
return Err(e);
}
}
}?
};
// Previously we verified the sig from an actor's local cache
@ -117,13 +125,13 @@ async fn do_verify(
let span = tracing::Span::current();
web::block(move || {
span.in_scope(|| {
let decoded = base64::decode(signature)?;
let signature = Signature::from_bytes(&decoded).map_err(ErrorKind::ReadSignature)?;
let hashed = Sha256::new_with_prefix(signing_string.as_bytes());
let decoded = STANDARD.decode(signature)?;
let signature =
Signature::try_from(decoded.as_slice()).map_err(ErrorKind::ReadSignature)?;
let verifying_key = VerifyingKey::new_with_prefix(public_key);
let verifying_key = VerifyingKey::<Sha256>::new_with_prefix(public_key);
verifying_key
.verify_digest(hashed, &signature)
.verify(signing_string.as_bytes(), &signature)
.map_err(ErrorKind::VerifySignature)?;
Ok(()) as Result<(), Error>
@ -159,20 +167,20 @@ mod tests {
use crate::apub::AcceptedActors;
use rsa::{pkcs8::DecodePublicKey, RsaPublicKey};
const ASONIX_DOG_ACTOR: &'static str = r#"{"@context":["https://www.w3.org/ns/activitystreams","https://w3id.org/security/v1",{"manuallyApprovesFollowers":"as:manuallyApprovesFollowers","toot":"http://joinmastodon.org/ns#","featured":{"@id":"toot:featured","@type":"@id"},"featuredTags":{"@id":"toot:featuredTags","@type":"@id"},"alsoKnownAs":{"@id":"as:alsoKnownAs","@type":"@id"},"movedTo":{"@id":"as:movedTo","@type":"@id"},"schema":"http://schema.org#","PropertyValue":"schema:PropertyValue","value":"schema:value","discoverable":"toot:discoverable","Device":"toot:Device","Ed25519Signature":"toot:Ed25519Signature","Ed25519Key":"toot:Ed25519Key","Curve25519Key":"toot:Curve25519Key","EncryptedMessage":"toot:EncryptedMessage","publicKeyBase64":"toot:publicKeyBase64","deviceId":"toot:deviceId","claim":{"@type":"@id","@id":"toot:claim"},"fingerprintKey":{"@type":"@id","@id":"toot:fingerprintKey"},"identityKey":{"@type":"@id","@id":"toot:identityKey"},"devices":{"@type":"@id","@id":"toot:devices"},"messageFranking":"toot:messageFranking","messageType":"toot:messageType","cipherText":"toot:cipherText","suspended":"toot:suspended"}],"id":"https://masto.asonix.dog/actor","type":"Application","inbox":"https://masto.asonix.dog/actor/inbox","outbox":"https://masto.asonix.dog/actor/outbox","preferredUsername":"masto.asonix.dog","url":"https://masto.asonix.dog/about/more?instance_actor=true","manuallyApprovesFollowers":true,"publicKey":{"id":"https://masto.asonix.dog/actor#main-key","owner":"https://masto.asonix.dog/actor","publicKeyPem":"-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAx8zXS0QNg9YGUBsxAOBH\nJaxIn7i6t+Z4UOpSFDVa2kP0NvQgIJsq3wzRqvaiuncRWpkyFk1fTakiRGD32xnY\nt+juuAaIBlU8eswKyANFqhcLAvFHmT3rA1848M4/YM19djvlL/PR9T53tPNHU+el\nS9MlsG3o6Zsj8YaUJtCI8RgEuJoROLHUb/V9a3oMQ7CfuIoSvF3VEz3/dRT09RW6\n0wQX7yhka9WlKuayWLWmTcB9lAIX6neBk+qKc8VSEsO7mHkzB8mRgVcS2uYZl1eA\nD8/jTT+SlpeFNDZk0Oh35GNFoOxh9qjRw3NGxu7jJCVBehDODzasOv4xDxKAhONa\njQIDAQAB\n-----END PUBLIC KEY-----\n"},"endpoints":{"sharedInbox":"https://masto.asonix.dog/inbox"}}"#;
const KARJALAZET_RELAY: &'static str = r#"{"@context":["https://www.w3.org/ns/activitystreams","https://pleroma.karjalazet.se/schemas/litepub-0.1.jsonld",{"@language":"und"}],"alsoKnownAs":[],"attachment":[],"capabilities":{},"discoverable":false,"endpoints":{"oauthAuthorizationEndpoint":"https://pleroma.karjalazet.se/oauth/authorize","oauthRegistrationEndpoint":"https://pleroma.karjalazet.se/api/v1/apps","oauthTokenEndpoint":"https://pleroma.karjalazet.se/oauth/token","sharedInbox":"https://pleroma.karjalazet.se/inbox","uploadMedia":"https://pleroma.karjalazet.se/api/ap/upload_media"},"featured":"https://pleroma.karjalazet.se/relay/collections/featured","followers":"https://pleroma.karjalazet.se/relay/followers","following":"https://pleroma.karjalazet.se/relay/following","id":"https://pleroma.karjalazet.se/relay","inbox":"https://pleroma.karjalazet.se/relay/inbox","manuallyApprovesFollowers":false,"name":null,"outbox":"https://pleroma.karjalazet.se/relay/outbox","preferredUsername":"relay","publicKey":{"id":"https://pleroma.karjalazet.se/relay#main-key","owner":"https://pleroma.karjalazet.se/relay","publicKeyPem":"-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAucoyCht6QpEzUPdQWP/J\nJYxObSH3MCcXBnG4d0OX78QshloeAHhl78EZ5c8I0ePmIjDg2NFK3/pG0EvSrHe2\nIZHnHaN5emgCb2ifNya5W572yfQXo1tUQy+ZXtbTUA7BWbr4LuCvd+HUavMwbx72\neraSZTiQj//ObwpbXFoZO5I/+e5avGmVnfmr/y2cG95hqFDtI3438RgZyBjY5kJM\nY1MLWoY9itGSfYmBtxRj3umlC2bPuBB+hHUJi6TvP7NO6zuUZ66m4ETyuBDi8iP6\ngnUp3Q4+1/I3nDUmhjt7OXckUcX3r5M4UHD3VVUFG0aZw6WWMEAxlyFf/07fCkhR\nBwIDAQAB\n-----END PUBLIC KEY-----\n\n"},"summary":"","tag":[],"type":"Person","url":"https://pleroma.karjalazet.se/relay"}"#;
const ASONIX_DOG_KEY: &'static str = "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAx8zXS0QNg9YGUBsxAOBH\nJaxIn7i6t+Z4UOpSFDVa2kP0NvQgIJsq3wzRqvaiuncRWpkyFk1fTakiRGD32xnY\nt+juuAaIBlU8eswKyANFqhcLAvFHmT3rA1848M4/YM19djvlL/PR9T53tPNHU+el\nS9MlsG3o6Zsj8YaUJtCI8RgEuJoROLHUb/V9a3oMQ7CfuIoSvF3VEz3/dRT09RW6\n0wQX7yhka9WlKuayWLWmTcB9lAIX6neBk+qKc8VSEsO7mHkzB8mRgVcS2uYZl1eA\nD8/jTT+SlpeFNDZk0Oh35GNFoOxh9qjRw3NGxu7jJCVBehDODzasOv4xDxKAhONa\njQIDAQAB\n-----END PUBLIC KEY-----\n";
const KARJALAZET_KEY: &'static str = "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAucoyCht6QpEzUPdQWP/J\nJYxObSH3MCcXBnG4d0OX78QshloeAHhl78EZ5c8I0ePmIjDg2NFK3/pG0EvSrHe2\nIZHnHaN5emgCb2ifNya5W572yfQXo1tUQy+ZXtbTUA7BWbr4LuCvd+HUavMwbx72\neraSZTiQj//ObwpbXFoZO5I/+e5avGmVnfmr/y2cG95hqFDtI3438RgZyBjY5kJM\nY1MLWoY9itGSfYmBtxRj3umlC2bPuBB+hHUJi6TvP7NO6zuUZ66m4ETyuBDi8iP6\ngnUp3Q4+1/I3nDUmhjt7OXckUcX3r5M4UHD3VVUFG0aZw6WWMEAxlyFf/07fCkhR\nBwIDAQAB\n-----END PUBLIC KEY-----\n\n";
const ASONIX_DOG_ACTOR: &str = r#"{"@context":["https://www.w3.org/ns/activitystreams","https://w3id.org/security/v1",{"manuallyApprovesFollowers":"as:manuallyApprovesFollowers","toot":"http://joinmastodon.org/ns#","featured":{"@id":"toot:featured","@type":"@id"},"featuredTags":{"@id":"toot:featuredTags","@type":"@id"},"alsoKnownAs":{"@id":"as:alsoKnownAs","@type":"@id"},"movedTo":{"@id":"as:movedTo","@type":"@id"},"schema":"http://schema.org#","PropertyValue":"schema:PropertyValue","value":"schema:value","discoverable":"toot:discoverable","Device":"toot:Device","Ed25519Signature":"toot:Ed25519Signature","Ed25519Key":"toot:Ed25519Key","Curve25519Key":"toot:Curve25519Key","EncryptedMessage":"toot:EncryptedMessage","publicKeyBase64":"toot:publicKeyBase64","deviceId":"toot:deviceId","claim":{"@type":"@id","@id":"toot:claim"},"fingerprintKey":{"@type":"@id","@id":"toot:fingerprintKey"},"identityKey":{"@type":"@id","@id":"toot:identityKey"},"devices":{"@type":"@id","@id":"toot:devices"},"messageFranking":"toot:messageFranking","messageType":"toot:messageType","cipherText":"toot:cipherText","suspended":"toot:suspended"}],"id":"https://masto.asonix.dog/actor","type":"Application","inbox":"https://masto.asonix.dog/actor/inbox","outbox":"https://masto.asonix.dog/actor/outbox","preferredUsername":"masto.asonix.dog","url":"https://masto.asonix.dog/about/more?instance_actor=true","manuallyApprovesFollowers":true,"publicKey":{"id":"https://masto.asonix.dog/actor#main-key","owner":"https://masto.asonix.dog/actor","publicKeyPem":"-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAx8zXS0QNg9YGUBsxAOBH\nJaxIn7i6t+Z4UOpSFDVa2kP0NvQgIJsq3wzRqvaiuncRWpkyFk1fTakiRGD32xnY\nt+juuAaIBlU8eswKyANFqhcLAvFHmT3rA1848M4/YM19djvlL/PR9T53tPNHU+el\nS9MlsG3o6Zsj8YaUJtCI8RgEuJoROLHUb/V9a3oMQ7CfuIoSvF3VEz3/dRT09RW6\n0wQX7yhka9WlKuayWLWmTcB9lAIX6neBk+qKc8VSEsO7mHkzB8mRgVcS2uYZl1eA\nD8/jTT+SlpeFNDZk0Oh35GNFoOxh9qjRw3NGxu7jJCVBehDODzasOv4xDxKAhONa\njQIDAQAB\n-----END PUBLIC KEY-----\n"},"endpoints":{"sharedInbox":"https://masto.asonix.dog/inbox"}}"#;
const KARJALAZET_RELAY: &str = r#"{"@context":["https://www.w3.org/ns/activitystreams","https://pleroma.karjalazet.se/schemas/litepub-0.1.jsonld",{"@language":"und"}],"alsoKnownAs":[],"attachment":[],"capabilities":{},"discoverable":false,"endpoints":{"oauthAuthorizationEndpoint":"https://pleroma.karjalazet.se/oauth/authorize","oauthRegistrationEndpoint":"https://pleroma.karjalazet.se/api/v1/apps","oauthTokenEndpoint":"https://pleroma.karjalazet.se/oauth/token","sharedInbox":"https://pleroma.karjalazet.se/inbox","uploadMedia":"https://pleroma.karjalazet.se/api/ap/upload_media"},"featured":"https://pleroma.karjalazet.se/relay/collections/featured","followers":"https://pleroma.karjalazet.se/relay/followers","following":"https://pleroma.karjalazet.se/relay/following","id":"https://pleroma.karjalazet.se/relay","inbox":"https://pleroma.karjalazet.se/relay/inbox","manuallyApprovesFollowers":false,"name":null,"outbox":"https://pleroma.karjalazet.se/relay/outbox","preferredUsername":"relay","publicKey":{"id":"https://pleroma.karjalazet.se/relay#main-key","owner":"https://pleroma.karjalazet.se/relay","publicKeyPem":"-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAucoyCht6QpEzUPdQWP/J\nJYxObSH3MCcXBnG4d0OX78QshloeAHhl78EZ5c8I0ePmIjDg2NFK3/pG0EvSrHe2\nIZHnHaN5emgCb2ifNya5W572yfQXo1tUQy+ZXtbTUA7BWbr4LuCvd+HUavMwbx72\neraSZTiQj//ObwpbXFoZO5I/+e5avGmVnfmr/y2cG95hqFDtI3438RgZyBjY5kJM\nY1MLWoY9itGSfYmBtxRj3umlC2bPuBB+hHUJi6TvP7NO6zuUZ66m4ETyuBDi8iP6\ngnUp3Q4+1/I3nDUmhjt7OXckUcX3r5M4UHD3VVUFG0aZw6WWMEAxlyFf/07fCkhR\nBwIDAQAB\n-----END PUBLIC KEY-----\n\n"},"summary":"","tag":[],"type":"Person","url":"https://pleroma.karjalazet.se/relay"}"#;
const ASONIX_DOG_KEY: &str = "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAx8zXS0QNg9YGUBsxAOBH\nJaxIn7i6t+Z4UOpSFDVa2kP0NvQgIJsq3wzRqvaiuncRWpkyFk1fTakiRGD32xnY\nt+juuAaIBlU8eswKyANFqhcLAvFHmT3rA1848M4/YM19djvlL/PR9T53tPNHU+el\nS9MlsG3o6Zsj8YaUJtCI8RgEuJoROLHUb/V9a3oMQ7CfuIoSvF3VEz3/dRT09RW6\n0wQX7yhka9WlKuayWLWmTcB9lAIX6neBk+qKc8VSEsO7mHkzB8mRgVcS2uYZl1eA\nD8/jTT+SlpeFNDZk0Oh35GNFoOxh9qjRw3NGxu7jJCVBehDODzasOv4xDxKAhONa\njQIDAQAB\n-----END PUBLIC KEY-----\n";
const KARJALAZET_KEY: &str = "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAucoyCht6QpEzUPdQWP/J\nJYxObSH3MCcXBnG4d0OX78QshloeAHhl78EZ5c8I0ePmIjDg2NFK3/pG0EvSrHe2\nIZHnHaN5emgCb2ifNya5W572yfQXo1tUQy+ZXtbTUA7BWbr4LuCvd+HUavMwbx72\neraSZTiQj//ObwpbXFoZO5I/+e5avGmVnfmr/y2cG95hqFDtI3438RgZyBjY5kJM\nY1MLWoY9itGSfYmBtxRj3umlC2bPuBB+hHUJi6TvP7NO6zuUZ66m4ETyuBDi8iP6\ngnUp3Q4+1/I3nDUmhjt7OXckUcX3r5M4UHD3VVUFG0aZw6WWMEAxlyFf/07fCkhR\nBwIDAQAB\n-----END PUBLIC KEY-----\n\n";
#[test]
fn handles_masto_keys() {
println!("{}", ASONIX_DOG_KEY);
println!("{ASONIX_DOG_KEY}");
let _ = RsaPublicKey::from_public_key_pem(ASONIX_DOG_KEY.trim()).unwrap();
}
#[test]
fn handles_pleromo_keys() {
println!("{}", KARJALAZET_KEY);
println!("{KARJALAZET_KEY}");
let _ = RsaPublicKey::from_public_key_pem(KARJALAZET_KEY.trim()).unwrap();
}

View File

@ -1,13 +1,20 @@
use crate::error::{Error, ErrorKind};
use crate::{
data::LastOnline,
error::{Error, ErrorKind},
};
use activitystreams::iri_string::types::IriString;
use actix_web::http::header::Date;
use awc::{error::SendRequestError, Client, ClientResponse};
use base64::{engine::general_purpose::STANDARD, Engine};
use dashmap::DashMap;
use http_signature_normalization_actix::prelude::*;
use rand::thread_rng;
use rsa::{pkcs1v15::SigningKey, RsaPrivateKey};
use sha2::{Digest, Sha256};
use signature::RandomizedSigner;
use rsa::{
pkcs1v15::SigningKey,
sha2::{Digest, Sha256},
signature::RandomizedSigner,
RsaPrivateKey,
};
use std::{
cell::RefCell,
rc::Rc,
@ -54,7 +61,7 @@ impl Breakers {
if let Some(mut breaker) = self.inner.get_mut(authority) {
breaker.fail();
if !breaker.should_try() {
tracing::warn!("Failed breaker for {}", authority);
tracing::warn!("Failed breaker for {authority}");
}
false
} else {
@ -146,6 +153,7 @@ pub(crate) struct Requests {
private_key: RsaPrivateKey,
config: Config,
breakers: Breakers,
last_online: Arc<LastOnline>,
}
impl std::fmt::Debug for Requests {
@ -174,6 +182,7 @@ impl Requests {
private_key: RsaPrivateKey,
user_agent: String,
breakers: Breakers,
last_online: Arc<LastOnline>,
) -> Self {
Requests {
client: Rc::new(RefCell::new(build_client(&user_agent))),
@ -184,6 +193,7 @@ impl Requests {
private_key,
config: Config::default().mastodon_compat(),
breakers,
last_online,
}
}
@ -219,13 +229,13 @@ impl Requests {
self.reset_err();
if !res.status().is_success() {
if res.status().is_server_error() {
self.breakers.fail(&parsed_url);
if let Ok(bytes) = res.body().await {
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
if !s.is_empty() {
tracing::warn!("Response from {}, {}", parsed_url, s);
tracing::warn!("Response from {parsed_url}, {s}");
}
}
}
@ -233,58 +243,55 @@ impl Requests {
return Err(ErrorKind::Status(parsed_url.to_string(), res.status()).into());
}
self.last_online.mark_seen(&parsed_url);
self.breakers.succeed(&parsed_url);
Ok(res)
}
#[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))]
pub(crate) async fn fetch_json<T>(&self, url: &str) -> Result<T, Error>
pub(crate) async fn fetch_json<T>(&self, url: &IriString) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
self.do_fetch(url, "application/json").await
}
#[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))]
pub(crate) async fn fetch_json_msky<T>(&self, url: &IriString) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
let mut res = self
.do_deliver(
url,
&serde_json::json!({}),
"application/json",
"application/json",
)
.await?;
let body = res
.body()
.await
.map_err(|e| ErrorKind::ReceiveResponse(url.to_string(), e.to_string()))?;
Ok(serde_json::from_slice(body.as_ref())?)
}
#[tracing::instrument(name = "Fetch Activity+Json", skip(self), fields(signing_string))]
pub(crate) async fn fetch<T>(&self, url: &str) -> Result<T, Error>
pub(crate) async fn fetch<T>(&self, url: &IriString) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
self.do_fetch(url, "application/activity+json").await
}
async fn do_fetch<T>(&self, url: &str, accept: &str) -> Result<T, Error>
async fn do_fetch<T>(&self, url: &IriString, accept: &str) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
let parsed_url = url.parse::<IriString>()?;
if !self.breakers.should_try(&parsed_url) {
return Err(ErrorKind::Breaker.into());
}
let signer = self.signer();
let span = tracing::Span::current();
let client: Client = self.client.borrow().clone();
let res = client
.get(url)
.insert_header(("Accept", accept))
.insert_header(Date(SystemTime::now().into()))
.signature(
self.config.clone(),
self.key_id.clone(),
move |signing_string| {
span.record("signing_string", signing_string);
span.in_scope(|| signer.sign(signing_string))
},
)
.await?
.send()
.await;
let mut res = self.check_response(&parsed_url, res).await?;
let mut res = self.do_fetch_response(url, accept).await?;
let body = res
.body()
@ -295,8 +302,16 @@ impl Requests {
}
#[tracing::instrument(name = "Fetch response", skip(self), fields(signing_string))]
pub(crate) async fn fetch_response(&self, url: IriString) -> Result<ClientResponse, Error> {
if !self.breakers.should_try(&url) {
pub(crate) async fn fetch_response(&self, url: &IriString) -> Result<ClientResponse, Error> {
self.do_fetch_response(url, "*/*").await
}
pub(crate) async fn do_fetch_response(
&self,
url: &IriString,
accept: &str,
) -> Result<ClientResponse, Error> {
if !self.breakers.should_try(url) {
return Err(ErrorKind::Breaker.into());
}
@ -306,7 +321,7 @@ impl Requests {
let client: Client = self.client.borrow().clone();
let res = client
.get(url.as_str())
.insert_header(("Accept", "*/*"))
.insert_header(("Accept", accept))
.insert_header(Date(SystemTime::now().into()))
.no_decompress()
.signature(
@ -321,7 +336,7 @@ impl Requests {
.send()
.await;
let res = self.check_response(&url, res).await?;
let res = self.check_response(url, res).await?;
Ok(res)
}
@ -331,7 +346,27 @@ impl Requests {
skip_all,
fields(inbox = inbox.to_string().as_str(), signing_string)
)]
pub(crate) async fn deliver<T>(&self, inbox: IriString, item: &T) -> Result<(), Error>
pub(crate) async fn deliver<T>(&self, inbox: &IriString, item: &T) -> Result<(), Error>
where
T: serde::ser::Serialize + std::fmt::Debug,
{
self.do_deliver(
inbox,
item,
"application/activity+json",
"application/activity+json",
)
.await?;
Ok(())
}
async fn do_deliver<T>(
&self,
inbox: &IriString,
item: &T,
content_type: &str,
accept: &str,
) -> Result<ClientResponse, Error>
where
T: serde::ser::Serialize + std::fmt::Debug,
{
@ -346,8 +381,8 @@ impl Requests {
let client: Client = self.client.borrow().clone();
let (req, body) = client
.post(inbox.as_str())
.insert_header(("Accept", "application/activity+json"))
.insert_header(("Content-Type", "application/activity+json"))
.insert_header(("Accept", accept))
.insert_header(("Content-Type", content_type))
.insert_header(Date(SystemTime::now().into()))
.signature_with_digest(
self.config.clone(),
@ -364,9 +399,9 @@ impl Requests {
let res = req.send_body(body).await;
self.check_response(&inbox, res).await?;
let res = self.check_response(inbox, res).await?;
Ok(())
Ok(res)
}
fn signer(&self) -> Signer {
@ -383,7 +418,8 @@ struct Signer {
impl Signer {
fn sign(&self, signing_string: &str) -> Result<String, Error> {
let signing_key = SigningKey::<Sha256>::new_with_prefix(self.private_key.clone());
let signature = signing_key.try_sign_with_rng(thread_rng(), signing_string.as_bytes())?;
Ok(base64::encode(signature.as_ref()))
let signature =
signing_key.try_sign_with_rng(&mut thread_rng(), signing_string.as_bytes())?;
Ok(STANDARD.encode(signature.as_ref()))
}
}

View File

@ -1,4 +1,5 @@
mod actor;
mod healthz;
mod inbox;
mod index;
mod media;
@ -7,6 +8,7 @@ mod statics;
pub(crate) use self::{
actor::route as actor,
healthz::route as healthz,
inbox::route as inbox,
index::route as index,
media::route as media,

7
src/routes/healthz.rs Normal file
View File

@ -0,0 +1,7 @@
use crate::{data::State, error::Error};
use actix_web::{web, HttpResponse};
pub(crate) async fn route(state: web::Data<State>) -> Result<HttpResponse, Error> {
state.db.check_health().await?;
Ok(HttpResponse::Ok().finish())
}

View File

@ -16,7 +16,8 @@ use activitystreams::{
use actix_web::{web, HttpResponse};
use http_signature_normalization_actix::prelude::{DigestVerified, SignatureVerified};
#[tracing::instrument(name = "Inbox", skip_all)]
#[tracing::instrument(name = "Inbox", skip_all, fields(id = tracing::field::debug(&input.id_unchecked()), kind = tracing::field::debug(&input.kind())))]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn route(
state: web::Data<State>,
actors: web::Data<ActorCache>,
@ -24,22 +25,48 @@ pub(crate) async fn route(
client: web::Data<Requests>,
jobs: web::Data<JobServer>,
input: web::Json<AcceptedActivities>,
verified: Option<(SignatureVerified, DigestVerified)>,
digest_verified: Option<DigestVerified>,
signature_verified: Option<SignatureVerified>,
) -> Result<HttpResponse, Error> {
let input = input.into_inner();
let actor = actors
.get(
input.actor()?.as_single_id().ok_or(ErrorKind::MissingId)?,
&client,
)
.await?
.into_inner();
let kind = input.kind().ok_or(ErrorKind::MissingKind)?;
let is_allowed = state.db.is_allowed(actor.id.clone());
let is_connected = state.db.is_connected(actor.id.clone());
if digest_verified.is_some() && signature_verified.is_none() && *kind == ValidTypes::Delete {
return Ok(accepted(serde_json::json!({})));
} else if config.validate_signatures()
&& (digest_verified.is_none() || signature_verified.is_none())
{
return Err(ErrorKind::NoSignature(None).into());
}
let (is_allowed, is_connected) = tokio::try_join!(is_allowed, is_connected)?;
let actor_id = if input.id_unchecked().is_some() {
input.actor()?.as_single_id().ok_or(ErrorKind::MissingId)?
} else {
input
.actor_unchecked()
.as_single_id()
.ok_or(ErrorKind::MissingId)?
};
let actor = actors.get(actor_id, &client).await?.into_inner();
if let Some(verified) = signature_verified {
if actor.public_key_id.as_str() != verified.key_id() {
tracing::error!("Actor signed with wrong key");
return Err(ErrorKind::BadActor(
actor.public_key_id.to_string(),
verified.key_id().to_owned(),
)
.into());
}
} else if config.validate_signatures() {
tracing::error!("This case should never be reachable, since I handle signature checks earlier in the flow. If you see this in a log it means I did it wrong");
return Err(ErrorKind::NoSignature(Some(actor.public_key_id.to_string())).into());
}
let is_allowed = state.db.is_allowed(actor.id.clone()).await?;
let is_connected = state.db.is_connected(actor.id.clone()).await?;
if !is_allowed {
return Err(ErrorKind::NotAllowed(actor.id.to_string()).into());
@ -49,29 +76,16 @@ pub(crate) async fn route(
return Err(ErrorKind::NotSubscribed(actor.id.to_string()).into());
}
if config.validate_signatures() && verified.is_none() {
return Err(ErrorKind::NoSignature(actor.public_key_id.to_string()).into());
} else if config.validate_signatures() {
if let Some((verified, _)) = verified {
if actor.public_key_id.as_str() != verified.key_id() {
tracing::error!("Bad actor, more info: {:?}", input);
return Err(ErrorKind::BadActor(
actor.public_key_id.to_string(),
verified.key_id().to_owned(),
)
.into());
}
}
}
match input.kind().ok_or(ErrorKind::MissingKind)? {
match kind {
ValidTypes::Accept => handle_accept(&config, input).await?,
ValidTypes::Reject => handle_reject(&config, &jobs, input, actor).await?,
ValidTypes::Announce | ValidTypes::Create => {
handle_announce(&state, &jobs, input, actor).await?
}
ValidTypes::Follow => handle_follow(&config, &jobs, input, actor).await?,
ValidTypes::Delete | ValidTypes::Update => handle_forward(&jobs, input, actor).await?,
ValidTypes::Add | ValidTypes::Delete | ValidTypes::Remove | ValidTypes::Update => {
handle_forward(&jobs, input, actor).await?
}
ValidTypes::Undo => handle_undo(&config, &jobs, input, actor, is_connected).await?,
};
@ -203,7 +217,7 @@ async fn handle_announce(
.as_single_id()
.ok_or(ErrorKind::MissingId)?;
if state.is_cached(object_id).await {
if state.is_cached(object_id) {
return Err(ErrorKind::Duplicate.into());
}

View File

@ -71,7 +71,7 @@ pub(crate) async fn route(
let mut buf = BufWriter::new(Vec::new());
crate::templates::index(&mut buf, &local, &nodes, &config)?;
crate::templates::index_html(&mut buf, &local, &nodes, &config)?;
let html = buf.into_inner().map_err(|e| {
tracing::error!("Error rendering template, {}", e.error());
ErrorKind::FlushBuffer

View File

@ -11,7 +11,7 @@ pub(crate) async fn route(
let uuid = uuid.into_inner();
if let Some(url) = media.get_url(uuid).await? {
let res = requests.fetch_response(url).await?;
let res = requests.fetch_response(&url).await?;
let mut response = HttpResponse::build(res.status());

View File

@ -24,18 +24,18 @@ struct Links {
links: Vec<Link>,
}
#[tracing::instrument(name = "NodeInfo")]
#[tracing::instrument(name = "NodeInfo", skip_all)]
pub(crate) async fn route(
config: web::Data<Config>,
state: web::Data<State>,
) -> web::Json<NodeInfo> {
let (inboxes, blocks) = tokio::join!(state.db.inboxes(), async {
if config.publish_blocks() {
Some(state.db.blocks().await.unwrap_or_default())
} else {
None
}
});
let inboxes = state.db.inboxes().await;
let blocks = if config.publish_blocks() {
Some(state.db.blocks().await.unwrap_or_default())
} else {
None
};
let peers = inboxes
.unwrap_or_default()
@ -44,6 +44,8 @@ pub(crate) async fn route(
.map(|s| s.to_owned())
.collect();
let open_registrations = !config.restricted_mode();
web::Json(NodeInfo {
version: NodeInfoVersion,
software: Software {
@ -55,7 +57,7 @@ pub(crate) async fn route(
inbound: vec![],
outbound: vec![],
},
open_registrations: false,
open_registrations,
usage: Usage {
users: Users {
total: 1,

View File

@ -5,7 +5,7 @@ use actix_web::{
};
#[allow(clippy::async_yields_async)]
#[tracing::instrument(name = "Statistics")]
#[tracing::instrument(name = "Statics")]
pub(crate) async fn route(filename: web::Path<String>) -> HttpResponse {
if let Some(data) = StaticFile::get(&filename.into_inner()) {
HttpResponse::Ok()

View File

@ -89,19 +89,19 @@ async fn answer(bot: Bot, msg: Message, cmd: Command, db: Db) -> ResponseResult<
.await?;
}
Command::Block { domain } if db.add_blocks(vec![domain.clone()]).await.is_ok() => {
bot.send_message(msg.chat.id, format!("{} has been blocked", domain))
bot.send_message(msg.chat.id, format!("{domain} has been blocked"))
.await?;
}
Command::Unblock { domain } if db.remove_blocks(vec![domain.clone()]).await.is_ok() => {
bot.send_message(msg.chat.id, format!("{} has been unblocked", domain))
bot.send_message(msg.chat.id, format!("{domain} has been unblocked"))
.await?;
}
Command::Allow { domain } if db.add_allows(vec![domain.clone()]).await.is_ok() => {
bot.send_message(msg.chat.id, format!("{} has been allowed", domain))
bot.send_message(msg.chat.id, format!("{domain} has been allowed"))
.await?;
}
Command::Disallow { domain } if db.remove_allows(vec![domain.clone()]).await.is_ok() => {
bot.send_message(msg.chat.id, format!("{} has been disallowed", domain))
bot.send_message(msg.chat.id, format!("{domain} has been disallowed"))
.await?;
}
Command::ListAllowed => {

View File

@ -0,0 +1,15 @@
[Unit]
Description=Activitypub Relay
Documentation=https://git.asonix.dog/asonix/relay
Wants=network.target
After=network.target
[Install]
WantedBy=multi-user.target
[Service]
Type=simple
EnvironmentFile=/etc/systemd/system/example-relay.service.env
ExecStart=/path/to/relay
Restart=always

View File

@ -0,0 +1,19 @@
HOSTNAME='relay.example.com'
ADDR='0.0.0.0'
PORT='8080'
RESTRICTED_MODE='true'
VALIDATE_SIGNATURES='true'
HTTPS='true'
PRETTY_LOG='false'
PUBLISH_BLOCKS='true'
DEBUG='false'
SLED_PATH='/opt/sled'
TELEGRAM_ADMIN_HANDLE='myhandle'
RUST_BACKTRACE='full'
FOOTER_BLURB='Contact <a href="https://masto.example.com/@example">@example</a> for inquiries.'
LOCAL_DOMAINS='masto.example.com'
LOCAL_BLURB='<p>An ActivityPub relay for servers. Currently running somewhere. Let me know if you want to join!</p>'
OPENTELEMETRY_URL='http://otel.example.com:4317'
API_TOKEN='blahblahblahblahblahblahblah'
TELEGRAM_TOKEN='blahblahblahblahblahblahblah'

View File

@ -0,0 +1,11 @@
[Unit]
Description=Activitypub Relay Socket
Before=multi-user.target
After=network.target
[Socket]
Service=example-relay.service
ListenStream=8080
[Install]
WantedBy=sockets.target

View File

@ -6,7 +6,7 @@
<div class="admin">
<div class="left">
<figure class="avatar">
<img src="@contact.avatar" alt="@contact.display_name's avatar">
<img loading="lazy" src="@contact.avatar" alt="@contact.display_name's avatar">
</figure>
</div>
<div class="right">

View File

@ -1,7 +1,7 @@
@use crate::{
config::{Config, UrlKind},
data::Node,
templates::{info, instance, statics::index_css},
templates::{info_html, instance_html, statics::index_css},
};
@(local: &[Node], nodes: &[Node], config: &Config)
@ -9,7 +9,7 @@ templates::{info, instance, statics::index_css},
<!doctype html>
<html>
<head lang="en">
<head lang="fr">
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>@config.hostname() | Relais ActivityPub</title>
@ -39,13 +39,13 @@ templates::{info, instance, statics::index_css},
@for node in local {
@if let Some(inst) = node.instance.as_ref() {
<li>
@:instance(inst, node.info.as_ref().map(|info| { info.software.as_ref() }), node.contact.as_ref(),
@:instance_html(inst, node.info.as_ref().map(|info| { info.software.as_ref() }), node.contact.as_ref(),
&node.base)
</li>
} else {
@if let Some(inf) = node.info.as_ref() {
<li>
@:info(inf, &node.base)
@:info_html(inf, &node.base)
</li>
}
}
@ -94,13 +94,13 @@ templates::{info, instance, statics::index_css},
@for node in nodes {
@if let Some(inst) = node.instance.as_ref() {
<li>
@:instance(inst, node.info.as_ref().map(|info| { info.software.as_ref() }), node.contact.as_ref(),
@:instance_html(inst, node.info.as_ref().map(|info| { info.software.as_ref() }), node.contact.as_ref(),
&node.base)
</li>
} else {
@if let Some(inf) = node.info.as_ref() {
<li>
@:info(inf, &node.base)
@:info_html(inf, &node.base)
</li>
}
}

View File

@ -1,4 +1,4 @@
@use crate::{db::{Contact, Instance}, templates::admin};
@use crate::{db::{Contact, Instance}, templates::admin_html};
@use activitystreams::iri_string::types::IriString;
@(instance: &Instance, software: Option<&str>, contact: Option<&Contact>, base: &IriString)
@ -32,8 +32,8 @@
}
</div>
@if let Some(contact) = contact {
<h5 class="instance-admin">Administré par:</h5>
@:admin(contact, base)
<h5 class="instance-admin">Administré par:</h5>
@:admin_html(contact, base)
}
}
</section>