Compare commits

..

No commits in common. "c5112cb9bb899323dea90b9b5a93d00bf4f09414" and "4e5e257f24a0197925fdd1a18cd63e80b76fd019" have entirely different histories.

52 changed files with 1407 additions and 1967 deletions

2
.env
View File

@ -9,5 +9,3 @@ FOOTER_BLURB="Opéré par <a href=\"https://mastodon.xolus.net/@max\">@max</a>"
LOCAL_DOMAINS="xolus.net"
LOCAL_BLURB="<p>Relais ActivityPub francophone</p>"
# OPENTELEMETRY_URL=http://localhost:4317
PROMETHEUS_ADDR=127.0.0.1
PROMETHEUS_PORT=9000

998
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,7 +1,7 @@
[package]
name = "ap-relay"
description = "A simple activitypub relay"
version = "0.3.82"
version = "0.3.66"
authors = ["asonix <asonix@asonix.dog>"]
license = "AGPL-3.0"
readme = "README.md"
@ -29,23 +29,20 @@ actix-web = { version = "4.0.1", default-features = false, features = [
"compress-gzip",
] }
actix-webfinger = "0.4.0"
activitystreams = "0.7.0-alpha.21"
activitystreams-ext = "0.1.0-alpha.3"
activitystreams = "0.7.0-alpha.19"
activitystreams-ext = "0.1.0-alpha.2"
ammonia = "3.1.0"
awc = { version = "3.0.0", default-features = false, features = ["rustls"] }
bcrypt = "0.14"
base64 = "0.21"
bcrypt = "0.13"
base64 = "0.13"
clap = { version = "4.0.0", features = ["derive"] }
config = "0.13.0"
console-subscriber = { version = "0.1", optional = true }
dashmap = "5.1.0"
dotenv = "0.15.0"
futures-util = "0.3.17"
lru = "0.9.0"
lru = "0.8.0"
metrics = "0.20.1"
metrics-exporter-prometheus = { version = "0.11.0", default-features = false, features = [
"http-listener",
] }
metrics-util = "0.14.0"
mime = "0.3.16"
minify-html = "0.10.0"
@ -54,20 +51,21 @@ opentelemetry-otlp = "0.11"
pin-project-lite = "0.2.9"
quanta = "0.10.1"
rand = "0.8"
rsa = { version = "0.8", features = ["sha2"] }
rsa-magic-public-key = "0.7.0"
rsa = "0.7"
rsa-magic-public-key = "0.6.0"
rustls = "0.20.7"
rustls-pemfile = "1.0.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sha2 = { version = "0.10", features = ["oid"] }
signature = "1.6.4"
sled = "0.34.7"
teloxide = { version = "0.12.0", default-features = false, features = [
teloxide = { version = "0.11.1", default-features = false, features = [
"ctrlc_handler",
"macros",
"rustls",
] }
thiserror = "1.0"
time = { version = "0.3.17", features = ["serde"] }
tracing = "0.1"
tracing-awc = "0.1.6"
tracing-error = "0.2"
@ -88,18 +86,18 @@ default-features = false
features = ["background-jobs-actix", "error-logging"]
[dependencies.http-signature-normalization-actix]
version = "0.8.0"
version = "0.6.0"
default-features = false
features = ["client", "server", "sha-2"]
[dependencies.tracing-actix-web]
version = "0.7.0"
version = "0.6.1"
[build-dependencies]
anyhow = "1.0"
dotenv = "0.15.0"
ructe = { version = "0.16.0", features = ["sass", "mime03"] }
toml = "0.7.0"
ructe = { version = "0.15.0", features = ["sass", "mime03"] }
toml = "0.5.8"
[profile.dev.package.rsa]
opt-level = 3

View File

@ -6,11 +6,11 @@ _A simple and efficient activitypub relay_
If running docker, you can start the relay with the following command:
```
$ sudo docker run --rm -it \
-v "$(pwd):/mnt/" \
-v "./:/mnt/" \
-e ADDR=0.0.0.0 \
-e SLED_PATH=/mnt/sled/db-0.34 \
-p 8080:8080 \
asonix/relay:0.3.78
asonix/relay:0.3.52
```
This will launch the relay with the database stored in "./sled/db-0.34" and listening on port 8080
#### Cargo
@ -103,8 +103,6 @@ TLS_CERT=/path/to/cert
FOOTER_BLURB="Contact <a href=\"https://masto.asonix.dog/@asonix\">@asonix</a> for inquiries"
LOCAL_DOMAINS=masto.asonix.dog
LOCAL_BLURB="<p>Welcome to my cool relay where I have cool relay things happening. I hope you enjoy your stay!</p>"
PROMETHEUS_ADDR=0.0.0.0
PROMETHEUS_PORT=9000
```
#### Descriptions
@ -130,8 +128,6 @@ Where to store the on-disk database of connected servers. This defaults to `./sl
The log level to print. Available levels are `ERROR`, `WARN`, `INFO`, `DEBUG`, and `TRACE`. You can also specify module paths to enable some logs but not others, such as `RUST_LOG=warn,tracing_actix_web=info,relay=info`. This defaults to `warn`
##### `SOURCE_REPO`
The URL to the source code for the relay. This defaults to `https://git.asonix.dog/asonix/relay`, but should be changed if you're running a fork hosted elsewhere.
##### `REPOSITORY_COMMIT_BASE`
The base path of the repository commit hash reference. For example, `/src/commit/` for Gitea, `/tree/` for GitLab.
##### `API_TOKEN`
The Secret token used to access the admin APIs. This must be set for the commandline to function
##### `OPENTELEMETRY_URL`
@ -150,10 +146,6 @@ Optional - Add custom notes in the footer of the page
Optional - domains of mastodon servers run by the same admin as the relay
##### `LOCAL_BLURB`
Optional - description for the relay
##### `PROMETHEUS_ADDR`
Optional - Address to bind to for serving the prometheus scrape endpoint
##### `PROMETHEUS_PORT`
Optional - Port to bind to for serving the prometheus scrape endpoint
### Subscribing
Mastodon admins can subscribe to this relay by adding the `/inbox` route to their relay settings.
@ -173,16 +165,10 @@ example, if the server is `https://relay.my.tld`, the correct URL would be
- Follow Public, become a listener of the relay
- Undo Follow {self-actor}, stop listening on the relay, an Undo Follow will be sent back
- Undo Follow Public, stop listening on the relay
- Delete {anything}, the Delete {anything} is relayed verbatim to listening servers.
- Delete {anything}, the Delete {anything} is relayed verbatim to listening servers
Note that this activity will likely be rejected by the listening servers unless it has been
signed with a JSON-LD signature
- Update {anything}, the Update {anything} is relayed verbatim to listening servers.
Note that this activity will likely be rejected by the listening servers unless it has been
signed with a JSON-LD signature
- Add {anything}, the Add {anything} is relayed verbatim to listening servers.
Note that this activity will likely be rejected by the listening servers unless it has been
signed with a JSON-LD signature
- Remove {anything}, the Remove {anything} is relayed verbatim to listening servers.
- Update {anything}, the Update {anything} is relayed verbatim to listening servers
Note that this activity will likely be rejected by the listening servers unless it has been
signed with a JSON-LD signature
@ -190,9 +176,6 @@ example, if the server is `https://relay.my.tld`, the correct URL would be
- Webfinger
- NodeInfo
### Known issues
Pleroma and Akkoma do not support validating JSON-LD signatures, meaning many activities such as Delete, Update, Add, and Remove will be rejected with a message similar to `WARN: Response from https://example.com/inbox, "Invalid HTTP Signature"`. This is normal and not an issue with the relay.
### Contributing
Feel free to open issues for anything you find an issue with. Please note that any contributed code will be licensed under the AGPLv3.

41
docker/prod/Dockerfile Normal file
View File

@ -0,0 +1,41 @@
ARG REPO_ARCH=amd64
# cross-build environment
FROM asonix/rust-builder:$REPO_ARCH-latest AS builder
ARG TAG=main
ARG BINARY=relay
ARG PROJECT=relay
ARG GIT_REPOSITORY=https://git.asonix.dog/asonix/$PROJECT
ENV \
BINARY=${BINARY}
ADD \
--chown=build:build \
$GIT_REPOSITORY/archive/$TAG.tar.gz \
/opt/build/repo.tar.gz
RUN \
tar zxf repo.tar.gz
WORKDIR /opt/build/$PROJECT
RUN \
build
# production environment
FROM asonix/rust-runner:$REPO_ARCH-latest
ARG BINARY=relay
ENV \
BINARY=${BINARY}
COPY \
--from=builder \
/opt/build/binary \
/usr/bin/${BINARY}
ENTRYPOINT ["/sbin/tini", "--"]
CMD /usr/bin/${BINARY}

37
docker/prod/build-image.sh Executable file
View File

@ -0,0 +1,37 @@
#!/usr/bin/env bash
function require() {
if [ "$1" = "" ]; then
echo "input '$2' required"
print_help
exit 1
fi
}
function print_help() {
echo "deploy.sh"
echo ""
echo "Usage:"
echo " deploy.sh [repo] [tag] [arch]"
echo ""
echo "Args:"
echo " repo: The docker repository to publish the image"
echo " tag: The tag applied to the docker image"
echo " arch: The architecuture of the doker image"
}
REPO=$1
TAG=$2
ARCH=$3
require "$REPO" repo
require "$TAG" tag
require "$ARCH" arch
sudo docker build \
--pull \
--build-arg TAG=$TAG \
--build-arg REPO_ARCH=$ARCH \
-t $REPO:$ARCH-$TAG \
-f Dockerfile \
.

87
docker/prod/deploy.sh Executable file
View File

@ -0,0 +1,87 @@
#!/usr/bin/env bash
function require() {
if [ "$1" = "" ]; then
echo "input '$2' required"
print_help
exit 1
fi
}
function print_help() {
echo "deploy.sh"
echo ""
echo "Usage:"
echo " deploy.sh [tag] [branch] [push]"
echo ""
echo "Args:"
echo " tag: The git tag to be applied to the repository and docker build"
echo " branch: The git branch to use for tagging and publishing"
echo " push: Whether or not to push the image"
echo ""
echo "Examples:"
echo " ./deploy.sh v0.3.0-alpha.13 main true"
echo " ./deploy.sh v0.3.0-alpha.13-shell-out asonix/shell-out false"
}
function build_image() {
tag=$1
arch=$2
push=$3
./build-image.sh asonix/relay $tag $arch
sudo docker tag asonix/relay:$arch-$tag asonix/relay:$arch-latest
if [ "$push" == "true" ]; then
sudo docker push asonix/relay:$arch-$tag
sudo docker push asonix/relay:$arch-latest
fi
}
# Creating the new tag
new_tag="$1"
branch="$2"
push=$3
require "$new_tag" "tag"
require "$branch" "branch"
require "$push" "push"
if ! sudo docker run --rm -it arm64v8/alpine:3.11 /bin/sh -c 'echo "docker is configured correctly"'
then
echo "docker is not configured to run on qemu-emulated architectures, fixing will require sudo"
sudo docker run --rm --privileged multiarch/qemu-user-static --reset -p yes
fi
set -xe
git checkout $branch
# Changing the docker-compose prod
sed -i "s/asonix\/relay:.*/asonix\/relay:$new_tag/" docker-compose.yml
git add ../prod/docker-compose.yml
# The commit
git commit -m"Version $new_tag"
git tag $new_tag
# Push
git push origin $new_tag
git push
# Build for arm64v8, arm32v7 and amd64
build_image $new_tag arm64v8 $push
build_image $new_tag arm32v7 $push
build_image $new_tag amd64 $push
# Build for other archs
# TODO
if [ "$push" == "true" ]; then
./manifest.sh relay $new_tag
./manifest.sh relay latest
# pushd ../../
# cargo publish
# popd
fi

View File

@ -2,7 +2,7 @@ version: '3.3'
services:
relay:
image: asonix/relay:v0.3.73
image: asonix/relay:v0.3.8
ports:
- "8079:8079"
restart: always
@ -14,7 +14,6 @@ services:
- RESTRICTED_MODE=false
- VALIDATE_SIGNATURES=true
- HTTPS=true
- SLED_PATH=/mnt/sled/db-0.34
- DATABASE_URL=postgres://pg_user:pg_pass@pg_host:pg_port/pg_database
- PRETTY_LOG=false
- PUBLISH_BLOCKS=true
- API_TOKEN=somepasswordishtoken

43
docker/prod/manifest.sh Executable file
View File

@ -0,0 +1,43 @@
#!/usr/bin/env bash
function require() {
if [ "$1" = "" ]; then
echo "input '$2' required"
print_help
exit 1
fi
}
function print_help() {
echo "deploy.sh"
echo ""
echo "Usage:"
echo " manifest.sh [repo] [tag]"
echo ""
echo "Args:"
echo " repo: The docker repository to update"
echo " tag: The git tag to be applied to the image manifest"
}
REPO=$1
TAG=$2
require "$REPO" "repo"
require "$TAG" "tag"
set -xe
sudo docker manifest create asonix/$REPO:$TAG \
-a asonix/$REPO:arm64v8-$TAG \
-a asonix/$REPO:arm32v7-$TAG \
-a asonix/$REPO:amd64-$TAG
sudo docker manifest annotate asonix/$REPO:$TAG \
asonix/$REPO:arm64v8-$TAG --os linux --arch arm64 --variant v8
sudo docker manifest annotate asonix/$REPO:$TAG \
asonix/$REPO:arm32v7-$TAG --os linux --arch arm --variant v7
sudo docker manifest annotate asonix/$REPO:$TAG \
asonix/$REPO:amd64-$TAG --os linux --arch amd64
sudo docker manifest push asonix/$REPO:$TAG --purge

View File

@ -1,6 +1,4 @@
use activitystreams::iri_string::types::IriString;
use std::collections::{BTreeMap, BTreeSet};
use time::OffsetDateTime;
pub mod client;
pub mod routes;
@ -24,9 +22,3 @@ pub(crate) struct BlockedDomains {
pub(crate) struct ConnectedActors {
pub(crate) connected_actors: Vec<IriString>,
}
#[derive(serde::Deserialize, serde::Serialize)]
pub(crate) struct LastSeen {
pub(crate) last_seen: BTreeMap<OffsetDateTime, BTreeSet<String>>,
pub(crate) never: Vec<String>,
}

View File

@ -1,5 +1,5 @@
use crate::{
admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains, LastSeen},
admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains},
collector::Snapshot,
config::{AdminUrlKind, Config},
error::{Error, ErrorKind},
@ -55,10 +55,6 @@ pub(crate) async fn stats(client: &Client, config: &Config) -> Result<Snapshot,
get_results(client, config, AdminUrlKind::Stats).await
}
pub(crate) async fn last_seen(client: &Client, config: &Config) -> Result<LastSeen, Error> {
get_results(client, config, AdminUrlKind::LastSeen).await
}
async fn get_results<T: DeserializeOwned>(
client: &Client,
config: &Config,

View File

@ -1,5 +1,5 @@
use crate::{
admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains, LastSeen},
admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains},
collector::{MemoryCollector, Snapshot},
error::Error,
extractors::Admin,
@ -8,8 +8,6 @@ use actix_web::{
web::{Data, Json},
HttpResponse,
};
use std::collections::{BTreeMap, BTreeSet};
use time::OffsetDateTime;
pub(crate) async fn allow(
admin: Admin,
@ -71,20 +69,3 @@ pub(crate) async fn stats(
) -> Result<Json<Snapshot>, Error> {
Ok(Json(collector.snapshot()))
}
pub(crate) async fn last_seen(admin: Admin) -> Result<Json<LastSeen>, Error> {
let nodes = admin.db_ref().last_seen().await?;
let mut last_seen: BTreeMap<OffsetDateTime, BTreeSet<String>> = BTreeMap::new();
let mut never = Vec::new();
for (domain, datetime) in nodes {
if let Some(datetime) = datetime {
last_seen.entry(datetime).or_default().insert(domain);
} else {
never.push(domain);
}
}
Ok(Json(LastSeen { last_seen, never }))
}

View File

@ -34,13 +34,11 @@ pub struct PublicKey {
#[serde(rename_all = "PascalCase")]
pub enum ValidTypes {
Accept,
Add,
Announce,
Create,
Delete,
Follow,
Reject,
Remove,
Undo,
Update,
}

View File

@ -17,22 +17,11 @@ pub(crate) struct Args {
#[arg(short, long, help = "Get statistics from the server")]
stats: bool,
#[arg(
short,
long,
help = "List domains by when they were last succesfully contacted"
)]
contacted: bool,
}
impl Args {
pub(crate) fn any(&self) -> bool {
!self.blocks.is_empty()
|| !self.allowed.is_empty()
|| self.list
|| self.stats
|| self.contacted
!self.blocks.is_empty() || !self.allowed.is_empty() || self.list || self.stats
}
pub(crate) fn new() -> Self {
@ -58,8 +47,4 @@ impl Args {
pub(crate) fn stats(&self) -> bool {
self.stats
}
pub(crate) fn contacted(&self) -> bool {
self.contacted
}
}

View File

@ -5,8 +5,7 @@ fn git_info() {
if let Ok(output) = Command::new("git").args(["rev-parse", "HEAD"]).output() {
if output.status.success() {
let git_hash = String::from_utf8_lossy(&output.stdout);
println!("cargo:rustc-env=GIT_HASH={git_hash}");
println!("cargo:rustc-env=GIT_SHORT_HASH={}", &git_hash[..8])
println!("cargo:rustc-env=GIT_HASH={}", git_hash);
}
}
@ -16,7 +15,7 @@ fn git_info() {
{
if output.status.success() {
let git_branch = String::from_utf8_lossy(&output.stdout);
println!("cargo:rustc-env=GIT_BRANCH={git_branch}");
println!("cargo:rustc-env=GIT_BRANCH={}", git_branch);
}
}
}
@ -24,7 +23,7 @@ fn git_info() {
fn version_info() -> Result<(), anyhow::Error> {
let cargo_toml = Path::new(&std::env::var("CARGO_MANIFEST_DIR")?).join("Cargo.toml");
let mut file = File::open(cargo_toml)?;
let mut file = File::open(&cargo_toml)?;
let mut cargo_data = String::new();
file.read_to_string(&mut cargo_data)?;
@ -32,11 +31,11 @@ fn version_info() -> Result<(), anyhow::Error> {
let data: toml::Value = toml::from_str(&cargo_data)?;
if let Some(version) = data["package"]["version"].as_str() {
println!("cargo:rustc-env=PKG_VERSION={version}");
println!("cargo:rustc-env=PKG_VERSION={}", version);
}
if let Some(name) = data["package"]["name"].as_str() {
println!("cargo:rustc-env=PKG_NAME={name}");
println!("cargo:rustc-env=PKG_NAME={}", name);
}
Ok(())

View File

@ -1,5 +1,414 @@
mod double;
mod stats;
use metrics::{Key, Recorder, SetRecorderError};
use metrics_util::{
registry::{AtomicStorage, GenerationalStorage, Recency, Registry},
MetricKindMask, Summary,
};
use quanta::Clock;
use std::{
collections::{BTreeMap, HashMap},
sync::{atomic::Ordering, Arc, RwLock},
time::Duration,
};
pub(crate) use double::DoubleRecorder;
pub(crate) use stats::{MemoryCollector, Snapshot};
const SECONDS: u64 = 1;
const MINUTES: u64 = 60 * SECONDS;
const HOURS: u64 = 60 * MINUTES;
const DAYS: u64 = 24 * HOURS;
type DistributionMap = BTreeMap<Vec<(String, String)>, Summary>;
#[derive(Clone)]
pub struct MemoryCollector {
inner: Arc<Inner>,
}
struct Inner {
descriptions: RwLock<HashMap<String, metrics::SharedString>>,
distributions: RwLock<HashMap<String, DistributionMap>>,
recency: Recency<Key>,
registry: Registry<Key, GenerationalStorage<AtomicStorage>>,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct Counter {
labels: BTreeMap<String, String>,
value: u64,
}
impl std::fmt::Display for Counter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let labels = self
.labels
.iter()
.map(|(k, v)| format!("{}: {}", k, v))
.collect::<Vec<_>>()
.join(", ");
write!(f, "{} - {}", labels, self.value)
}
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct Gauge {
labels: BTreeMap<String, String>,
value: f64,
}
impl std::fmt::Display for Gauge {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let labels = self
.labels
.iter()
.map(|(k, v)| format!("{}: {}", k, v))
.collect::<Vec<_>>()
.join(", ");
write!(f, "{} - {}", labels, self.value)
}
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct Histogram {
labels: BTreeMap<String, String>,
value: Vec<(f64, Option<f64>)>,
}
impl std::fmt::Display for Histogram {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let labels = self
.labels
.iter()
.map(|(k, v)| format!("{}: {}", k, v))
.collect::<Vec<_>>()
.join(", ");
let value = self
.value
.iter()
.map(|(k, v)| {
if let Some(v) = v {
format!("{}: {:.6}", k, v)
} else {
format!("{}: None,", k)
}
})
.collect::<Vec<_>>()
.join(", ");
write!(f, "{} - {}", labels, value)
}
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct Snapshot {
counters: HashMap<String, Vec<Counter>>,
gauges: HashMap<String, Vec<Gauge>>,
histograms: HashMap<String, Vec<Histogram>>,
}
const PAIRS: [((&str, &str), &str); 2] = [
(
(
"background-jobs.worker.started",
"background-jobs.worker.finished",
),
"background-jobs.worker.running",
),
(
(
"background-jobs.job.started",
"background-jobs.job.finished",
),
"background-jobs.job.running",
),
];
#[derive(Default)]
struct MergeCounter {
start: Option<Counter>,
finish: Option<Counter>,
}
impl MergeCounter {
fn merge(self) -> Option<Counter> {
match (self.start, self.finish) {
(Some(start), Some(end)) => Some(Counter {
labels: start.labels,
value: start.value.saturating_sub(end.value),
}),
(Some(only), None) => Some(only),
(None, Some(only)) => Some(Counter {
labels: only.labels,
value: 0,
}),
(None, None) => None,
}
}
}
impl Snapshot {
pub(crate) fn present(self) {
if !self.counters.is_empty() {
println!("Counters");
let mut merging = HashMap::new();
for (key, counters) in self.counters {
if let Some(((start, _), name)) = PAIRS
.iter()
.find(|((start, finish), _)| *start == key || *finish == key)
{
let entry = merging.entry(name).or_insert_with(HashMap::new);
for counter in counters {
let mut merge_counter = entry
.entry(counter.labels.clone())
.or_insert_with(MergeCounter::default);
if key == *start {
merge_counter.start = Some(counter);
} else {
merge_counter.finish = Some(counter);
}
}
continue;
}
println!("\t{}", key);
for counter in counters {
println!("\t\t{}", counter);
}
}
for (key, counters) in merging {
println!("\t{}", key);
for (_, counter) in counters {
if let Some(counter) = counter.merge() {
println!("\t\t{}", counter);
}
}
}
}
if !self.gauges.is_empty() {
println!("Gauges");
for (key, gauges) in self.gauges {
println!("\t{}", key);
for gauge in gauges {
println!("\t\t{}", gauge);
}
}
}
if !self.histograms.is_empty() {
println!("Histograms");
for (key, histograms) in self.histograms {
println!("\t{}", key);
for histogram in histograms {
println!("\t\t{}", histogram);
}
}
}
}
}
fn key_to_parts(key: &Key) -> (String, Vec<(String, String)>) {
let labels = key
.labels()
.into_iter()
.map(|label| (label.key().to_string(), label.value().to_string()))
.collect();
let name = key.name().to_string();
(name, labels)
}
impl Inner {
fn snapshot_counters(&self) -> HashMap<String, Vec<Counter>> {
let mut counters = HashMap::new();
for (key, counter) in self.registry.get_counter_handles() {
let gen = counter.get_generation();
if !self.recency.should_store_counter(&key, gen, &self.registry) {
continue;
}
let (name, labels) = key_to_parts(&key);
let value = counter.get_inner().load(Ordering::Acquire);
counters.entry(name).or_insert_with(Vec::new).push(Counter {
labels: labels.into_iter().collect(),
value,
});
}
counters
}
fn snapshot_gauges(&self) -> HashMap<String, Vec<Gauge>> {
let mut gauges = HashMap::new();
for (key, gauge) in self.registry.get_gauge_handles() {
let gen = gauge.get_generation();
if !self.recency.should_store_gauge(&key, gen, &self.registry) {
continue;
}
let (name, labels) = key_to_parts(&key);
let value = f64::from_bits(gauge.get_inner().load(Ordering::Acquire));
gauges.entry(name).or_insert_with(Vec::new).push(Gauge {
labels: labels.into_iter().collect(),
value,
})
}
gauges
}
fn snapshot_histograms(&self) -> HashMap<String, Vec<Histogram>> {
for (key, histogram) in self.registry.get_histogram_handles() {
let gen = histogram.get_generation();
let (name, labels) = key_to_parts(&key);
if !self
.recency
.should_store_histogram(&key, gen, &self.registry)
{
let mut d = self.distributions.write().unwrap();
let delete_by_name = if let Some(by_name) = d.get_mut(&name) {
by_name.remove(&labels);
by_name.is_empty()
} else {
false
};
drop(d);
if delete_by_name {
self.descriptions.write().unwrap().remove(&name);
}
continue;
}
let mut d = self.distributions.write().unwrap();
let outer_entry = d.entry(name.clone()).or_insert_with(BTreeMap::new);
let entry = outer_entry
.entry(labels)
.or_insert_with(Summary::with_defaults);
histogram.get_inner().clear_with(|samples| {
for sample in samples {
entry.add(*sample);
}
})
}
let d = self.distributions.read().unwrap().clone();
d.into_iter()
.map(|(key, value)| {
(
key,
value
.into_iter()
.map(|(labels, summary)| Histogram {
labels: labels.into_iter().collect(),
value: [0.001, 0.01, 0.05, 0.1, 0.5, 0.9, 0.99, 1.0]
.into_iter()
.map(|q| (q, summary.quantile(q)))
.collect(),
})
.collect(),
)
})
.collect()
}
fn snapshot(&self) -> Snapshot {
Snapshot {
counters: self.snapshot_counters(),
gauges: self.snapshot_gauges(),
histograms: self.snapshot_histograms(),
}
}
}
impl MemoryCollector {
pub(crate) fn new() -> Self {
MemoryCollector {
inner: Arc::new(Inner {
descriptions: Default::default(),
distributions: Default::default(),
recency: Recency::new(
Clock::new(),
MetricKindMask::ALL,
Some(Duration::from_secs(5 * DAYS)),
),
registry: Registry::new(GenerationalStorage::atomic()),
}),
}
}
pub(crate) fn install(&self) -> Result<(), SetRecorderError> {
metrics::set_boxed_recorder(Box::new(self.clone()))
}
pub(crate) fn snapshot(&self) -> Snapshot {
self.inner.snapshot()
}
fn add_description_if_missing(
&self,
key: &metrics::KeyName,
description: metrics::SharedString,
) {
let mut d = self.inner.descriptions.write().unwrap();
d.entry(key.as_str().to_owned()).or_insert(description);
}
}
impl Recorder for MemoryCollector {
fn describe_counter(
&self,
key: metrics::KeyName,
_: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.add_description_if_missing(&key, description)
}
fn describe_gauge(
&self,
key: metrics::KeyName,
_: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.add_description_if_missing(&key, description)
}
fn describe_histogram(
&self,
key: metrics::KeyName,
_: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.add_description_if_missing(&key, description)
}
fn register_counter(&self, key: &Key) -> metrics::Counter {
self.inner
.registry
.get_or_create_counter(key, |c| c.clone().into())
}
fn register_gauge(&self, key: &Key) -> metrics::Gauge {
self.inner
.registry
.get_or_create_gauge(key, |c| c.clone().into())
}
fn register_histogram(&self, key: &Key) -> metrics::Histogram {
self.inner
.registry
.get_or_create_histogram(key, |c| c.clone().into())
}
}

View File

@ -1,133 +0,0 @@
use metrics::{CounterFn, GaugeFn, HistogramFn, Key, Recorder, SetRecorderError};
use std::sync::Arc;
#[derive(Clone)]
pub(crate) struct DoubleRecorder<R, S> {
first: R,
second: S,
}
struct DoubleCounter {
first: metrics::Counter,
second: metrics::Counter,
}
struct DoubleGauge {
first: metrics::Gauge,
second: metrics::Gauge,
}
struct DoubleHistogram {
first: metrics::Histogram,
second: metrics::Histogram,
}
impl<R, S> DoubleRecorder<R, S> {
pub(crate) fn new(first: R, second: S) -> Self {
DoubleRecorder { first, second }
}
pub(crate) fn install(self) -> Result<(), SetRecorderError>
where
R: Recorder + 'static,
S: Recorder + 'static,
{
metrics::set_boxed_recorder(Box::new(self))
}
}
impl<R, S> Recorder for DoubleRecorder<R, S>
where
R: Recorder,
S: Recorder,
{
fn describe_counter(
&self,
key: metrics::KeyName,
unit: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.first
.describe_counter(key.clone(), unit, description.clone());
self.second.describe_counter(key, unit, description);
}
fn describe_gauge(
&self,
key: metrics::KeyName,
unit: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.first
.describe_gauge(key.clone(), unit, description.clone());
self.second.describe_gauge(key, unit, description);
}
fn describe_histogram(
&self,
key: metrics::KeyName,
unit: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.first
.describe_histogram(key.clone(), unit, description.clone());
self.second.describe_histogram(key, unit, description);
}
fn register_counter(&self, key: &Key) -> metrics::Counter {
let first = self.first.register_counter(key);
let second = self.second.register_counter(key);
metrics::Counter::from_arc(Arc::new(DoubleCounter { first, second }))
}
fn register_gauge(&self, key: &Key) -> metrics::Gauge {
let first = self.first.register_gauge(key);
let second = self.second.register_gauge(key);
metrics::Gauge::from_arc(Arc::new(DoubleGauge { first, second }))
}
fn register_histogram(&self, key: &Key) -> metrics::Histogram {
let first = self.first.register_histogram(key);
let second = self.second.register_histogram(key);
metrics::Histogram::from_arc(Arc::new(DoubleHistogram { first, second }))
}
}
impl CounterFn for DoubleCounter {
fn increment(&self, value: u64) {
self.first.increment(value);
self.second.increment(value);
}
fn absolute(&self, value: u64) {
self.first.absolute(value);
self.second.absolute(value);
}
}
impl GaugeFn for DoubleGauge {
fn increment(&self, value: f64) {
self.first.increment(value);
self.second.increment(value);
}
fn decrement(&self, value: f64) {
self.first.decrement(value);
self.second.decrement(value);
}
fn set(&self, value: f64) {
self.first.set(value);
self.second.set(value);
}
}
impl HistogramFn for DoubleHistogram {
fn record(&self, value: f64) {
self.first.record(value);
self.second.record(value);
}
}

View File

@ -1,414 +0,0 @@
use metrics::{Key, Recorder, SetRecorderError};
use metrics_util::{
registry::{AtomicStorage, GenerationalStorage, Recency, Registry},
MetricKindMask, Summary,
};
use quanta::Clock;
use std::{
collections::{BTreeMap, HashMap},
sync::{atomic::Ordering, Arc, RwLock},
time::Duration,
};
const SECONDS: u64 = 1;
const MINUTES: u64 = 60 * SECONDS;
const HOURS: u64 = 60 * MINUTES;
const DAYS: u64 = 24 * HOURS;
type DistributionMap = BTreeMap<Vec<(String, String)>, Summary>;
#[derive(Clone)]
pub struct MemoryCollector {
inner: Arc<Inner>,
}
struct Inner {
descriptions: RwLock<HashMap<String, metrics::SharedString>>,
distributions: RwLock<HashMap<String, DistributionMap>>,
recency: Recency<Key>,
registry: Registry<Key, GenerationalStorage<AtomicStorage>>,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct Counter {
labels: BTreeMap<String, String>,
value: u64,
}
impl std::fmt::Display for Counter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let labels = self
.labels
.iter()
.map(|(k, v)| format!("{k}: {v}"))
.collect::<Vec<_>>()
.join(", ");
write!(f, "{labels} - {}", self.value)
}
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct Gauge {
labels: BTreeMap<String, String>,
value: f64,
}
impl std::fmt::Display for Gauge {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let labels = self
.labels
.iter()
.map(|(k, v)| format!("{k}: {v}"))
.collect::<Vec<_>>()
.join(", ");
write!(f, "{labels} - {}", self.value)
}
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct Histogram {
labels: BTreeMap<String, String>,
value: Vec<(f64, Option<f64>)>,
}
impl std::fmt::Display for Histogram {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let labels = self
.labels
.iter()
.map(|(k, v)| format!("{k}: {v}"))
.collect::<Vec<_>>()
.join(", ");
let value = self
.value
.iter()
.map(|(k, v)| {
if let Some(v) = v {
format!("{k}: {v:.6}")
} else {
format!("{k}: None,")
}
})
.collect::<Vec<_>>()
.join(", ");
write!(f, "{labels} - {value}")
}
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct Snapshot {
counters: HashMap<String, Vec<Counter>>,
gauges: HashMap<String, Vec<Gauge>>,
histograms: HashMap<String, Vec<Histogram>>,
}
const PAIRS: [((&str, &str), &str); 2] = [
(
(
"background-jobs.worker.started",
"background-jobs.worker.finished",
),
"background-jobs.worker.running",
),
(
(
"background-jobs.job.started",
"background-jobs.job.finished",
),
"background-jobs.job.running",
),
];
#[derive(Default)]
struct MergeCounter {
start: Option<Counter>,
finish: Option<Counter>,
}
impl MergeCounter {
fn merge(self) -> Option<Counter> {
match (self.start, self.finish) {
(Some(start), Some(end)) => Some(Counter {
labels: start.labels,
value: start.value.saturating_sub(end.value),
}),
(Some(only), None) => Some(only),
(None, Some(only)) => Some(Counter {
labels: only.labels,
value: 0,
}),
(None, None) => None,
}
}
}
impl Snapshot {
pub(crate) fn present(self) {
if !self.counters.is_empty() {
println!("Counters");
let mut merging = HashMap::new();
for (key, counters) in self.counters {
if let Some(((start, _), name)) = PAIRS
.iter()
.find(|((start, finish), _)| *start == key || *finish == key)
{
let entry = merging.entry(name).or_insert_with(HashMap::new);
for counter in counters {
let mut merge_counter = entry
.entry(counter.labels.clone())
.or_insert_with(MergeCounter::default);
if key == *start {
merge_counter.start = Some(counter);
} else {
merge_counter.finish = Some(counter);
}
}
continue;
}
println!("\t{key}");
for counter in counters {
println!("\t\t{counter}");
}
}
for (key, counters) in merging {
println!("\t{key}");
for (_, counter) in counters {
if let Some(counter) = counter.merge() {
println!("\t\t{counter}");
}
}
}
}
if !self.gauges.is_empty() {
println!("Gauges");
for (key, gauges) in self.gauges {
println!("\t{key}");
for gauge in gauges {
println!("\t\t{gauge}");
}
}
}
if !self.histograms.is_empty() {
println!("Histograms");
for (key, histograms) in self.histograms {
println!("\t{key}");
for histogram in histograms {
println!("\t\t{histogram}");
}
}
}
}
}
fn key_to_parts(key: &Key) -> (String, Vec<(String, String)>) {
let labels = key
.labels()
.into_iter()
.map(|label| (label.key().to_string(), label.value().to_string()))
.collect();
let name = key.name().to_string();
(name, labels)
}
impl Inner {
fn snapshot_counters(&self) -> HashMap<String, Vec<Counter>> {
let mut counters = HashMap::new();
for (key, counter) in self.registry.get_counter_handles() {
let gen = counter.get_generation();
if !self.recency.should_store_counter(&key, gen, &self.registry) {
continue;
}
let (name, labels) = key_to_parts(&key);
let value = counter.get_inner().load(Ordering::Acquire);
counters.entry(name).or_insert_with(Vec::new).push(Counter {
labels: labels.into_iter().collect(),
value,
});
}
counters
}
fn snapshot_gauges(&self) -> HashMap<String, Vec<Gauge>> {
let mut gauges = HashMap::new();
for (key, gauge) in self.registry.get_gauge_handles() {
let gen = gauge.get_generation();
if !self.recency.should_store_gauge(&key, gen, &self.registry) {
continue;
}
let (name, labels) = key_to_parts(&key);
let value = f64::from_bits(gauge.get_inner().load(Ordering::Acquire));
gauges.entry(name).or_insert_with(Vec::new).push(Gauge {
labels: labels.into_iter().collect(),
value,
})
}
gauges
}
fn snapshot_histograms(&self) -> HashMap<String, Vec<Histogram>> {
for (key, histogram) in self.registry.get_histogram_handles() {
let gen = histogram.get_generation();
let (name, labels) = key_to_parts(&key);
if !self
.recency
.should_store_histogram(&key, gen, &self.registry)
{
let mut d = self.distributions.write().unwrap();
let delete_by_name = if let Some(by_name) = d.get_mut(&name) {
by_name.remove(&labels);
by_name.is_empty()
} else {
false
};
drop(d);
if delete_by_name {
self.descriptions.write().unwrap().remove(&name);
}
continue;
}
let mut d = self.distributions.write().unwrap();
let outer_entry = d.entry(name.clone()).or_insert_with(BTreeMap::new);
let entry = outer_entry
.entry(labels)
.or_insert_with(Summary::with_defaults);
histogram.get_inner().clear_with(|samples| {
for sample in samples {
entry.add(*sample);
}
})
}
let d = self.distributions.read().unwrap().clone();
d.into_iter()
.map(|(key, value)| {
(
key,
value
.into_iter()
.map(|(labels, summary)| Histogram {
labels: labels.into_iter().collect(),
value: [0.001, 0.01, 0.05, 0.1, 0.5, 0.9, 0.99, 1.0]
.into_iter()
.map(|q| (q, summary.quantile(q)))
.collect(),
})
.collect(),
)
})
.collect()
}
fn snapshot(&self) -> Snapshot {
Snapshot {
counters: self.snapshot_counters(),
gauges: self.snapshot_gauges(),
histograms: self.snapshot_histograms(),
}
}
}
impl MemoryCollector {
pub(crate) fn new() -> Self {
MemoryCollector {
inner: Arc::new(Inner {
descriptions: Default::default(),
distributions: Default::default(),
recency: Recency::new(
Clock::new(),
MetricKindMask::ALL,
Some(Duration::from_secs(5 * DAYS)),
),
registry: Registry::new(GenerationalStorage::atomic()),
}),
}
}
pub(crate) fn snapshot(&self) -> Snapshot {
self.inner.snapshot()
}
fn add_description_if_missing(
&self,
key: &metrics::KeyName,
description: metrics::SharedString,
) {
let mut d = self.inner.descriptions.write().unwrap();
d.entry(key.as_str().to_owned()).or_insert(description);
}
pub(crate) fn install(&self) -> Result<(), SetRecorderError> {
metrics::set_boxed_recorder(Box::new(self.clone()))
}
}
impl Recorder for MemoryCollector {
fn describe_counter(
&self,
key: metrics::KeyName,
_: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.add_description_if_missing(&key, description)
}
fn describe_gauge(
&self,
key: metrics::KeyName,
_: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.add_description_if_missing(&key, description)
}
fn describe_histogram(
&self,
key: metrics::KeyName,
_: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.add_description_if_missing(&key, description)
}
fn register_counter(&self, key: &Key) -> metrics::Counter {
self.inner
.registry
.get_or_create_counter(key, |c| c.clone().into())
}
fn register_gauge(&self, key: &Key) -> metrics::Gauge {
self.inner
.registry
.get_or_create_gauge(key, |c| c.clone().into())
}
fn register_histogram(&self, key: &Key) -> metrics::Histogram {
self.inner
.registry
.get_or_create_histogram(key, |c| c.clone().into())
}
}

View File

@ -1,24 +1,22 @@
use crate::{
data::{ActorCache, State},
error::Error,
extractors::{AdminConfig, XApiToken},
middleware::MyVerify,
requests::Requests,
};
use activitystreams::{
iri,
iri_string::{
format::ToDedicatedString,
resolve::FixedBaseResolver,
types::{IriAbsoluteString, IriFragmentStr, IriRelativeStr, IriString},
},
};
use config::Environment;
use http_signature_normalization_actix::prelude::VerifyDigest;
use rsa::sha2::{Digest, Sha256};
use http_signature_normalization_actix::prelude::{VerifyDigest, VerifySignature};
use rustls::{Certificate, PrivateKey};
use std::{
io::BufReader,
net::{IpAddr, SocketAddr},
path::PathBuf,
};
use sha2::{Digest, Sha256};
use std::{io::BufReader, net::IpAddr, path::PathBuf};
use uuid::Uuid;
#[derive(Clone, Debug, serde::Deserialize)]
@ -33,7 +31,6 @@ pub(crate) struct ParsedConfig {
publish_blocks: bool,
sled_path: PathBuf,
source_repo: IriString,
repository_commit_base: String,
opentelemetry_url: Option<IriString>,
telegram_token: Option<String>,
telegram_admin_handle: Option<String>,
@ -43,8 +40,6 @@ pub(crate) struct ParsedConfig {
footer_blurb: Option<String>,
local_domains: Option<String>,
local_blurb: Option<String>,
prometheus_addr: Option<IpAddr>,
prometheus_port: Option<u16>,
}
#[derive(Clone)]
@ -67,7 +62,6 @@ pub struct Config {
footer_blurb: Option<String>,
local_domains: Vec<String>,
local_blurb: Option<String>,
prometheus_config: Option<PrometheusConfig>,
}
#[derive(Clone)]
@ -76,12 +70,6 @@ struct TlsConfig {
cert: PathBuf,
}
#[derive(Clone, Debug)]
struct PrometheusConfig {
addr: IpAddr,
port: u16,
}
#[derive(Debug)]
pub enum UrlKind {
Activity,
@ -106,7 +94,6 @@ pub enum AdminUrlKind {
Blocked,
Connected,
Stats,
LastSeen,
}
impl std::fmt::Debug for Config {
@ -134,7 +121,6 @@ impl std::fmt::Debug for Config {
.field("footer_blurb", &self.footer_blurb)
.field("local_domains", &self.local_domains)
.field("local_blurb", &self.local_blurb)
.field("prometheus_config", &self.prometheus_config)
.finish()
}
}
@ -152,7 +138,6 @@ impl Config {
.set_default("publish_blocks", false)?
.set_default("sled_path", "./sled/db-0-34")?
.set_default("source_repo", "https://git.asonix.dog/asonix/relay")?
.set_default("repository_commit_base", "/src/commit/")?
.set_default("opentelemetry_url", None as Option<&str>)?
.set_default("telegram_token", None as Option<&str>)?
.set_default("telegram_admin_handle", None as Option<&str>)?
@ -162,15 +147,13 @@ impl Config {
.set_default("footer_blurb", None as Option<&str>)?
.set_default("local_domains", None as Option<&str>)?
.set_default("local_blurb", None as Option<&str>)?
.set_default("prometheus_addr", None as Option<&str>)?
.set_default("prometheus_port", None as Option<u16>)?
.add_source(Environment::default())
.build()?;
let config: ParsedConfig = config.try_deserialize()?;
let scheme = if config.https { "https" } else { "http" };
let base_uri = iri!(format!("{scheme}://{}", config.hostname)).into_absolute();
let base_uri = iri!(format!("{}://{}", scheme, config.hostname)).into_absolute();
let tls = match (config.tls_key, config.tls_cert) {
(Some(key), Some(cert)) => Some(TlsConfig { key, cert }),
@ -192,29 +175,6 @@ impl Config {
.map(|d| d.to_string())
.collect();
let prometheus_config = match (config.prometheus_addr, config.prometheus_port) {
(Some(addr), Some(port)) => Some(PrometheusConfig { addr, port }),
(Some(_), None) => {
tracing::warn!("PROMETHEUS_ADDR is set but PROMETHEUS_PORT is not set, not building Prometheus config");
None
}
(None, Some(_)) => {
tracing::warn!("PROMETHEUS_PORT is set but PROMETHEUS_ADDR is not set, not building Prometheus config");
None
}
(None, None) => None,
};
let source_url = match Self::git_hash() {
Some(hash) => format!(
"{}{}{hash}",
config.source_repo, config.repository_commit_base
)
.parse()
.expect("constructed source URL is valid"),
None => config.source_repo.clone(),
};
Ok(Config {
hostname: config.hostname,
addr: config.addr,
@ -225,7 +185,7 @@ impl Config {
publish_blocks: config.publish_blocks,
base_uri,
sled_path: config.sled_path,
source_repo: source_url,
source_repo: config.source_repo,
opentelemetry_url: config.opentelemetry_url,
telegram_token: config.telegram_token,
telegram_admin_handle: config.telegram_admin_handle,
@ -234,16 +194,9 @@ impl Config {
footer_blurb: config.footer_blurb,
local_domains,
local_blurb: config.local_blurb,
prometheus_config,
})
}
pub(crate) fn prometheus_bind_address(&self) -> Option<SocketAddr> {
let config = self.prometheus_config.as_ref()?;
Some((config.addr, config.port).into())
}
pub(crate) fn open_keys(&self) -> Result<Option<(Vec<Certificate>, PrivateKey)>, Error> {
let tls = if let Some(tls) = &self.tls {
tls
@ -323,6 +276,19 @@ impl Config {
}
}
pub(crate) fn signature_middleware(
&self,
requests: Requests,
actors: ActorCache,
state: State,
) -> VerifySignature<MyVerify> {
if self.validate_signatures {
VerifySignature::new(MyVerify(requests, actors, state), Default::default())
} else {
VerifySignature::new(MyVerify(requests, actors, state), Default::default()).optional()
}
}
pub(crate) fn x_api_token(&self) -> Option<XApiToken> {
self.api_token.clone().map(XApiToken::new)
}
@ -332,7 +298,7 @@ impl Config {
match AdminConfig::build(api_token) {
Ok(conf) => Some(actix_web::web::Data::new(conf)),
Err(e) => {
tracing::error!("Error creating admin config: {e}");
tracing::error!("Error creating admin config: {}", e);
None
}
}
@ -371,7 +337,7 @@ impl Config {
pub(crate) fn software_version() -> String {
if let Some(git) = Self::git_version() {
return format!("v{}-{git}", Self::version());
return format!("v{}-{}", Self::version(), git);
}
format!("v{}", Self::version())
@ -379,9 +345,9 @@ impl Config {
fn git_version() -> Option<String> {
let branch = Self::git_branch()?;
let hash = Self::git_short_hash()?;
let hash = Self::git_hash()?;
Some(format!("{branch}-{hash}"))
Some(format!("{}-{}", branch, hash))
}
fn name() -> &'static str {
@ -400,10 +366,6 @@ impl Config {
option_env!("GIT_HASH")
}
fn git_short_hash() -> Option<&'static str> {
option_env!("GIT_SHORT_HASH")
}
pub(crate) fn user_agent(&self) -> String {
format!(
"{} ({}/{}; +{})",
@ -433,44 +395,37 @@ impl Config {
self.do_generate_url(kind).expect("Generated valid IRI")
}
#[tracing::instrument(level = "debug", skip_all, fields(base_uri = tracing::field::debug(&self.base_uri), kind = tracing::field::debug(&kind)))]
fn do_generate_url(&self, kind: UrlKind) -> Result<IriString, Error> {
let iri = match kind {
UrlKind::Activity => FixedBaseResolver::new(self.base_uri.as_ref())
.resolve(IriRelativeStr::new(&format!("activity/{}", Uuid::new_v4()))?.as_ref())
.try_to_dedicated_string()?,
UrlKind::Activity => FixedBaseResolver::new(self.base_uri.as_ref()).try_resolve(
IriRelativeStr::new(&format!("activity/{}", Uuid::new_v4()))?.as_ref(),
)?,
UrlKind::Actor => FixedBaseResolver::new(self.base_uri.as_ref())
.resolve(IriRelativeStr::new("actor")?.as_ref())
.try_to_dedicated_string()?,
.try_resolve(IriRelativeStr::new("actor")?.as_ref())?,
UrlKind::Followers => FixedBaseResolver::new(self.base_uri.as_ref())
.resolve(IriRelativeStr::new("followers")?.as_ref())
.try_to_dedicated_string()?,
.try_resolve(IriRelativeStr::new("followers")?.as_ref())?,
UrlKind::Following => FixedBaseResolver::new(self.base_uri.as_ref())
.resolve(IriRelativeStr::new("following")?.as_ref())
.try_to_dedicated_string()?,
.try_resolve(IriRelativeStr::new("following")?.as_ref())?,
UrlKind::Inbox => FixedBaseResolver::new(self.base_uri.as_ref())
.resolve(IriRelativeStr::new("inbox")?.as_ref())
.try_to_dedicated_string()?,
.try_resolve(IriRelativeStr::new("inbox")?.as_ref())?,
UrlKind::Index => self.base_uri.clone().into(),
UrlKind::MainKey => {
let actor = IriRelativeStr::new("actor")?;
let fragment = IriFragmentStr::new("main-key")?;
let mut resolved = FixedBaseResolver::new(self.base_uri.as_ref())
.resolve(actor.as_ref())
.try_to_dedicated_string()?;
let mut resolved =
FixedBaseResolver::new(self.base_uri.as_ref()).try_resolve(actor.as_ref())?;
resolved.set_fragment(Some(fragment));
resolved
}
UrlKind::Media(uuid) => FixedBaseResolver::new(self.base_uri.as_ref())
.resolve(IriRelativeStr::new(&format!("media/{uuid}"))?.as_ref())
.try_to_dedicated_string()?,
.try_resolve(IriRelativeStr::new(&format!("media/{}", uuid))?.as_ref())?,
UrlKind::NodeInfo => FixedBaseResolver::new(self.base_uri.as_ref())
.resolve(IriRelativeStr::new("nodeinfo/2.0.json")?.as_ref())
.try_to_dedicated_string()?,
.try_resolve(IriRelativeStr::new("nodeinfo/2.0.json")?.as_ref())?,
UrlKind::Outbox => FixedBaseResolver::new(self.base_uri.as_ref())
.resolve(IriRelativeStr::new("outbox")?.as_ref())
.try_to_dedicated_string()?,
.try_resolve(IriRelativeStr::new("outbox")?.as_ref())?,
};
Ok(iri)
@ -482,22 +437,25 @@ impl Config {
}
fn do_generate_admin_url(&self, kind: AdminUrlKind) -> Result<IriString, Error> {
let path = match kind {
AdminUrlKind::Allow => "api/v1/admin/allow",
AdminUrlKind::Disallow => "api/v1/admin/disallow",
AdminUrlKind::Block => "api/v1/admin/block",
AdminUrlKind::Unblock => "api/v1/admin/unblock",
AdminUrlKind::Allowed => "api/v1/admin/allowed",
AdminUrlKind::Blocked => "api/v1/admin/blocked",
AdminUrlKind::Connected => "api/v1/admin/connected",
AdminUrlKind::Stats => "api/v1/admin/stats",
AdminUrlKind::LastSeen => "api/v1/admin/last_seen",
let iri = match kind {
AdminUrlKind::Allow => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/allow")?.as_ref())?,
AdminUrlKind::Disallow => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/disallow")?.as_ref())?,
AdminUrlKind::Block => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/block")?.as_ref())?,
AdminUrlKind::Unblock => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/unblock")?.as_ref())?,
AdminUrlKind::Allowed => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/allowed")?.as_ref())?,
AdminUrlKind::Blocked => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/blocked")?.as_ref())?,
AdminUrlKind::Connected => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/connected")?.as_ref())?,
AdminUrlKind::Stats => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/stats")?.as_ref())?,
};
let iri = FixedBaseResolver::new(self.base_uri.as_ref())
.resolve(IriRelativeStr::new(path)?.as_ref())
.try_to_dedicated_string()?;
Ok(iri)
}
}

View File

@ -1,11 +1,9 @@
mod actor;
mod last_online;
mod media;
mod node;
mod state;
pub(crate) use actor::ActorCache;
pub(crate) use last_online::LastOnline;
pub(crate) use media::MediaCache;
pub(crate) use node::{Node, NodeCache};
pub(crate) use state::State;

View File

@ -37,7 +37,7 @@ impl ActorCache {
ActorCache { db }
}
#[tracing::instrument(level = "debug" name = "Get Actor", skip_all, fields(id = id.to_string().as_str()))]
#[tracing::instrument(level = "debug" name = "Get Actor", skip_all, fields(id = id.to_string().as_str(), requests))]
pub(crate) async fn get(
&self,
id: &IriString,
@ -56,8 +56,12 @@ impl ActorCache {
#[tracing::instrument(level = "debug", name = "Add Connection", skip(self))]
pub(crate) async fn add_connection(&self, actor: Actor) -> Result<(), Error> {
self.db.add_connection(actor.id.clone()).await?;
self.db.save_actor(actor).await
let add_connection = self.db.add_connection(actor.id.clone());
let save_actor = self.db.save_actor(actor);
tokio::try_join!(add_connection, save_actor)?;
Ok(())
}
#[tracing::instrument(level = "debug", name = "Remove Connection", skip(self))]
@ -65,13 +69,13 @@ impl ActorCache {
self.db.remove_connection(actor.id.clone()).await
}
#[tracing::instrument(level = "debug", name = "Fetch remote actor", skip_all, fields(id = id.to_string().as_str()))]
#[tracing::instrument(level = "debug", name = "Fetch remote actor", skip_all, fields(id = id.to_string().as_str(), requests))]
pub(crate) async fn get_no_cache(
&self,
id: &IriString,
requests: &Requests,
) -> Result<Actor, Error> {
let accepted_actor = requests.fetch::<AcceptedActors>(id).await?;
let accepted_actor = requests.fetch::<AcceptedActors>(id.as_str()).await?;
let input_authority = id.authority_components().ok_or(ErrorKind::MissingDomain)?;
let accepted_actor_id = accepted_actor

View File

@ -1,28 +0,0 @@
use activitystreams::iri_string::types::IriStr;
use std::{collections::HashMap, sync::Mutex};
use time::OffsetDateTime;
pub(crate) struct LastOnline {
domains: Mutex<HashMap<String, OffsetDateTime>>,
}
impl LastOnline {
pub(crate) fn mark_seen(&self, iri: &IriStr) {
if let Some(authority) = iri.authority_str() {
self.domains
.lock()
.unwrap()
.insert(authority.to_string(), OffsetDateTime::now_utc());
}
}
pub(crate) fn take(&self) -> HashMap<String, OffsetDateTime> {
std::mem::take(&mut *self.domains.lock().unwrap())
}
pub(crate) fn empty() -> Self {
Self {
domains: Mutex::new(HashMap::default()),
}
}
}

View File

@ -36,9 +36,11 @@ impl NodeCache {
#[tracing::instrument(level = "debug", name = "Get nodes", skip(self))]
pub(crate) async fn nodes(&self) -> Result<Vec<Node>, Error> {
let infos = self.db.connected_info().await?;
let instances = self.db.connected_instance().await?;
let contacts = self.db.connected_contact().await?;
let infos = self.db.connected_info();
let instances = self.db.connected_instance();
let contacts = self.db.connected_contact();
let (infos, instances, contacts) = tokio::try_join!(infos, instances, contacts)?;
let vec = self
.db
@ -182,7 +184,7 @@ impl Node {
let authority = url.authority_str().ok_or(ErrorKind::MissingDomain)?;
let scheme = url.scheme_str();
let base = iri!(format!("{scheme}://{authority}"));
let base = iri!(format!("{}://{}", scheme, authority));
Ok(Node {
base,

View File

@ -10,9 +10,8 @@ use actix_web::web;
use lru::LruCache;
use rand::thread_rng;
use rsa::{RsaPrivateKey, RsaPublicKey};
use std::sync::{Arc, RwLock};
use super::LastOnline;
use std::sync::Arc;
use tokio::sync::RwLock;
#[derive(Clone)]
pub struct State {
@ -21,7 +20,6 @@ pub struct State {
object_cache: Arc<RwLock<LruCache<IriString, IriString>>>,
node_cache: NodeCache,
breakers: Breakers,
pub(crate) last_online: Arc<LastOnline>,
pub(crate) db: Db,
}
@ -46,7 +44,6 @@ impl State {
self.private_key.clone(),
config.user_agent(),
self.breakers.clone(),
self.last_online.clone(),
)
}
@ -81,12 +78,12 @@ impl State {
.collect())
}
pub(crate) fn is_cached(&self, object_id: &IriString) -> bool {
self.object_cache.read().unwrap().contains(object_id)
pub(crate) async fn is_cached(&self, object_id: &IriString) -> bool {
self.object_cache.read().await.contains(object_id)
}
pub(crate) fn cache(&self, object_id: IriString, actor_id: IriString) {
self.object_cache.write().unwrap().put(object_id, actor_id);
pub(crate) async fn cache(&self, object_id: IriString, actor_id: IriString) {
self.object_cache.write().await.put(object_id, actor_id);
}
#[tracing::instrument(level = "debug", name = "Building state", skip_all)]
@ -118,7 +115,6 @@ impl State {
node_cache: NodeCache::new(db.clone()),
breakers: Breakers::default(),
db,
last_online: Arc::new(LastOnline::empty()),
};
Ok(state)

157
src/db.rs
View File

@ -7,16 +7,8 @@ use rsa::{
pkcs8::{DecodePrivateKey, EncodePrivateKey},
RsaPrivateKey,
};
use sled::{Batch, Tree};
use std::{
collections::{BTreeMap, HashMap},
sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::SystemTime,
};
use time::OffsetDateTime;
use sled::Tree;
use std::{collections::HashMap, sync::Arc, time::SystemTime};
use uuid::Uuid;
#[derive(Clone, Debug)]
@ -25,8 +17,6 @@ pub(crate) struct Db {
}
struct Inner {
healthz: Tree,
healthz_counter: Arc<AtomicU64>,
actor_id_actor: Tree,
public_key_id_actor_id: Tree,
connected_actor_ids: Tree,
@ -38,7 +28,6 @@ struct Inner {
actor_id_info: Tree,
actor_id_instance: Tree,
actor_id_contact: Tree,
last_seen: Tree,
restricted_mode: bool,
}
@ -247,8 +236,6 @@ impl Db {
fn build_inner(restricted_mode: bool, db: sled::Db) -> Result<Self, Error> {
Ok(Db {
inner: Arc::new(Inner {
healthz: db.open_tree("healthz")?,
healthz_counter: Arc::new(AtomicU64::new(0)),
actor_id_actor: db.open_tree("actor-id-actor")?,
public_key_id_actor_id: db.open_tree("public-key-id-actor-id")?,
connected_actor_ids: db.open_tree("connected-actor-ids")?,
@ -260,7 +247,6 @@ impl Db {
actor_id_info: db.open_tree("actor-id-info")?,
actor_id_instance: db.open_tree("actor-id-instance")?,
actor_id_contact: db.open_tree("actor-id-contact")?,
last_seen: db.open_tree("last-seen")?,
restricted_mode,
}),
})
@ -268,7 +254,7 @@ impl Db {
async fn unblock<T>(
&self,
f: impl FnOnce(&Inner) -> Result<T, Error> + Send + 'static,
f: impl Fn(&Inner) -> Result<T, Error> + Send + 'static,
) -> Result<T, Error>
where
T: Send + 'static,
@ -280,63 +266,6 @@ impl Db {
Ok(t)
}
pub(crate) async fn check_health(&self) -> Result<(), Error> {
let next = self.inner.healthz_counter.fetch_add(1, Ordering::Relaxed);
self.unblock(move |inner| {
inner
.healthz
.insert("healthz", &next.to_be_bytes()[..])
.map_err(Error::from)
})
.await?;
self.inner.healthz.flush_async().await?;
self.unblock(move |inner| inner.healthz.get("healthz").map_err(Error::from))
.await?;
Ok(())
}
pub(crate) async fn mark_last_seen(
&self,
nodes: HashMap<String, OffsetDateTime>,
) -> Result<(), Error> {
let mut batch = Batch::default();
for (domain, datetime) in nodes {
let datetime_string = serde_json::to_vec(&datetime)?;
batch.insert(domain.as_bytes(), datetime_string);
}
self.unblock(move |inner| inner.last_seen.apply_batch(batch).map_err(Error::from))
.await
}
pub(crate) async fn last_seen(
&self,
) -> Result<BTreeMap<String, Option<OffsetDateTime>>, Error> {
self.unblock(|inner| {
let mut map = BTreeMap::new();
for iri in inner.connected() {
let Some(authority_str) = iri.authority_str() else {
continue;
};
if let Some(datetime) = inner.last_seen.get(authority_str)? {
map.insert(
authority_str.to_string(),
Some(serde_json::from_slice(&datetime)?),
);
} else {
map.insert(authority_str.to_string(), None);
}
}
Ok(map)
})
.await
}
pub(crate) async fn connected_ids(&self) -> Result<Vec<IriString>, Error> {
self.unblock(|inner| Ok(inner.connected().collect())).await
}
@ -356,12 +285,12 @@ impl Db {
pub(crate) async fn info(&self, actor_id: IriString) -> Result<Option<Info>, Error> {
self.unblock(move |inner| {
inner
.actor_id_info
.get(actor_id.as_str().as_bytes())?
.map(|ivec| serde_json::from_slice(&ivec))
.transpose()
.map_err(Error::from)
if let Some(ivec) = inner.actor_id_info.get(actor_id.as_str().as_bytes())? {
let info = serde_json::from_slice(&ivec)?;
Ok(Some(info))
} else {
Ok(None)
}
})
.await
}
@ -390,12 +319,12 @@ impl Db {
pub(crate) async fn instance(&self, actor_id: IriString) -> Result<Option<Instance>, Error> {
self.unblock(move |inner| {
inner
.actor_id_instance
.get(actor_id.as_str().as_bytes())?
.map(|ivec| serde_json::from_slice(&ivec))
.transpose()
.map_err(Error::from)
if let Some(ivec) = inner.actor_id_instance.get(actor_id.as_str().as_bytes())? {
let instance = serde_json::from_slice(&ivec)?;
Ok(Some(instance))
} else {
Ok(None)
}
})
.await
}
@ -424,12 +353,12 @@ impl Db {
pub(crate) async fn contact(&self, actor_id: IriString) -> Result<Option<Contact>, Error> {
self.unblock(move |inner| {
inner
.actor_id_contact
.get(actor_id.as_str().as_bytes())?
.map(|ivec| serde_json::from_slice(&ivec))
.transpose()
.map_err(Error::from)
if let Some(ivec) = inner.actor_id_contact.get(actor_id.as_str().as_bytes())? {
let contact = serde_json::from_slice(&ivec)?;
Ok(Some(contact))
} else {
Ok(None)
}
})
.await
}
@ -454,20 +383,22 @@ impl Db {
pub(crate) async fn media_id(&self, url: IriString) -> Result<Option<Uuid>, Error> {
self.unblock(move |inner| {
Ok(inner
.media_url_media_id
.get(url.as_str().as_bytes())?
.and_then(uuid_from_ivec))
if let Some(ivec) = inner.media_url_media_id.get(url.as_str().as_bytes())? {
Ok(uuid_from_ivec(ivec))
} else {
Ok(None)
}
})
.await
}
pub(crate) async fn media_url(&self, id: Uuid) -> Result<Option<IriString>, Error> {
self.unblock(move |inner| {
Ok(inner
.media_id_media_url
.get(id.as_bytes())?
.and_then(url_from_ivec))
if let Some(ivec) = inner.media_id_media_url.get(id.as_bytes())? {
Ok(url_from_ivec(ivec))
} else {
Ok(None)
}
})
.await
}
@ -488,7 +419,7 @@ impl Db {
pub(crate) async fn is_connected(&self, base_id: IriString) -> Result<bool, Error> {
let scheme = base_id.scheme_str();
let authority = base_id.authority_str().ok_or(ErrorKind::MissingDomain)?;
let prefix = format!("{scheme}://{authority}");
let prefix = format!("{}://{}", scheme, authority);
self.unblock(move |inner| {
let connected = inner
@ -507,22 +438,26 @@ impl Db {
public_key_id: IriString,
) -> Result<Option<IriString>, Error> {
self.unblock(move |inner| {
Ok(inner
if let Some(ivec) = inner
.public_key_id_actor_id
.get(public_key_id.as_str().as_bytes())?
.and_then(url_from_ivec))
{
Ok(url_from_ivec(ivec))
} else {
Ok(None)
}
})
.await
}
pub(crate) async fn actor(&self, actor_id: IriString) -> Result<Option<Actor>, Error> {
self.unblock(move |inner| {
inner
.actor_id_actor
.get(actor_id.as_str().as_bytes())?
.map(|ivec| serde_json::from_slice(&ivec))
.transpose()
.map_err(Error::from)
if let Some(ivec) = inner.actor_id_actor.get(actor_id.as_str().as_bytes())? {
let actor = serde_json::from_slice(&ivec)?;
Ok(Some(actor))
} else {
Ok(None)
}
})
.await
}
@ -544,7 +479,7 @@ impl Db {
}
pub(crate) async fn remove_connection(&self, actor_id: IriString) -> Result<(), Error> {
tracing::debug!("Removing Connection: {actor_id}");
tracing::debug!("Removing Connection: {}", actor_id);
self.unblock(move |inner| {
inner
.connected_actor_ids
@ -556,7 +491,7 @@ impl Db {
}
pub(crate) async fn add_connection(&self, actor_id: IriString) -> Result<(), Error> {
tracing::debug!("Adding Connection: {actor_id}");
tracing::debug!("Adding Connection: {}", actor_id);
self.unblock(move |inner| {
inner
.connected_actor_ids

View File

@ -10,7 +10,7 @@ use std::{convert::Infallible, fmt::Debug, io};
use tracing_error::SpanTrace;
pub(crate) struct Error {
context: String,
context: SpanTrace,
kind: ErrorKind,
}
@ -26,14 +26,6 @@ impl Error {
pub(crate) fn is_bad_request(&self) -> bool {
matches!(self.kind, ErrorKind::Status(_, StatusCode::BAD_REQUEST))
}
pub(crate) fn is_gone(&self) -> bool {
matches!(self.kind, ErrorKind::Status(_, StatusCode::GONE))
}
pub(crate) fn is_malformed_json(&self) -> bool {
matches!(self.kind, ErrorKind::Json(_))
}
}
impl std::fmt::Debug for Error {
@ -61,7 +53,7 @@ where
{
fn from(error: T) -> Self {
Error {
context: SpanTrace::capture().to_string(),
context: SpanTrace::capture(),
kind: error.into(),
}
}
@ -85,7 +77,10 @@ pub(crate) enum ErrorKind {
ParseIri(#[from] activitystreams::iri_string::validate::Error),
#[error("Couldn't normalize IRI, {0}")]
NormalizeIri(#[from] std::collections::TryReserveError),
NormalizeIri(
#[from]
activitystreams::iri_string::task::Error<activitystreams::iri_string::normalize::Error>,
),
#[error("Couldn't perform IO, {0}")]
Io(#[from] io::Error),
@ -103,13 +98,13 @@ pub(crate) enum ErrorKind {
PrepareSign(#[from] PrepareSignError),
#[error("Couldn't sign digest")]
Signature(#[from] rsa::signature::Error),
Signature(#[from] signature::Error),
#[error("Couldn't read signature")]
ReadSignature(rsa::signature::Error),
ReadSignature(signature::Error),
#[error("Couldn't verify signature")]
VerifySignature(rsa::signature::Error),
VerifySignature(signature::Error),
#[error("Couldn't parse the signature header")]
HeaderValidation(#[from] actix_web::http::header::InvalidHeaderValue),
@ -130,7 +125,7 @@ pub(crate) enum ErrorKind {
BadActor(String, String),
#[error("Signature verification is required, but no signature was given")]
NoSignature(Option<String>),
NoSignature(String),
#[error("Wrong ActivityPub kind, {0}")]
Kind(String),
@ -201,8 +196,7 @@ impl ResponseError for Error {
ErrorKind::Kind(_)
| ErrorKind::MissingKind
| ErrorKind::MissingId
| ErrorKind::ObjectCount
| ErrorKind::NoSignature(_) => StatusCode::BAD_REQUEST,
| ErrorKind::ObjectCount => StatusCode::BAD_REQUEST,
_ => StatusCode::INTERNAL_SERVER_ERROR,
}
}

View File

@ -80,7 +80,7 @@ impl Admin {
#[derive(Debug, thiserror::Error)]
#[error("Failed authentication")]
pub(crate) struct Error {
context: String,
context: SpanTrace,
#[source]
kind: ErrorKind,
}
@ -88,49 +88,49 @@ pub(crate) struct Error {
impl Error {
fn invalid() -> Self {
Error {
context: SpanTrace::capture().to_string(),
context: SpanTrace::capture(),
kind: ErrorKind::Invalid,
}
}
fn missing_config() -> Self {
Error {
context: SpanTrace::capture().to_string(),
context: SpanTrace::capture(),
kind: ErrorKind::MissingConfig,
}
}
fn missing_db() -> Self {
Error {
context: SpanTrace::capture().to_string(),
context: SpanTrace::capture(),
kind: ErrorKind::MissingDb,
}
}
fn bcrypt_verify(e: BcryptError) -> Self {
Error {
context: SpanTrace::capture().to_string(),
context: SpanTrace::capture(),
kind: ErrorKind::BCryptVerify(e),
}
}
fn bcrypt_hash(e: BcryptError) -> Self {
Error {
context: SpanTrace::capture().to_string(),
context: SpanTrace::capture(),
kind: ErrorKind::BCryptHash(e),
}
}
fn parse_header(e: ParseError) -> Self {
Error {
context: SpanTrace::capture().to_string(),
context: SpanTrace::capture(),
kind: ErrorKind::ParseHeader(e),
}
}
fn canceled(_: BlockingError) -> Self {
Error {
context: SpanTrace::capture().to_string(),
context: SpanTrace::capture(),
kind: ErrorKind::Canceled,
}
}

View File

@ -5,7 +5,6 @@ mod deliver_many;
mod instance;
mod nodeinfo;
mod process_listeners;
mod record_last_online;
pub(crate) use self::{
contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany, instance::QueryInstance,
@ -16,7 +15,7 @@ use crate::{
config::Config,
data::{ActorCache, MediaCache, NodeCache, State},
error::{Error, ErrorKind},
jobs::{process_listeners::Listeners, record_last_online::RecordLastOnline},
jobs::process_listeners::Listeners,
requests::Requests,
};
use background_jobs::{
@ -63,7 +62,6 @@ pub(crate) fn create_workers(
.register::<QueryInstance>()
.register::<Listeners>()
.register::<QueryContact>()
.register::<RecordLastOnline>()
.register::<apub::Announce>()
.register::<apub::Follow>()
.register::<apub::Forward>()
@ -75,7 +73,6 @@ pub(crate) fn create_workers(
.start_with_threads(parallelism);
shared.every(Duration::from_secs(60 * 5), Listeners);
shared.every(Duration::from_secs(60 * 10), RecordLastOnline);
let job_server = JobServer::new(shared.queue_handle().clone());

View File

@ -36,13 +36,13 @@ async fn get_inboxes(
state.inboxes_without(&actor.inbox, &authority).await
}
fn prepare_activity<T, U, V>(
fn prepare_activity<T, U, V, Kind>(
mut t: T,
id: impl TryInto<IriString, Error = U>,
to: impl TryInto<IriString, Error = V>,
) -> Result<T, Error>
where
T: ObjectExt + BaseExt,
T: ObjectExt<Kind> + BaseExt<Kind>,
Error: From<U> + From<V>,
{
t.set_id(id.try_into()?)

View File

@ -42,7 +42,7 @@ impl Announce {
.queue(DeliverMany::new(inboxes, announce)?)
.await?;
state.state.cache(self.object_id, activity_id);
state.state.cache(self.object_id, activity_id).await;
Ok(())
}
}

View File

@ -42,7 +42,7 @@ impl QueryContact {
let contact = match state
.requests
.fetch::<AcceptedActors>(&self.contact_id)
.fetch::<AcceptedActors>(self.contact_id.as_str())
.await
{
Ok(contact) => contact,

View File

@ -35,7 +35,7 @@ impl Deliver {
#[tracing::instrument(name = "Deliver", skip(state))]
async fn permform(self, state: JobState) -> Result<(), Error> {
if let Err(e) = state.requests.deliver(&self.to, &self.data).await {
if let Err(e) = state.requests.deliver(self.to, &self.data).await {
if e.is_breaker() {
tracing::debug!("Not trying due to failed breaker");
return Ok(());

File diff suppressed because one or more lines are too long

View File

@ -39,11 +39,11 @@ impl QueryNodeinfo {
.authority_str()
.ok_or(ErrorKind::MissingDomain)?;
let scheme = self.actor_id.scheme_str();
let well_known_uri = iri!(format!("{scheme}://{authority}/.well-known/nodeinfo"));
let well_known_uri = iri!(format!("{}://{}/.well-known/nodeinfo", scheme, authority));
let well_known = match state
.requests
.fetch_json::<WellKnown>(&well_known_uri)
.fetch_json::<WellKnown>(well_known_uri.as_str())
.await
{
Ok(well_known) => well_known,
@ -55,7 +55,7 @@ impl QueryNodeinfo {
};
let href = if let Some(link) = well_known.links.into_iter().find(|l| l.rel.is_supported()) {
iri!(&link.href)
link.href
} else {
return Ok(());
};
@ -168,7 +168,7 @@ impl<'de> serde::de::Visitor<'de> for SupportedVersionVisitor {
type Value = SupportedVersion;
fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "a string starting with '{SUPPORTED_VERSIONS}'")
write!(f, "a string starting with '{}'", SUPPORTED_VERSIONS)
}
fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
@ -187,7 +187,7 @@ impl<'de> serde::de::Visitor<'de> for SupportedNodeinfoVisitor {
type Value = SupportedNodeinfo;
fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "a string starting with '{SUPPORTED_NODEINFO}'")
write!(f, "a string starting with '{}'", SUPPORTED_NODEINFO)
}
fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>

View File

@ -1,28 +0,0 @@
use crate::{error::Error, jobs::JobState};
use background_jobs::{ActixJob, Backoff};
use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct RecordLastOnline;
impl RecordLastOnline {
#[tracing::instrument(skip(state))]
async fn perform(self, state: JobState) -> Result<(), Error> {
let nodes = state.state.last_online.take();
state.state.db.mark_last_seen(nodes).await
}
}
impl ActixJob for RecordLastOnline {
type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "relay::jobs::RecordLastOnline";
const QUEUE: &'static str = "maintenance";
const BACKOFF: Backoff = Backoff::Linear(1);
fn run(self, state: Self::State) -> Self::Future {
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
}
}

View File

@ -2,13 +2,10 @@
#![allow(clippy::needless_borrow)]
use activitystreams::iri_string::types::IriString;
use actix_rt::task::JoinHandle;
use actix_web::{middleware::Compress, web, App, HttpServer};
use collector::{DoubleRecorder, MemoryCollector};
use collector::MemoryCollector;
#[cfg(feature = "console")]
use console_subscriber::ConsoleLayer;
use http_signature_normalization_actix::middleware::VerifySignature;
use metrics_exporter_prometheus::PrometheusBuilder;
use opentelemetry::{sdk::Resource, KeyValue};
use opentelemetry_otlp::WithExportConfig;
use rustls::ServerConfig;
@ -38,8 +35,8 @@ use self::{
data::{ActorCache, MediaCache, State},
db::Db,
jobs::create_workers,
middleware::{DebugPayload, MyVerify, RelayResolver, Timings},
routes::{actor, healthz, inbox, index, nodeinfo, nodeinfo_meta, statics},
middleware::{DebugPayload, RelayResolver, Timings},
routes::{actor, inbox, index, nodeinfo, nodeinfo_meta, statics},
};
fn init_subscriber(
@ -97,31 +94,19 @@ fn init_subscriber(
Ok(())
}
#[actix_rt::main]
async fn main() -> Result<(), anyhow::Error> {
fn main() -> Result<(), anyhow::Error> {
dotenv::dotenv().ok();
let config = Config::build()?;
init_subscriber(Config::software_name(), config.opentelemetry_url())?;
let collector = MemoryCollector::new();
collector.install()?;
let args = Args::new();
if args.any() {
return client_main(config, args).await?;
}
let collector = MemoryCollector::new();
if let Some(bind_addr) = config.prometheus_bind_address() {
let (recorder, exporter) = PrometheusBuilder::new()
.with_http_listener(bind_addr)
.build()?;
actix_rt::spawn(exporter);
DoubleRecorder::new(recorder, collector.clone()).install()?;
} else {
collector.install()?;
return client_main(config, args);
}
tracing::warn!("Opening DB");
@ -131,15 +116,16 @@ async fn main() -> Result<(), anyhow::Error> {
let actors = ActorCache::new(db.clone());
let media = MediaCache::new(db.clone());
server_main(db, actors, media, collector, config).await??;
server_main(db, actors, media, collector, config)?;
tracing::warn!("Application exit");
Ok(())
}
fn client_main(config: Config, args: Args) -> JoinHandle<Result<(), anyhow::Error>> {
actix_rt::spawn(do_client_main(config, args))
#[actix_rt::main]
async fn client_main(config: Config, args: Args) -> Result<(), anyhow::Error> {
actix_rt::spawn(do_client_main(config, args)).await?
}
async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error> {
@ -156,39 +142,6 @@ async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error>
println!("Updated lists");
}
if args.contacted() {
let last_seen = admin::client::last_seen(&client, &config).await?;
let mut report = String::from("Contacted:");
if !last_seen.never.is_empty() {
report += "\nNever seen:\n";
}
for domain in last_seen.never {
report += "\t";
report += &domain;
report += "\n";
}
if !last_seen.last_seen.is_empty() {
report += "\nSeen:\n";
}
for (datetime, domains) in last_seen.last_seen {
for domain in domains {
report += "\t";
report += &datetime.to_string();
report += " - ";
report += &domain;
report += "\n";
}
}
report += "\n";
println!("{report}");
}
if args.list() {
let (blocked, allowed, connected) = tokio::try_join!(
admin::client::blocked(&client, &config),
@ -221,14 +174,15 @@ async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error>
Ok(())
}
fn server_main(
#[actix_rt::main]
async fn server_main(
db: Db,
actors: ActorCache,
media: MediaCache,
collector: MemoryCollector,
config: Config,
) -> JoinHandle<Result<(), anyhow::Error>> {
actix_rt::spawn(do_server_main(db, actors, media, collector, config))
) -> Result<(), anyhow::Error> {
actix_rt::spawn(do_server_main(db, actors, media, collector, config)).await?
}
async fn do_server_main(
@ -273,15 +227,15 @@ async fn do_server_main(
app.wrap(Compress::default())
.wrap(TracingLogger::default())
.wrap(Timings)
.route("/healthz", web::get().to(healthz))
.service(web::resource("/").route(web::get().to(index)))
.service(web::resource("/media/{path}").route(web::get().to(routes::media)))
.service(
web::resource("/inbox")
.wrap(config.digest_middleware())
.wrap(VerifySignature::new(
MyVerify(state.requests(&config), actors.clone(), state.clone()),
Default::default(),
.wrap(config.signature_middleware(
state.requests(&config),
actors.clone(),
state.clone(),
))
.wrap(DebugPayload(config.debug()))
.route(web::post().to(inbox)),
@ -304,8 +258,7 @@ async fn do_server_main(
.route("/allowed", web::get().to(admin::routes::allowed))
.route("/blocked", web::get().to(admin::routes::blocked))
.route("/connected", web::get().to(admin::routes::connected))
.route("/stats", web::get().to(admin::routes::stats))
.route("/last_seen", web::get().to(admin::routes::last_seen)),
.route("/stats", web::get().to(admin::routes::stats)),
),
)
});

View File

@ -6,12 +6,10 @@ use crate::{
};
use activitystreams::{base::BaseExt, iri, iri_string::types::IriString};
use actix_web::web;
use base64::{engine::general_purpose::STANDARD, Engine};
use http_signature_normalization_actix::{prelude::*, verify::DeprecatedAlgorithm};
use rsa::{
pkcs1v15::Signature, pkcs1v15::VerifyingKey, pkcs8::DecodePublicKey, sha2::Sha256,
signature::Verifier, RsaPublicKey,
};
use rsa::{pkcs1v15::VerifyingKey, pkcs8::DecodePublicKey, RsaPublicKey};
use sha2::{Digest, Sha256};
use signature::{DigestVerifier, Signature};
use std::{future::Future, pin::Pin};
#[derive(Clone, Debug)]
@ -67,17 +65,11 @@ impl MyVerify {
actor_id
} else {
match self.0.fetch::<PublicKeyResponse>(&public_key_id).await {
Ok(res) => res.actor_id().ok_or(ErrorKind::MissingId),
Err(e) => {
if e.is_gone() {
tracing::warn!("Actor gone: {public_key_id}");
return Ok(false);
} else {
return Err(e);
}
}
}?
self.0
.fetch::<PublicKeyResponse>(public_key_id.as_str())
.await?
.actor_id()
.ok_or(ErrorKind::MissingId)?
};
// Previously we verified the sig from an actor's local cache
@ -125,13 +117,13 @@ async fn do_verify(
let span = tracing::Span::current();
web::block(move || {
span.in_scope(|| {
let decoded = STANDARD.decode(signature)?;
let signature =
Signature::try_from(decoded.as_slice()).map_err(ErrorKind::ReadSignature)?;
let decoded = base64::decode(signature)?;
let signature = Signature::from_bytes(&decoded).map_err(ErrorKind::ReadSignature)?;
let hashed = Sha256::new_with_prefix(signing_string.as_bytes());
let verifying_key = VerifyingKey::<Sha256>::new_with_prefix(public_key);
let verifying_key = VerifyingKey::new_with_prefix(public_key);
verifying_key
.verify(signing_string.as_bytes(), &signature)
.verify_digest(hashed, &signature)
.map_err(ErrorKind::VerifySignature)?;
Ok(()) as Result<(), Error>
@ -167,20 +159,20 @@ mod tests {
use crate::apub::AcceptedActors;
use rsa::{pkcs8::DecodePublicKey, RsaPublicKey};
const ASONIX_DOG_ACTOR: &str = r#"{"@context":["https://www.w3.org/ns/activitystreams","https://w3id.org/security/v1",{"manuallyApprovesFollowers":"as:manuallyApprovesFollowers","toot":"http://joinmastodon.org/ns#","featured":{"@id":"toot:featured","@type":"@id"},"featuredTags":{"@id":"toot:featuredTags","@type":"@id"},"alsoKnownAs":{"@id":"as:alsoKnownAs","@type":"@id"},"movedTo":{"@id":"as:movedTo","@type":"@id"},"schema":"http://schema.org#","PropertyValue":"schema:PropertyValue","value":"schema:value","discoverable":"toot:discoverable","Device":"toot:Device","Ed25519Signature":"toot:Ed25519Signature","Ed25519Key":"toot:Ed25519Key","Curve25519Key":"toot:Curve25519Key","EncryptedMessage":"toot:EncryptedMessage","publicKeyBase64":"toot:publicKeyBase64","deviceId":"toot:deviceId","claim":{"@type":"@id","@id":"toot:claim"},"fingerprintKey":{"@type":"@id","@id":"toot:fingerprintKey"},"identityKey":{"@type":"@id","@id":"toot:identityKey"},"devices":{"@type":"@id","@id":"toot:devices"},"messageFranking":"toot:messageFranking","messageType":"toot:messageType","cipherText":"toot:cipherText","suspended":"toot:suspended"}],"id":"https://masto.asonix.dog/actor","type":"Application","inbox":"https://masto.asonix.dog/actor/inbox","outbox":"https://masto.asonix.dog/actor/outbox","preferredUsername":"masto.asonix.dog","url":"https://masto.asonix.dog/about/more?instance_actor=true","manuallyApprovesFollowers":true,"publicKey":{"id":"https://masto.asonix.dog/actor#main-key","owner":"https://masto.asonix.dog/actor","publicKeyPem":"-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAx8zXS0QNg9YGUBsxAOBH\nJaxIn7i6t+Z4UOpSFDVa2kP0NvQgIJsq3wzRqvaiuncRWpkyFk1fTakiRGD32xnY\nt+juuAaIBlU8eswKyANFqhcLAvFHmT3rA1848M4/YM19djvlL/PR9T53tPNHU+el\nS9MlsG3o6Zsj8YaUJtCI8RgEuJoROLHUb/V9a3oMQ7CfuIoSvF3VEz3/dRT09RW6\n0wQX7yhka9WlKuayWLWmTcB9lAIX6neBk+qKc8VSEsO7mHkzB8mRgVcS2uYZl1eA\nD8/jTT+SlpeFNDZk0Oh35GNFoOxh9qjRw3NGxu7jJCVBehDODzasOv4xDxKAhONa\njQIDAQAB\n-----END PUBLIC KEY-----\n"},"endpoints":{"sharedInbox":"https://masto.asonix.dog/inbox"}}"#;
const KARJALAZET_RELAY: &str = r#"{"@context":["https://www.w3.org/ns/activitystreams","https://pleroma.karjalazet.se/schemas/litepub-0.1.jsonld",{"@language":"und"}],"alsoKnownAs":[],"attachment":[],"capabilities":{},"discoverable":false,"endpoints":{"oauthAuthorizationEndpoint":"https://pleroma.karjalazet.se/oauth/authorize","oauthRegistrationEndpoint":"https://pleroma.karjalazet.se/api/v1/apps","oauthTokenEndpoint":"https://pleroma.karjalazet.se/oauth/token","sharedInbox":"https://pleroma.karjalazet.se/inbox","uploadMedia":"https://pleroma.karjalazet.se/api/ap/upload_media"},"featured":"https://pleroma.karjalazet.se/relay/collections/featured","followers":"https://pleroma.karjalazet.se/relay/followers","following":"https://pleroma.karjalazet.se/relay/following","id":"https://pleroma.karjalazet.se/relay","inbox":"https://pleroma.karjalazet.se/relay/inbox","manuallyApprovesFollowers":false,"name":null,"outbox":"https://pleroma.karjalazet.se/relay/outbox","preferredUsername":"relay","publicKey":{"id":"https://pleroma.karjalazet.se/relay#main-key","owner":"https://pleroma.karjalazet.se/relay","publicKeyPem":"-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAucoyCht6QpEzUPdQWP/J\nJYxObSH3MCcXBnG4d0OX78QshloeAHhl78EZ5c8I0ePmIjDg2NFK3/pG0EvSrHe2\nIZHnHaN5emgCb2ifNya5W572yfQXo1tUQy+ZXtbTUA7BWbr4LuCvd+HUavMwbx72\neraSZTiQj//ObwpbXFoZO5I/+e5avGmVnfmr/y2cG95hqFDtI3438RgZyBjY5kJM\nY1MLWoY9itGSfYmBtxRj3umlC2bPuBB+hHUJi6TvP7NO6zuUZ66m4ETyuBDi8iP6\ngnUp3Q4+1/I3nDUmhjt7OXckUcX3r5M4UHD3VVUFG0aZw6WWMEAxlyFf/07fCkhR\nBwIDAQAB\n-----END PUBLIC KEY-----\n\n"},"summary":"","tag":[],"type":"Person","url":"https://pleroma.karjalazet.se/relay"}"#;
const ASONIX_DOG_KEY: &str = "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAx8zXS0QNg9YGUBsxAOBH\nJaxIn7i6t+Z4UOpSFDVa2kP0NvQgIJsq3wzRqvaiuncRWpkyFk1fTakiRGD32xnY\nt+juuAaIBlU8eswKyANFqhcLAvFHmT3rA1848M4/YM19djvlL/PR9T53tPNHU+el\nS9MlsG3o6Zsj8YaUJtCI8RgEuJoROLHUb/V9a3oMQ7CfuIoSvF3VEz3/dRT09RW6\n0wQX7yhka9WlKuayWLWmTcB9lAIX6neBk+qKc8VSEsO7mHkzB8mRgVcS2uYZl1eA\nD8/jTT+SlpeFNDZk0Oh35GNFoOxh9qjRw3NGxu7jJCVBehDODzasOv4xDxKAhONa\njQIDAQAB\n-----END PUBLIC KEY-----\n";
const KARJALAZET_KEY: &str = "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAucoyCht6QpEzUPdQWP/J\nJYxObSH3MCcXBnG4d0OX78QshloeAHhl78EZ5c8I0ePmIjDg2NFK3/pG0EvSrHe2\nIZHnHaN5emgCb2ifNya5W572yfQXo1tUQy+ZXtbTUA7BWbr4LuCvd+HUavMwbx72\neraSZTiQj//ObwpbXFoZO5I/+e5avGmVnfmr/y2cG95hqFDtI3438RgZyBjY5kJM\nY1MLWoY9itGSfYmBtxRj3umlC2bPuBB+hHUJi6TvP7NO6zuUZ66m4ETyuBDi8iP6\ngnUp3Q4+1/I3nDUmhjt7OXckUcX3r5M4UHD3VVUFG0aZw6WWMEAxlyFf/07fCkhR\nBwIDAQAB\n-----END PUBLIC KEY-----\n\n";
const ASONIX_DOG_ACTOR: &'static str = r#"{"@context":["https://www.w3.org/ns/activitystreams","https://w3id.org/security/v1",{"manuallyApprovesFollowers":"as:manuallyApprovesFollowers","toot":"http://joinmastodon.org/ns#","featured":{"@id":"toot:featured","@type":"@id"},"featuredTags":{"@id":"toot:featuredTags","@type":"@id"},"alsoKnownAs":{"@id":"as:alsoKnownAs","@type":"@id"},"movedTo":{"@id":"as:movedTo","@type":"@id"},"schema":"http://schema.org#","PropertyValue":"schema:PropertyValue","value":"schema:value","discoverable":"toot:discoverable","Device":"toot:Device","Ed25519Signature":"toot:Ed25519Signature","Ed25519Key":"toot:Ed25519Key","Curve25519Key":"toot:Curve25519Key","EncryptedMessage":"toot:EncryptedMessage","publicKeyBase64":"toot:publicKeyBase64","deviceId":"toot:deviceId","claim":{"@type":"@id","@id":"toot:claim"},"fingerprintKey":{"@type":"@id","@id":"toot:fingerprintKey"},"identityKey":{"@type":"@id","@id":"toot:identityKey"},"devices":{"@type":"@id","@id":"toot:devices"},"messageFranking":"toot:messageFranking","messageType":"toot:messageType","cipherText":"toot:cipherText","suspended":"toot:suspended"}],"id":"https://masto.asonix.dog/actor","type":"Application","inbox":"https://masto.asonix.dog/actor/inbox","outbox":"https://masto.asonix.dog/actor/outbox","preferredUsername":"masto.asonix.dog","url":"https://masto.asonix.dog/about/more?instance_actor=true","manuallyApprovesFollowers":true,"publicKey":{"id":"https://masto.asonix.dog/actor#main-key","owner":"https://masto.asonix.dog/actor","publicKeyPem":"-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAx8zXS0QNg9YGUBsxAOBH\nJaxIn7i6t+Z4UOpSFDVa2kP0NvQgIJsq3wzRqvaiuncRWpkyFk1fTakiRGD32xnY\nt+juuAaIBlU8eswKyANFqhcLAvFHmT3rA1848M4/YM19djvlL/PR9T53tPNHU+el\nS9MlsG3o6Zsj8YaUJtCI8RgEuJoROLHUb/V9a3oMQ7CfuIoSvF3VEz3/dRT09RW6\n0wQX7yhka9WlKuayWLWmTcB9lAIX6neBk+qKc8VSEsO7mHkzB8mRgVcS2uYZl1eA\nD8/jTT+SlpeFNDZk0Oh35GNFoOxh9qjRw3NGxu7jJCVBehDODzasOv4xDxKAhONa\njQIDAQAB\n-----END PUBLIC KEY-----\n"},"endpoints":{"sharedInbox":"https://masto.asonix.dog/inbox"}}"#;
const KARJALAZET_RELAY: &'static str = r#"{"@context":["https://www.w3.org/ns/activitystreams","https://pleroma.karjalazet.se/schemas/litepub-0.1.jsonld",{"@language":"und"}],"alsoKnownAs":[],"attachment":[],"capabilities":{},"discoverable":false,"endpoints":{"oauthAuthorizationEndpoint":"https://pleroma.karjalazet.se/oauth/authorize","oauthRegistrationEndpoint":"https://pleroma.karjalazet.se/api/v1/apps","oauthTokenEndpoint":"https://pleroma.karjalazet.se/oauth/token","sharedInbox":"https://pleroma.karjalazet.se/inbox","uploadMedia":"https://pleroma.karjalazet.se/api/ap/upload_media"},"featured":"https://pleroma.karjalazet.se/relay/collections/featured","followers":"https://pleroma.karjalazet.se/relay/followers","following":"https://pleroma.karjalazet.se/relay/following","id":"https://pleroma.karjalazet.se/relay","inbox":"https://pleroma.karjalazet.se/relay/inbox","manuallyApprovesFollowers":false,"name":null,"outbox":"https://pleroma.karjalazet.se/relay/outbox","preferredUsername":"relay","publicKey":{"id":"https://pleroma.karjalazet.se/relay#main-key","owner":"https://pleroma.karjalazet.se/relay","publicKeyPem":"-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAucoyCht6QpEzUPdQWP/J\nJYxObSH3MCcXBnG4d0OX78QshloeAHhl78EZ5c8I0ePmIjDg2NFK3/pG0EvSrHe2\nIZHnHaN5emgCb2ifNya5W572yfQXo1tUQy+ZXtbTUA7BWbr4LuCvd+HUavMwbx72\neraSZTiQj//ObwpbXFoZO5I/+e5avGmVnfmr/y2cG95hqFDtI3438RgZyBjY5kJM\nY1MLWoY9itGSfYmBtxRj3umlC2bPuBB+hHUJi6TvP7NO6zuUZ66m4ETyuBDi8iP6\ngnUp3Q4+1/I3nDUmhjt7OXckUcX3r5M4UHD3VVUFG0aZw6WWMEAxlyFf/07fCkhR\nBwIDAQAB\n-----END PUBLIC KEY-----\n\n"},"summary":"","tag":[],"type":"Person","url":"https://pleroma.karjalazet.se/relay"}"#;
const ASONIX_DOG_KEY: &'static str = "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAx8zXS0QNg9YGUBsxAOBH\nJaxIn7i6t+Z4UOpSFDVa2kP0NvQgIJsq3wzRqvaiuncRWpkyFk1fTakiRGD32xnY\nt+juuAaIBlU8eswKyANFqhcLAvFHmT3rA1848M4/YM19djvlL/PR9T53tPNHU+el\nS9MlsG3o6Zsj8YaUJtCI8RgEuJoROLHUb/V9a3oMQ7CfuIoSvF3VEz3/dRT09RW6\n0wQX7yhka9WlKuayWLWmTcB9lAIX6neBk+qKc8VSEsO7mHkzB8mRgVcS2uYZl1eA\nD8/jTT+SlpeFNDZk0Oh35GNFoOxh9qjRw3NGxu7jJCVBehDODzasOv4xDxKAhONa\njQIDAQAB\n-----END PUBLIC KEY-----\n";
const KARJALAZET_KEY: &'static str = "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAucoyCht6QpEzUPdQWP/J\nJYxObSH3MCcXBnG4d0OX78QshloeAHhl78EZ5c8I0ePmIjDg2NFK3/pG0EvSrHe2\nIZHnHaN5emgCb2ifNya5W572yfQXo1tUQy+ZXtbTUA7BWbr4LuCvd+HUavMwbx72\neraSZTiQj//ObwpbXFoZO5I/+e5avGmVnfmr/y2cG95hqFDtI3438RgZyBjY5kJM\nY1MLWoY9itGSfYmBtxRj3umlC2bPuBB+hHUJi6TvP7NO6zuUZ66m4ETyuBDi8iP6\ngnUp3Q4+1/I3nDUmhjt7OXckUcX3r5M4UHD3VVUFG0aZw6WWMEAxlyFf/07fCkhR\nBwIDAQAB\n-----END PUBLIC KEY-----\n\n";
#[test]
fn handles_masto_keys() {
println!("{ASONIX_DOG_KEY}");
println!("{}", ASONIX_DOG_KEY);
let _ = RsaPublicKey::from_public_key_pem(ASONIX_DOG_KEY.trim()).unwrap();
}
#[test]
fn handles_pleromo_keys() {
println!("{KARJALAZET_KEY}");
println!("{}", KARJALAZET_KEY);
let _ = RsaPublicKey::from_public_key_pem(KARJALAZET_KEY.trim()).unwrap();
}

View File

@ -1,20 +1,13 @@
use crate::{
data::LastOnline,
error::{Error, ErrorKind},
};
use crate::error::{Error, ErrorKind};
use activitystreams::iri_string::types::IriString;
use actix_web::http::header::Date;
use awc::{error::SendRequestError, Client, ClientResponse};
use base64::{engine::general_purpose::STANDARD, Engine};
use dashmap::DashMap;
use http_signature_normalization_actix::prelude::*;
use rand::thread_rng;
use rsa::{
pkcs1v15::SigningKey,
sha2::{Digest, Sha256},
signature::RandomizedSigner,
RsaPrivateKey,
};
use rsa::{pkcs1v15::SigningKey, RsaPrivateKey};
use sha2::{Digest, Sha256};
use signature::RandomizedSigner;
use std::{
cell::RefCell,
rc::Rc,
@ -61,7 +54,7 @@ impl Breakers {
if let Some(mut breaker) = self.inner.get_mut(authority) {
breaker.fail();
if !breaker.should_try() {
tracing::warn!("Failed breaker for {authority}");
tracing::warn!("Failed breaker for {}", authority);
}
false
} else {
@ -153,7 +146,6 @@ pub(crate) struct Requests {
private_key: RsaPrivateKey,
config: Config,
breakers: Breakers,
last_online: Arc<LastOnline>,
}
impl std::fmt::Debug for Requests {
@ -182,7 +174,6 @@ impl Requests {
private_key: RsaPrivateKey,
user_agent: String,
breakers: Breakers,
last_online: Arc<LastOnline>,
) -> Self {
Requests {
client: Rc::new(RefCell::new(build_client(&user_agent))),
@ -193,7 +184,6 @@ impl Requests {
private_key,
config: Config::default().mastodon_compat(),
breakers,
last_online,
}
}
@ -229,13 +219,13 @@ impl Requests {
self.reset_err();
if res.status().is_server_error() {
if !res.status().is_success() {
self.breakers.fail(&parsed_url);
if let Ok(bytes) = res.body().await {
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
if !s.is_empty() {
tracing::warn!("Response from {parsed_url}, {s}");
tracing::warn!("Response from {}, {}", parsed_url, s);
}
}
}
@ -243,55 +233,58 @@ impl Requests {
return Err(ErrorKind::Status(parsed_url.to_string(), res.status()).into());
}
self.last_online.mark_seen(&parsed_url);
self.breakers.succeed(&parsed_url);
Ok(res)
}
#[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))]
pub(crate) async fn fetch_json<T>(&self, url: &IriString) -> Result<T, Error>
pub(crate) async fn fetch_json<T>(&self, url: &str) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
self.do_fetch(url, "application/json").await
}
#[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))]
pub(crate) async fn fetch_json_msky<T>(&self, url: &IriString) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
let mut res = self
.do_deliver(
url,
&serde_json::json!({}),
"application/json",
"application/json",
)
.await?;
let body = res
.body()
.await
.map_err(|e| ErrorKind::ReceiveResponse(url.to_string(), e.to_string()))?;
Ok(serde_json::from_slice(body.as_ref())?)
}
#[tracing::instrument(name = "Fetch Activity+Json", skip(self), fields(signing_string))]
pub(crate) async fn fetch<T>(&self, url: &IriString) -> Result<T, Error>
pub(crate) async fn fetch<T>(&self, url: &str) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
self.do_fetch(url, "application/activity+json").await
}
async fn do_fetch<T>(&self, url: &IriString, accept: &str) -> Result<T, Error>
async fn do_fetch<T>(&self, url: &str, accept: &str) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
let mut res = self.do_fetch_response(url, accept).await?;
let parsed_url = url.parse::<IriString>()?;
if !self.breakers.should_try(&parsed_url) {
return Err(ErrorKind::Breaker.into());
}
let signer = self.signer();
let span = tracing::Span::current();
let client: Client = self.client.borrow().clone();
let res = client
.get(url)
.insert_header(("Accept", accept))
.insert_header(Date(SystemTime::now().into()))
.signature(
self.config.clone(),
self.key_id.clone(),
move |signing_string| {
span.record("signing_string", signing_string);
span.in_scope(|| signer.sign(signing_string))
},
)
.await?
.send()
.await;
let mut res = self.check_response(&parsed_url, res).await?;
let body = res
.body()
@ -302,16 +295,8 @@ impl Requests {
}
#[tracing::instrument(name = "Fetch response", skip(self), fields(signing_string))]
pub(crate) async fn fetch_response(&self, url: &IriString) -> Result<ClientResponse, Error> {
self.do_fetch_response(url, "*/*").await
}
pub(crate) async fn do_fetch_response(
&self,
url: &IriString,
accept: &str,
) -> Result<ClientResponse, Error> {
if !self.breakers.should_try(url) {
pub(crate) async fn fetch_response(&self, url: IriString) -> Result<ClientResponse, Error> {
if !self.breakers.should_try(&url) {
return Err(ErrorKind::Breaker.into());
}
@ -321,7 +306,7 @@ impl Requests {
let client: Client = self.client.borrow().clone();
let res = client
.get(url.as_str())
.insert_header(("Accept", accept))
.insert_header(("Accept", "*/*"))
.insert_header(Date(SystemTime::now().into()))
.no_decompress()
.signature(
@ -336,7 +321,7 @@ impl Requests {
.send()
.await;
let res = self.check_response(url, res).await?;
let res = self.check_response(&url, res).await?;
Ok(res)
}
@ -346,27 +331,7 @@ impl Requests {
skip_all,
fields(inbox = inbox.to_string().as_str(), signing_string)
)]
pub(crate) async fn deliver<T>(&self, inbox: &IriString, item: &T) -> Result<(), Error>
where
T: serde::ser::Serialize + std::fmt::Debug,
{
self.do_deliver(
inbox,
item,
"application/activity+json",
"application/activity+json",
)
.await?;
Ok(())
}
async fn do_deliver<T>(
&self,
inbox: &IriString,
item: &T,
content_type: &str,
accept: &str,
) -> Result<ClientResponse, Error>
pub(crate) async fn deliver<T>(&self, inbox: IriString, item: &T) -> Result<(), Error>
where
T: serde::ser::Serialize + std::fmt::Debug,
{
@ -381,8 +346,8 @@ impl Requests {
let client: Client = self.client.borrow().clone();
let (req, body) = client
.post(inbox.as_str())
.insert_header(("Accept", accept))
.insert_header(("Content-Type", content_type))
.insert_header(("Accept", "application/activity+json"))
.insert_header(("Content-Type", "application/activity+json"))
.insert_header(Date(SystemTime::now().into()))
.signature_with_digest(
self.config.clone(),
@ -399,9 +364,9 @@ impl Requests {
let res = req.send_body(body).await;
let res = self.check_response(inbox, res).await?;
self.check_response(&inbox, res).await?;
Ok(res)
Ok(())
}
fn signer(&self) -> Signer {
@ -418,8 +383,7 @@ struct Signer {
impl Signer {
fn sign(&self, signing_string: &str) -> Result<String, Error> {
let signing_key = SigningKey::<Sha256>::new_with_prefix(self.private_key.clone());
let signature =
signing_key.try_sign_with_rng(&mut thread_rng(), signing_string.as_bytes())?;
Ok(STANDARD.encode(signature.as_ref()))
let signature = signing_key.try_sign_with_rng(thread_rng(), signing_string.as_bytes())?;
Ok(base64::encode(signature.as_ref()))
}
}

View File

@ -1,5 +1,4 @@
mod actor;
mod healthz;
mod inbox;
mod index;
mod media;
@ -8,7 +7,6 @@ mod statics;
pub(crate) use self::{
actor::route as actor,
healthz::route as healthz,
inbox::route as inbox,
index::route as index,
media::route as media,

View File

@ -1,7 +0,0 @@
use crate::{data::State, error::Error};
use actix_web::{web, HttpResponse};
pub(crate) async fn route(state: web::Data<State>) -> Result<HttpResponse, Error> {
state.db.check_health().await?;
Ok(HttpResponse::Ok().finish())
}

View File

@ -16,8 +16,7 @@ use activitystreams::{
use actix_web::{web, HttpResponse};
use http_signature_normalization_actix::prelude::{DigestVerified, SignatureVerified};
#[tracing::instrument(name = "Inbox", skip_all, fields(id = tracing::field::debug(&input.id_unchecked()), kind = tracing::field::debug(&input.kind())))]
#[allow(clippy::too_many_arguments)]
#[tracing::instrument(name = "Inbox", skip_all)]
pub(crate) async fn route(
state: web::Data<State>,
actors: web::Data<ActorCache>,
@ -25,48 +24,22 @@ pub(crate) async fn route(
client: web::Data<Requests>,
jobs: web::Data<JobServer>,
input: web::Json<AcceptedActivities>,
digest_verified: Option<DigestVerified>,
signature_verified: Option<SignatureVerified>,
verified: Option<(SignatureVerified, DigestVerified)>,
) -> Result<HttpResponse, Error> {
let input = input.into_inner();
let kind = input.kind().ok_or(ErrorKind::MissingKind)?;
let actor = actors
.get(
input.actor()?.as_single_id().ok_or(ErrorKind::MissingId)?,
&client,
)
.await?
.into_inner();
if digest_verified.is_some() && signature_verified.is_none() && *kind == ValidTypes::Delete {
return Ok(accepted(serde_json::json!({})));
} else if config.validate_signatures()
&& (digest_verified.is_none() || signature_verified.is_none())
{
return Err(ErrorKind::NoSignature(None).into());
}
let is_allowed = state.db.is_allowed(actor.id.clone());
let is_connected = state.db.is_connected(actor.id.clone());
let actor_id = if input.id_unchecked().is_some() {
input.actor()?.as_single_id().ok_or(ErrorKind::MissingId)?
} else {
input
.actor_unchecked()
.as_single_id()
.ok_or(ErrorKind::MissingId)?
};
let actor = actors.get(actor_id, &client).await?.into_inner();
if let Some(verified) = signature_verified {
if actor.public_key_id.as_str() != verified.key_id() {
tracing::error!("Actor signed with wrong key");
return Err(ErrorKind::BadActor(
actor.public_key_id.to_string(),
verified.key_id().to_owned(),
)
.into());
}
} else if config.validate_signatures() {
tracing::error!("This case should never be reachable, since I handle signature checks earlier in the flow. If you see this in a log it means I did it wrong");
return Err(ErrorKind::NoSignature(Some(actor.public_key_id.to_string())).into());
}
let is_allowed = state.db.is_allowed(actor.id.clone()).await?;
let is_connected = state.db.is_connected(actor.id.clone()).await?;
let (is_allowed, is_connected) = tokio::try_join!(is_allowed, is_connected)?;
if !is_allowed {
return Err(ErrorKind::NotAllowed(actor.id.to_string()).into());
@ -76,16 +49,29 @@ pub(crate) async fn route(
return Err(ErrorKind::NotSubscribed(actor.id.to_string()).into());
}
match kind {
if config.validate_signatures() && verified.is_none() {
return Err(ErrorKind::NoSignature(actor.public_key_id.to_string()).into());
} else if config.validate_signatures() {
if let Some((verified, _)) = verified {
if actor.public_key_id.as_str() != verified.key_id() {
tracing::error!("Bad actor, more info: {:?}", input);
return Err(ErrorKind::BadActor(
actor.public_key_id.to_string(),
verified.key_id().to_owned(),
)
.into());
}
}
}
match input.kind().ok_or(ErrorKind::MissingKind)? {
ValidTypes::Accept => handle_accept(&config, input).await?,
ValidTypes::Reject => handle_reject(&config, &jobs, input, actor).await?,
ValidTypes::Announce | ValidTypes::Create => {
handle_announce(&state, &jobs, input, actor).await?
}
ValidTypes::Follow => handle_follow(&config, &jobs, input, actor).await?,
ValidTypes::Add | ValidTypes::Delete | ValidTypes::Remove | ValidTypes::Update => {
handle_forward(&jobs, input, actor).await?
}
ValidTypes::Delete | ValidTypes::Update => handle_forward(&jobs, input, actor).await?,
ValidTypes::Undo => handle_undo(&config, &jobs, input, actor, is_connected).await?,
};
@ -217,7 +203,7 @@ async fn handle_announce(
.as_single_id()
.ok_or(ErrorKind::MissingId)?;
if state.is_cached(object_id) {
if state.is_cached(object_id).await {
return Err(ErrorKind::Duplicate.into());
}

View File

@ -71,7 +71,7 @@ pub(crate) async fn route(
let mut buf = BufWriter::new(Vec::new());
crate::templates::index_html(&mut buf, &local, &nodes, &config)?;
crate::templates::index(&mut buf, &local, &nodes, &config)?;
let html = buf.into_inner().map_err(|e| {
tracing::error!("Error rendering template, {}", e.error());
ErrorKind::FlushBuffer

View File

@ -11,7 +11,7 @@ pub(crate) async fn route(
let uuid = uuid.into_inner();
if let Some(url) = media.get_url(uuid).await? {
let res = requests.fetch_response(&url).await?;
let res = requests.fetch_response(url).await?;
let mut response = HttpResponse::build(res.status());

View File

@ -24,18 +24,18 @@ struct Links {
links: Vec<Link>,
}
#[tracing::instrument(name = "NodeInfo", skip_all)]
#[tracing::instrument(name = "NodeInfo")]
pub(crate) async fn route(
config: web::Data<Config>,
state: web::Data<State>,
) -> web::Json<NodeInfo> {
let inboxes = state.db.inboxes().await;
let blocks = if config.publish_blocks() {
Some(state.db.blocks().await.unwrap_or_default())
} else {
None
};
let (inboxes, blocks) = tokio::join!(state.db.inboxes(), async {
if config.publish_blocks() {
Some(state.db.blocks().await.unwrap_or_default())
} else {
None
}
});
let peers = inboxes
.unwrap_or_default()
@ -44,8 +44,6 @@ pub(crate) async fn route(
.map(|s| s.to_owned())
.collect();
let open_registrations = !config.restricted_mode();
web::Json(NodeInfo {
version: NodeInfoVersion,
software: Software {
@ -57,7 +55,7 @@ pub(crate) async fn route(
inbound: vec![],
outbound: vec![],
},
open_registrations,
open_registrations: false,
usage: Usage {
users: Users {
total: 1,

View File

@ -5,7 +5,7 @@ use actix_web::{
};
#[allow(clippy::async_yields_async)]
#[tracing::instrument(name = "Statics")]
#[tracing::instrument(name = "Statistics")]
pub(crate) async fn route(filename: web::Path<String>) -> HttpResponse {
if let Some(data) = StaticFile::get(&filename.into_inner()) {
HttpResponse::Ok()

View File

@ -89,19 +89,19 @@ async fn answer(bot: Bot, msg: Message, cmd: Command, db: Db) -> ResponseResult<
.await?;
}
Command::Block { domain } if db.add_blocks(vec![domain.clone()]).await.is_ok() => {
bot.send_message(msg.chat.id, format!("{domain} has been blocked"))
bot.send_message(msg.chat.id, format!("{} has been blocked", domain))
.await?;
}
Command::Unblock { domain } if db.remove_blocks(vec![domain.clone()]).await.is_ok() => {
bot.send_message(msg.chat.id, format!("{domain} has been unblocked"))
bot.send_message(msg.chat.id, format!("{} has been unblocked", domain))
.await?;
}
Command::Allow { domain } if db.add_allows(vec![domain.clone()]).await.is_ok() => {
bot.send_message(msg.chat.id, format!("{domain} has been allowed"))
bot.send_message(msg.chat.id, format!("{} has been allowed", domain))
.await?;
}
Command::Disallow { domain } if db.remove_allows(vec![domain.clone()]).await.is_ok() => {
bot.send_message(msg.chat.id, format!("{domain} has been disallowed"))
bot.send_message(msg.chat.id, format!("{} has been disallowed", domain))
.await?;
}
Command::ListAllowed => {

View File

@ -1,15 +0,0 @@
[Unit]
Description=Activitypub Relay
Documentation=https://git.asonix.dog/asonix/relay
Wants=network.target
After=network.target
[Install]
WantedBy=multi-user.target
[Service]
Type=simple
EnvironmentFile=/etc/systemd/system/example-relay.service.env
ExecStart=/path/to/relay
Restart=always

View File

@ -1,19 +0,0 @@
HOSTNAME='relay.example.com'
ADDR='0.0.0.0'
PORT='8080'
RESTRICTED_MODE='true'
VALIDATE_SIGNATURES='true'
HTTPS='true'
PRETTY_LOG='false'
PUBLISH_BLOCKS='true'
DEBUG='false'
SLED_PATH='/opt/sled'
TELEGRAM_ADMIN_HANDLE='myhandle'
RUST_BACKTRACE='full'
FOOTER_BLURB='Contact <a href="https://masto.example.com/@example">@example</a> for inquiries.'
LOCAL_DOMAINS='masto.example.com'
LOCAL_BLURB='<p>An ActivityPub relay for servers. Currently running somewhere. Let me know if you want to join!</p>'
OPENTELEMETRY_URL='http://otel.example.com:4317'
API_TOKEN='blahblahblahblahblahblahblah'
TELEGRAM_TOKEN='blahblahblahblahblahblahblah'

View File

@ -1,11 +0,0 @@
[Unit]
Description=Activitypub Relay Socket
Before=multi-user.target
After=network.target
[Socket]
Service=example-relay.service
ListenStream=8080
[Install]
WantedBy=sockets.target

View File

@ -6,7 +6,7 @@
<div class="admin">
<div class="left">
<figure class="avatar">
<img loading="lazy" src="@contact.avatar" alt="@contact.display_name's avatar">
<img src="@contact.avatar" alt="@contact.display_name's avatar">
</figure>
</div>
<div class="right">

View File

@ -1,7 +1,7 @@
@use crate::{
config::{Config, UrlKind},
data::Node,
templates::{info_html, instance_html, statics::index_css},
templates::{info, instance, statics::index_css},
};
@(local: &[Node], nodes: &[Node], config: &Config)
@ -9,7 +9,7 @@ templates::{info_html, instance_html, statics::index_css},
<!doctype html>
<html>
<head lang="fr">
<head lang="en">
<meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" />
<title>@config.hostname() | Relais ActivityPub</title>
@ -39,13 +39,13 @@ templates::{info_html, instance_html, statics::index_css},
@for node in local {
@if let Some(inst) = node.instance.as_ref() {
<li>
@:instance_html(inst, node.info.as_ref().map(|info| { info.software.as_ref() }), node.contact.as_ref(),
@:instance(inst, node.info.as_ref().map(|info| { info.software.as_ref() }), node.contact.as_ref(),
&node.base)
</li>
} else {
@if let Some(inf) = node.info.as_ref() {
<li>
@:info_html(inf, &node.base)
@:info(inf, &node.base)
</li>
}
}
@ -94,13 +94,13 @@ templates::{info_html, instance_html, statics::index_css},
@for node in nodes {
@if let Some(inst) = node.instance.as_ref() {
<li>
@:instance_html(inst, node.info.as_ref().map(|info| { info.software.as_ref() }), node.contact.as_ref(),
@:instance(inst, node.info.as_ref().map(|info| { info.software.as_ref() }), node.contact.as_ref(),
&node.base)
</li>
} else {
@if let Some(inf) = node.info.as_ref() {
<li>
@:info_html(inf, &node.base)
@:info(inf, &node.base)
</li>
}
}

View File

@ -1,4 +1,4 @@
@use crate::{db::{Contact, Instance}, templates::admin_html};
@use crate::{db::{Contact, Instance}, templates::admin};
@use activitystreams::iri_string::types::IriString;
@(instance: &Instance, software: Option<&str>, contact: Option<&Contact>, base: &IriString)
@ -32,8 +32,8 @@
}
</div>
@if let Some(contact) = contact {
<h5 class="instance-admin">Administré par:</h5>
@:admin_html(contact, base)
<h5 class="instance-admin">Administré par:</h5>
@:admin(contact, base)
}
}
</section>