Merge tag 'v0.3.79' into max

This commit is contained in:
Maxime Augier 2023-01-19 21:14:20 +01:00
commit b94f792a19
42 changed files with 1331 additions and 970 deletions

2
.env
View File

@ -9,3 +9,5 @@ FOOTER_BLURB="Opéré par <a href=\"https://mastodon.xolus.net/@max\">@max</a>"
LOCAL_DOMAINS="xolus.net" LOCAL_DOMAINS="xolus.net"
LOCAL_BLURB="<p>Relais ActivityPub francophone</p>" LOCAL_BLURB="<p>Relais ActivityPub francophone</p>"
# OPENTELEMETRY_URL=http://localhost:4317 # OPENTELEMETRY_URL=http://localhost:4317
PROMETHEUS_ADDR=127.0.0.1
PROMETHEUS_PORT=9000

434
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,7 +1,7 @@
[package] [package]
name = "ap-relay" name = "ap-relay"
description = "A simple activitypub relay" description = "A simple activitypub relay"
version = "0.3.66" version = "0.3.79"
authors = ["asonix <asonix@asonix.dog>"] authors = ["asonix <asonix@asonix.dog>"]
license = "AGPL-3.0" license = "AGPL-3.0"
readme = "README.md" readme = "README.md"
@ -29,8 +29,8 @@ actix-web = { version = "4.0.1", default-features = false, features = [
"compress-gzip", "compress-gzip",
] } ] }
actix-webfinger = "0.4.0" actix-webfinger = "0.4.0"
activitystreams = "0.7.0-alpha.19" activitystreams = "0.7.0-alpha.21"
activitystreams-ext = "0.1.0-alpha.2" activitystreams-ext = "0.1.0-alpha.3"
ammonia = "3.1.0" ammonia = "3.1.0"
awc = { version = "3.0.0", default-features = false, features = ["rustls"] } awc = { version = "3.0.0", default-features = false, features = ["rustls"] }
bcrypt = "0.13" bcrypt = "0.13"
@ -41,8 +41,11 @@ console-subscriber = { version = "0.1", optional = true }
dashmap = "5.1.0" dashmap = "5.1.0"
dotenv = "0.15.0" dotenv = "0.15.0"
futures-util = "0.3.17" futures-util = "0.3.17"
lru = "0.8.0" lru = "0.9.0"
metrics = "0.20.1" metrics = "0.20.1"
metrics-exporter-prometheus = { version = "0.11.0", default-features = false, features = [
"http-listener",
] }
metrics-util = "0.14.0" metrics-util = "0.14.0"
mime = "0.3.16" mime = "0.3.16"
minify-html = "0.10.0" minify-html = "0.10.0"
@ -66,6 +69,7 @@ teloxide = { version = "0.11.1", default-features = false, features = [
"rustls", "rustls",
] } ] }
thiserror = "1.0" thiserror = "1.0"
time = { version = "0.3.17", features = ["serde"] }
tracing = "0.1" tracing = "0.1"
tracing-awc = "0.1.6" tracing-awc = "0.1.6"
tracing-error = "0.2" tracing-error = "0.2"
@ -86,12 +90,12 @@ default-features = false
features = ["background-jobs-actix", "error-logging"] features = ["background-jobs-actix", "error-logging"]
[dependencies.http-signature-normalization-actix] [dependencies.http-signature-normalization-actix]
version = "0.6.0" version = "0.8.0"
default-features = false default-features = false
features = ["client", "server", "sha-2"] features = ["client", "server", "sha-2"]
[dependencies.tracing-actix-web] [dependencies.tracing-actix-web]
version = "0.6.1" version = "0.7.0"
[build-dependencies] [build-dependencies]
anyhow = "1.0" anyhow = "1.0"

View File

@ -10,7 +10,7 @@ $ sudo docker run --rm -it \
-e ADDR=0.0.0.0 \ -e ADDR=0.0.0.0 \
-e SLED_PATH=/mnt/sled/db-0.34 \ -e SLED_PATH=/mnt/sled/db-0.34 \
-p 8080:8080 \ -p 8080:8080 \
asonix/relay:0.3.52 asonix/relay:0.3.78
``` ```
This will launch the relay with the database stored in "./sled/db-0.34" and listening on port 8080 This will launch the relay with the database stored in "./sled/db-0.34" and listening on port 8080
#### Cargo #### Cargo
@ -103,6 +103,8 @@ TLS_CERT=/path/to/cert
FOOTER_BLURB="Contact <a href=\"https://masto.asonix.dog/@asonix\">@asonix</a> for inquiries" FOOTER_BLURB="Contact <a href=\"https://masto.asonix.dog/@asonix\">@asonix</a> for inquiries"
LOCAL_DOMAINS=masto.asonix.dog LOCAL_DOMAINS=masto.asonix.dog
LOCAL_BLURB="<p>Welcome to my cool relay where I have cool relay things happening. I hope you enjoy your stay!</p>" LOCAL_BLURB="<p>Welcome to my cool relay where I have cool relay things happening. I hope you enjoy your stay!</p>"
PROMETHEUS_ADDR=0.0.0.0
PROMETHEUS_PORT=9000
``` ```
#### Descriptions #### Descriptions
@ -128,6 +130,8 @@ Where to store the on-disk database of connected servers. This defaults to `./sl
The log level to print. Available levels are `ERROR`, `WARN`, `INFO`, `DEBUG`, and `TRACE`. You can also specify module paths to enable some logs but not others, such as `RUST_LOG=warn,tracing_actix_web=info,relay=info`. This defaults to `warn` The log level to print. Available levels are `ERROR`, `WARN`, `INFO`, `DEBUG`, and `TRACE`. You can also specify module paths to enable some logs but not others, such as `RUST_LOG=warn,tracing_actix_web=info,relay=info`. This defaults to `warn`
##### `SOURCE_REPO` ##### `SOURCE_REPO`
The URL to the source code for the relay. This defaults to `https://git.asonix.dog/asonix/relay`, but should be changed if you're running a fork hosted elsewhere. The URL to the source code for the relay. This defaults to `https://git.asonix.dog/asonix/relay`, but should be changed if you're running a fork hosted elsewhere.
##### `REPOSITORY_COMMIT_BASE`
The base path of the repository commit hash reference. For example, `/src/commit/` for Gitea, `/tree/` for GitLab.
##### `API_TOKEN` ##### `API_TOKEN`
The Secret token used to access the admin APIs. This must be set for the commandline to function The Secret token used to access the admin APIs. This must be set for the commandline to function
##### `OPENTELEMETRY_URL` ##### `OPENTELEMETRY_URL`
@ -146,6 +150,10 @@ Optional - Add custom notes in the footer of the page
Optional - domains of mastodon servers run by the same admin as the relay Optional - domains of mastodon servers run by the same admin as the relay
##### `LOCAL_BLURB` ##### `LOCAL_BLURB`
Optional - description for the relay Optional - description for the relay
##### `PROMETHEUS_ADDR`
Optional - Address to bind to for serving the prometheus scrape endpoint
##### `PROMETHEUS_PORT`
Optional - Port to bind to for serving the prometheus scrape endpoint
### Subscribing ### Subscribing
Mastodon admins can subscribe to this relay by adding the `/inbox` route to their relay settings. Mastodon admins can subscribe to this relay by adding the `/inbox` route to their relay settings.
@ -165,10 +173,13 @@ example, if the server is `https://relay.my.tld`, the correct URL would be
- Follow Public, become a listener of the relay - Follow Public, become a listener of the relay
- Undo Follow {self-actor}, stop listening on the relay, an Undo Follow will be sent back - Undo Follow {self-actor}, stop listening on the relay, an Undo Follow will be sent back
- Undo Follow Public, stop listening on the relay - Undo Follow Public, stop listening on the relay
- Delete {anything}, the Delete {anything} is relayed verbatim to listening servers - Delete {anything}, the Delete {anything} is relayed verbatim to listening servers.
Note that this activity will likely be rejected by the listening servers unless it has been Note that this activity will likely be rejected by the listening servers unless it has been
signed with a JSON-LD signature signed with a JSON-LD signature
- Update {anything}, the Update {anything} is relayed verbatim to listening servers - Update {anything}, the Update {anything} is relayed verbatim to listening servers.
Note that this activity will likely be rejected by the listening servers unless it has been
signed with a JSON-LD signature
- Add {anything}, the Add {anything} is relayed verbatim to listening servers.
Note that this activity will likely be rejected by the listening servers unless it has been Note that this activity will likely be rejected by the listening servers unless it has been
signed with a JSON-LD signature signed with a JSON-LD signature

View File

@ -1,41 +0,0 @@
ARG REPO_ARCH=amd64
# cross-build environment
FROM asonix/rust-builder:$REPO_ARCH-latest AS builder
ARG TAG=main
ARG BINARY=relay
ARG PROJECT=relay
ARG GIT_REPOSITORY=https://git.asonix.dog/asonix/$PROJECT
ENV \
BINARY=${BINARY}
ADD \
--chown=build:build \
$GIT_REPOSITORY/archive/$TAG.tar.gz \
/opt/build/repo.tar.gz
RUN \
tar zxf repo.tar.gz
WORKDIR /opt/build/$PROJECT
RUN \
build
# production environment
FROM asonix/rust-runner:$REPO_ARCH-latest
ARG BINARY=relay
ENV \
BINARY=${BINARY}
COPY \
--from=builder \
/opt/build/binary \
/usr/bin/${BINARY}
ENTRYPOINT ["/sbin/tini", "--"]
CMD /usr/bin/${BINARY}

View File

@ -1,37 +0,0 @@
#!/usr/bin/env bash
function require() {
if [ "$1" = "" ]; then
echo "input '$2' required"
print_help
exit 1
fi
}
function print_help() {
echo "deploy.sh"
echo ""
echo "Usage:"
echo " deploy.sh [repo] [tag] [arch]"
echo ""
echo "Args:"
echo " repo: The docker repository to publish the image"
echo " tag: The tag applied to the docker image"
echo " arch: The architecuture of the doker image"
}
REPO=$1
TAG=$2
ARCH=$3
require "$REPO" repo
require "$TAG" tag
require "$ARCH" arch
sudo docker build \
--pull \
--build-arg TAG=$TAG \
--build-arg REPO_ARCH=$ARCH \
-t $REPO:$ARCH-$TAG \
-f Dockerfile \
.

View File

@ -1,87 +0,0 @@
#!/usr/bin/env bash
function require() {
if [ "$1" = "" ]; then
echo "input '$2' required"
print_help
exit 1
fi
}
function print_help() {
echo "deploy.sh"
echo ""
echo "Usage:"
echo " deploy.sh [tag] [branch] [push]"
echo ""
echo "Args:"
echo " tag: The git tag to be applied to the repository and docker build"
echo " branch: The git branch to use for tagging and publishing"
echo " push: Whether or not to push the image"
echo ""
echo "Examples:"
echo " ./deploy.sh v0.3.0-alpha.13 main true"
echo " ./deploy.sh v0.3.0-alpha.13-shell-out asonix/shell-out false"
}
function build_image() {
tag=$1
arch=$2
push=$3
./build-image.sh asonix/relay $tag $arch
sudo docker tag asonix/relay:$arch-$tag asonix/relay:$arch-latest
if [ "$push" == "true" ]; then
sudo docker push asonix/relay:$arch-$tag
sudo docker push asonix/relay:$arch-latest
fi
}
# Creating the new tag
new_tag="$1"
branch="$2"
push=$3
require "$new_tag" "tag"
require "$branch" "branch"
require "$push" "push"
if ! sudo docker run --rm -it arm64v8/alpine:3.11 /bin/sh -c 'echo "docker is configured correctly"'
then
echo "docker is not configured to run on qemu-emulated architectures, fixing will require sudo"
sudo docker run --rm --privileged multiarch/qemu-user-static --reset -p yes
fi
set -xe
git checkout $branch
# Changing the docker-compose prod
sed -i "s/asonix\/relay:.*/asonix\/relay:$new_tag/" docker-compose.yml
git add ../prod/docker-compose.yml
# The commit
git commit -m"Version $new_tag"
git tag $new_tag
# Push
git push origin $new_tag
git push
# Build for arm64v8, arm32v7 and amd64
build_image $new_tag arm64v8 $push
build_image $new_tag arm32v7 $push
build_image $new_tag amd64 $push
# Build for other archs
# TODO
if [ "$push" == "true" ]; then
./manifest.sh relay $new_tag
./manifest.sh relay latest
# pushd ../../
# cargo publish
# popd
fi

View File

@ -2,7 +2,7 @@ version: '3.3'
services: services:
relay: relay:
image: asonix/relay:v0.3.8 image: asonix/relay:v0.3.73
ports: ports:
- "8079:8079" - "8079:8079"
restart: always restart: always
@ -14,6 +14,7 @@ services:
- RESTRICTED_MODE=false - RESTRICTED_MODE=false
- VALIDATE_SIGNATURES=true - VALIDATE_SIGNATURES=true
- HTTPS=true - HTTPS=true
- DATABASE_URL=postgres://pg_user:pg_pass@pg_host:pg_port/pg_database - SLED_PATH=/mnt/sled/db-0.34
- PRETTY_LOG=false - PRETTY_LOG=false
- PUBLISH_BLOCKS=true - PUBLISH_BLOCKS=true
- API_TOKEN=somepasswordishtoken

View File

@ -1,43 +0,0 @@
#!/usr/bin/env bash
function require() {
if [ "$1" = "" ]; then
echo "input '$2' required"
print_help
exit 1
fi
}
function print_help() {
echo "deploy.sh"
echo ""
echo "Usage:"
echo " manifest.sh [repo] [tag]"
echo ""
echo "Args:"
echo " repo: The docker repository to update"
echo " tag: The git tag to be applied to the image manifest"
}
REPO=$1
TAG=$2
require "$REPO" "repo"
require "$TAG" "tag"
set -xe
sudo docker manifest create asonix/$REPO:$TAG \
-a asonix/$REPO:arm64v8-$TAG \
-a asonix/$REPO:arm32v7-$TAG \
-a asonix/$REPO:amd64-$TAG
sudo docker manifest annotate asonix/$REPO:$TAG \
asonix/$REPO:arm64v8-$TAG --os linux --arch arm64 --variant v8
sudo docker manifest annotate asonix/$REPO:$TAG \
asonix/$REPO:arm32v7-$TAG --os linux --arch arm --variant v7
sudo docker manifest annotate asonix/$REPO:$TAG \
asonix/$REPO:amd64-$TAG --os linux --arch amd64
sudo docker manifest push asonix/$REPO:$TAG --purge

View File

@ -1,4 +1,6 @@
use activitystreams::iri_string::types::IriString; use activitystreams::iri_string::types::IriString;
use std::collections::{BTreeMap, BTreeSet};
use time::OffsetDateTime;
pub mod client; pub mod client;
pub mod routes; pub mod routes;
@ -22,3 +24,9 @@ pub(crate) struct BlockedDomains {
pub(crate) struct ConnectedActors { pub(crate) struct ConnectedActors {
pub(crate) connected_actors: Vec<IriString>, pub(crate) connected_actors: Vec<IriString>,
} }
#[derive(serde::Deserialize, serde::Serialize)]
pub(crate) struct LastSeen {
pub(crate) last_seen: BTreeMap<OffsetDateTime, BTreeSet<String>>,
pub(crate) never: Vec<String>,
}

View File

@ -1,5 +1,5 @@
use crate::{ use crate::{
admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains}, admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains, LastSeen},
collector::Snapshot, collector::Snapshot,
config::{AdminUrlKind, Config}, config::{AdminUrlKind, Config},
error::{Error, ErrorKind}, error::{Error, ErrorKind},
@ -55,6 +55,10 @@ pub(crate) async fn stats(client: &Client, config: &Config) -> Result<Snapshot,
get_results(client, config, AdminUrlKind::Stats).await get_results(client, config, AdminUrlKind::Stats).await
} }
pub(crate) async fn last_seen(client: &Client, config: &Config) -> Result<LastSeen, Error> {
get_results(client, config, AdminUrlKind::LastSeen).await
}
async fn get_results<T: DeserializeOwned>( async fn get_results<T: DeserializeOwned>(
client: &Client, client: &Client,
config: &Config, config: &Config,

View File

@ -1,5 +1,5 @@
use crate::{ use crate::{
admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains}, admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains, LastSeen},
collector::{MemoryCollector, Snapshot}, collector::{MemoryCollector, Snapshot},
error::Error, error::Error,
extractors::Admin, extractors::Admin,
@ -8,6 +8,8 @@ use actix_web::{
web::{Data, Json}, web::{Data, Json},
HttpResponse, HttpResponse,
}; };
use std::collections::{BTreeMap, BTreeSet};
use time::OffsetDateTime;
pub(crate) async fn allow( pub(crate) async fn allow(
admin: Admin, admin: Admin,
@ -69,3 +71,20 @@ pub(crate) async fn stats(
) -> Result<Json<Snapshot>, Error> { ) -> Result<Json<Snapshot>, Error> {
Ok(Json(collector.snapshot())) Ok(Json(collector.snapshot()))
} }
pub(crate) async fn last_seen(admin: Admin) -> Result<Json<LastSeen>, Error> {
let nodes = admin.db_ref().last_seen().await?;
let mut last_seen: BTreeMap<OffsetDateTime, BTreeSet<String>> = BTreeMap::new();
let mut never = Vec::new();
for (domain, datetime) in nodes {
if let Some(datetime) = datetime {
last_seen.entry(datetime).or_default().insert(domain);
} else {
never.push(domain);
}
}
Ok(Json(LastSeen { last_seen, never }))
}

View File

@ -34,11 +34,13 @@ pub struct PublicKey {
#[serde(rename_all = "PascalCase")] #[serde(rename_all = "PascalCase")]
pub enum ValidTypes { pub enum ValidTypes {
Accept, Accept,
Add,
Announce, Announce,
Create, Create,
Delete, Delete,
Follow, Follow,
Reject, Reject,
Remove,
Undo, Undo,
Update, Update,
} }

View File

@ -17,11 +17,22 @@ pub(crate) struct Args {
#[arg(short, long, help = "Get statistics from the server")] #[arg(short, long, help = "Get statistics from the server")]
stats: bool, stats: bool,
#[arg(
short,
long,
help = "List domains by when they were last succesfully contacted"
)]
contacted: bool,
} }
impl Args { impl Args {
pub(crate) fn any(&self) -> bool { pub(crate) fn any(&self) -> bool {
!self.blocks.is_empty() || !self.allowed.is_empty() || self.list || self.stats !self.blocks.is_empty()
|| !self.allowed.is_empty()
|| self.list
|| self.stats
|| self.contacted
} }
pub(crate) fn new() -> Self { pub(crate) fn new() -> Self {
@ -47,4 +58,8 @@ impl Args {
pub(crate) fn stats(&self) -> bool { pub(crate) fn stats(&self) -> bool {
self.stats self.stats
} }
pub(crate) fn contacted(&self) -> bool {
self.contacted
}
} }

View File

@ -6,6 +6,7 @@ fn git_info() {
if output.status.success() { if output.status.success() {
let git_hash = String::from_utf8_lossy(&output.stdout); let git_hash = String::from_utf8_lossy(&output.stdout);
println!("cargo:rustc-env=GIT_HASH={}", git_hash); println!("cargo:rustc-env=GIT_HASH={}", git_hash);
println!("cargo:rustc-env=GIT_SHORT_HASH={}", &git_hash[..8])
} }
} }
@ -23,7 +24,7 @@ fn git_info() {
fn version_info() -> Result<(), anyhow::Error> { fn version_info() -> Result<(), anyhow::Error> {
let cargo_toml = Path::new(&std::env::var("CARGO_MANIFEST_DIR")?).join("Cargo.toml"); let cargo_toml = Path::new(&std::env::var("CARGO_MANIFEST_DIR")?).join("Cargo.toml");
let mut file = File::open(&cargo_toml)?; let mut file = File::open(cargo_toml)?;
let mut cargo_data = String::new(); let mut cargo_data = String::new();
file.read_to_string(&mut cargo_data)?; file.read_to_string(&mut cargo_data)?;

View File

@ -1,414 +1,5 @@
use metrics::{Key, Recorder, SetRecorderError}; mod double;
use metrics_util::{ mod stats;
registry::{AtomicStorage, GenerationalStorage, Recency, Registry},
MetricKindMask, Summary,
};
use quanta::Clock;
use std::{
collections::{BTreeMap, HashMap},
sync::{atomic::Ordering, Arc, RwLock},
time::Duration,
};
const SECONDS: u64 = 1; pub(crate) use double::DoubleRecorder;
const MINUTES: u64 = 60 * SECONDS; pub(crate) use stats::{MemoryCollector, Snapshot};
const HOURS: u64 = 60 * MINUTES;
const DAYS: u64 = 24 * HOURS;
type DistributionMap = BTreeMap<Vec<(String, String)>, Summary>;
#[derive(Clone)]
pub struct MemoryCollector {
inner: Arc<Inner>,
}
struct Inner {
descriptions: RwLock<HashMap<String, metrics::SharedString>>,
distributions: RwLock<HashMap<String, DistributionMap>>,
recency: Recency<Key>,
registry: Registry<Key, GenerationalStorage<AtomicStorage>>,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct Counter {
labels: BTreeMap<String, String>,
value: u64,
}
impl std::fmt::Display for Counter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let labels = self
.labels
.iter()
.map(|(k, v)| format!("{}: {}", k, v))
.collect::<Vec<_>>()
.join(", ");
write!(f, "{} - {}", labels, self.value)
}
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct Gauge {
labels: BTreeMap<String, String>,
value: f64,
}
impl std::fmt::Display for Gauge {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let labels = self
.labels
.iter()
.map(|(k, v)| format!("{}: {}", k, v))
.collect::<Vec<_>>()
.join(", ");
write!(f, "{} - {}", labels, self.value)
}
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct Histogram {
labels: BTreeMap<String, String>,
value: Vec<(f64, Option<f64>)>,
}
impl std::fmt::Display for Histogram {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let labels = self
.labels
.iter()
.map(|(k, v)| format!("{}: {}", k, v))
.collect::<Vec<_>>()
.join(", ");
let value = self
.value
.iter()
.map(|(k, v)| {
if let Some(v) = v {
format!("{}: {:.6}", k, v)
} else {
format!("{}: None,", k)
}
})
.collect::<Vec<_>>()
.join(", ");
write!(f, "{} - {}", labels, value)
}
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct Snapshot {
counters: HashMap<String, Vec<Counter>>,
gauges: HashMap<String, Vec<Gauge>>,
histograms: HashMap<String, Vec<Histogram>>,
}
const PAIRS: [((&str, &str), &str); 2] = [
(
(
"background-jobs.worker.started",
"background-jobs.worker.finished",
),
"background-jobs.worker.running",
),
(
(
"background-jobs.job.started",
"background-jobs.job.finished",
),
"background-jobs.job.running",
),
];
#[derive(Default)]
struct MergeCounter {
start: Option<Counter>,
finish: Option<Counter>,
}
impl MergeCounter {
fn merge(self) -> Option<Counter> {
match (self.start, self.finish) {
(Some(start), Some(end)) => Some(Counter {
labels: start.labels,
value: start.value.saturating_sub(end.value),
}),
(Some(only), None) => Some(only),
(None, Some(only)) => Some(Counter {
labels: only.labels,
value: 0,
}),
(None, None) => None,
}
}
}
impl Snapshot {
pub(crate) fn present(self) {
if !self.counters.is_empty() {
println!("Counters");
let mut merging = HashMap::new();
for (key, counters) in self.counters {
if let Some(((start, _), name)) = PAIRS
.iter()
.find(|((start, finish), _)| *start == key || *finish == key)
{
let entry = merging.entry(name).or_insert_with(HashMap::new);
for counter in counters {
let mut merge_counter = entry
.entry(counter.labels.clone())
.or_insert_with(MergeCounter::default);
if key == *start {
merge_counter.start = Some(counter);
} else {
merge_counter.finish = Some(counter);
}
}
continue;
}
println!("\t{}", key);
for counter in counters {
println!("\t\t{}", counter);
}
}
for (key, counters) in merging {
println!("\t{}", key);
for (_, counter) in counters {
if let Some(counter) = counter.merge() {
println!("\t\t{}", counter);
}
}
}
}
if !self.gauges.is_empty() {
println!("Gauges");
for (key, gauges) in self.gauges {
println!("\t{}", key);
for gauge in gauges {
println!("\t\t{}", gauge);
}
}
}
if !self.histograms.is_empty() {
println!("Histograms");
for (key, histograms) in self.histograms {
println!("\t{}", key);
for histogram in histograms {
println!("\t\t{}", histogram);
}
}
}
}
}
fn key_to_parts(key: &Key) -> (String, Vec<(String, String)>) {
let labels = key
.labels()
.into_iter()
.map(|label| (label.key().to_string(), label.value().to_string()))
.collect();
let name = key.name().to_string();
(name, labels)
}
impl Inner {
fn snapshot_counters(&self) -> HashMap<String, Vec<Counter>> {
let mut counters = HashMap::new();
for (key, counter) in self.registry.get_counter_handles() {
let gen = counter.get_generation();
if !self.recency.should_store_counter(&key, gen, &self.registry) {
continue;
}
let (name, labels) = key_to_parts(&key);
let value = counter.get_inner().load(Ordering::Acquire);
counters.entry(name).or_insert_with(Vec::new).push(Counter {
labels: labels.into_iter().collect(),
value,
});
}
counters
}
fn snapshot_gauges(&self) -> HashMap<String, Vec<Gauge>> {
let mut gauges = HashMap::new();
for (key, gauge) in self.registry.get_gauge_handles() {
let gen = gauge.get_generation();
if !self.recency.should_store_gauge(&key, gen, &self.registry) {
continue;
}
let (name, labels) = key_to_parts(&key);
let value = f64::from_bits(gauge.get_inner().load(Ordering::Acquire));
gauges.entry(name).or_insert_with(Vec::new).push(Gauge {
labels: labels.into_iter().collect(),
value,
})
}
gauges
}
fn snapshot_histograms(&self) -> HashMap<String, Vec<Histogram>> {
for (key, histogram) in self.registry.get_histogram_handles() {
let gen = histogram.get_generation();
let (name, labels) = key_to_parts(&key);
if !self
.recency
.should_store_histogram(&key, gen, &self.registry)
{
let mut d = self.distributions.write().unwrap();
let delete_by_name = if let Some(by_name) = d.get_mut(&name) {
by_name.remove(&labels);
by_name.is_empty()
} else {
false
};
drop(d);
if delete_by_name {
self.descriptions.write().unwrap().remove(&name);
}
continue;
}
let mut d = self.distributions.write().unwrap();
let outer_entry = d.entry(name.clone()).or_insert_with(BTreeMap::new);
let entry = outer_entry
.entry(labels)
.or_insert_with(Summary::with_defaults);
histogram.get_inner().clear_with(|samples| {
for sample in samples {
entry.add(*sample);
}
})
}
let d = self.distributions.read().unwrap().clone();
d.into_iter()
.map(|(key, value)| {
(
key,
value
.into_iter()
.map(|(labels, summary)| Histogram {
labels: labels.into_iter().collect(),
value: [0.001, 0.01, 0.05, 0.1, 0.5, 0.9, 0.99, 1.0]
.into_iter()
.map(|q| (q, summary.quantile(q)))
.collect(),
})
.collect(),
)
})
.collect()
}
fn snapshot(&self) -> Snapshot {
Snapshot {
counters: self.snapshot_counters(),
gauges: self.snapshot_gauges(),
histograms: self.snapshot_histograms(),
}
}
}
impl MemoryCollector {
pub(crate) fn new() -> Self {
MemoryCollector {
inner: Arc::new(Inner {
descriptions: Default::default(),
distributions: Default::default(),
recency: Recency::new(
Clock::new(),
MetricKindMask::ALL,
Some(Duration::from_secs(5 * DAYS)),
),
registry: Registry::new(GenerationalStorage::atomic()),
}),
}
}
pub(crate) fn install(&self) -> Result<(), SetRecorderError> {
metrics::set_boxed_recorder(Box::new(self.clone()))
}
pub(crate) fn snapshot(&self) -> Snapshot {
self.inner.snapshot()
}
fn add_description_if_missing(
&self,
key: &metrics::KeyName,
description: metrics::SharedString,
) {
let mut d = self.inner.descriptions.write().unwrap();
d.entry(key.as_str().to_owned()).or_insert(description);
}
}
impl Recorder for MemoryCollector {
fn describe_counter(
&self,
key: metrics::KeyName,
_: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.add_description_if_missing(&key, description)
}
fn describe_gauge(
&self,
key: metrics::KeyName,
_: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.add_description_if_missing(&key, description)
}
fn describe_histogram(
&self,
key: metrics::KeyName,
_: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.add_description_if_missing(&key, description)
}
fn register_counter(&self, key: &Key) -> metrics::Counter {
self.inner
.registry
.get_or_create_counter(key, |c| c.clone().into())
}
fn register_gauge(&self, key: &Key) -> metrics::Gauge {
self.inner
.registry
.get_or_create_gauge(key, |c| c.clone().into())
}
fn register_histogram(&self, key: &Key) -> metrics::Histogram {
self.inner
.registry
.get_or_create_histogram(key, |c| c.clone().into())
}
}

133
src/collector/double.rs Normal file
View File

@ -0,0 +1,133 @@
use metrics::{CounterFn, GaugeFn, HistogramFn, Key, Recorder, SetRecorderError};
use std::sync::Arc;
#[derive(Clone)]
pub(crate) struct DoubleRecorder<R, S> {
first: R,
second: S,
}
struct DoubleCounter {
first: metrics::Counter,
second: metrics::Counter,
}
struct DoubleGauge {
first: metrics::Gauge,
second: metrics::Gauge,
}
struct DoubleHistogram {
first: metrics::Histogram,
second: metrics::Histogram,
}
impl<R, S> DoubleRecorder<R, S> {
pub(crate) fn new(first: R, second: S) -> Self {
DoubleRecorder { first, second }
}
pub(crate) fn install(self) -> Result<(), SetRecorderError>
where
R: Recorder + 'static,
S: Recorder + 'static,
{
metrics::set_boxed_recorder(Box::new(self))
}
}
impl<R, S> Recorder for DoubleRecorder<R, S>
where
R: Recorder,
S: Recorder,
{
fn describe_counter(
&self,
key: metrics::KeyName,
unit: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.first
.describe_counter(key.clone(), unit, description.clone());
self.second.describe_counter(key, unit, description);
}
fn describe_gauge(
&self,
key: metrics::KeyName,
unit: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.first
.describe_gauge(key.clone(), unit, description.clone());
self.second.describe_gauge(key, unit, description);
}
fn describe_histogram(
&self,
key: metrics::KeyName,
unit: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.first
.describe_histogram(key.clone(), unit, description.clone());
self.second.describe_histogram(key, unit, description);
}
fn register_counter(&self, key: &Key) -> metrics::Counter {
let first = self.first.register_counter(key);
let second = self.second.register_counter(key);
metrics::Counter::from_arc(Arc::new(DoubleCounter { first, second }))
}
fn register_gauge(&self, key: &Key) -> metrics::Gauge {
let first = self.first.register_gauge(key);
let second = self.second.register_gauge(key);
metrics::Gauge::from_arc(Arc::new(DoubleGauge { first, second }))
}
fn register_histogram(&self, key: &Key) -> metrics::Histogram {
let first = self.first.register_histogram(key);
let second = self.second.register_histogram(key);
metrics::Histogram::from_arc(Arc::new(DoubleHistogram { first, second }))
}
}
impl CounterFn for DoubleCounter {
fn increment(&self, value: u64) {
self.first.increment(value);
self.second.increment(value);
}
fn absolute(&self, value: u64) {
self.first.absolute(value);
self.second.absolute(value);
}
}
impl GaugeFn for DoubleGauge {
fn increment(&self, value: f64) {
self.first.increment(value);
self.second.increment(value);
}
fn decrement(&self, value: f64) {
self.first.decrement(value);
self.second.decrement(value);
}
fn set(&self, value: f64) {
self.first.set(value);
self.second.set(value);
}
}
impl HistogramFn for DoubleHistogram {
fn record(&self, value: f64) {
self.first.record(value);
self.second.record(value);
}
}

414
src/collector/stats.rs Normal file
View File

@ -0,0 +1,414 @@
use metrics::{Key, Recorder, SetRecorderError};
use metrics_util::{
registry::{AtomicStorage, GenerationalStorage, Recency, Registry},
MetricKindMask, Summary,
};
use quanta::Clock;
use std::{
collections::{BTreeMap, HashMap},
sync::{atomic::Ordering, Arc, RwLock},
time::Duration,
};
const SECONDS: u64 = 1;
const MINUTES: u64 = 60 * SECONDS;
const HOURS: u64 = 60 * MINUTES;
const DAYS: u64 = 24 * HOURS;
type DistributionMap = BTreeMap<Vec<(String, String)>, Summary>;
#[derive(Clone)]
pub struct MemoryCollector {
inner: Arc<Inner>,
}
struct Inner {
descriptions: RwLock<HashMap<String, metrics::SharedString>>,
distributions: RwLock<HashMap<String, DistributionMap>>,
recency: Recency<Key>,
registry: Registry<Key, GenerationalStorage<AtomicStorage>>,
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct Counter {
labels: BTreeMap<String, String>,
value: u64,
}
impl std::fmt::Display for Counter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let labels = self
.labels
.iter()
.map(|(k, v)| format!("{}: {}", k, v))
.collect::<Vec<_>>()
.join(", ");
write!(f, "{} - {}", labels, self.value)
}
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct Gauge {
labels: BTreeMap<String, String>,
value: f64,
}
impl std::fmt::Display for Gauge {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let labels = self
.labels
.iter()
.map(|(k, v)| format!("{}: {}", k, v))
.collect::<Vec<_>>()
.join(", ");
write!(f, "{} - {}", labels, self.value)
}
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
struct Histogram {
labels: BTreeMap<String, String>,
value: Vec<(f64, Option<f64>)>,
}
impl std::fmt::Display for Histogram {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let labels = self
.labels
.iter()
.map(|(k, v)| format!("{}: {}", k, v))
.collect::<Vec<_>>()
.join(", ");
let value = self
.value
.iter()
.map(|(k, v)| {
if let Some(v) = v {
format!("{}: {:.6}", k, v)
} else {
format!("{}: None,", k)
}
})
.collect::<Vec<_>>()
.join(", ");
write!(f, "{} - {}", labels, value)
}
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct Snapshot {
counters: HashMap<String, Vec<Counter>>,
gauges: HashMap<String, Vec<Gauge>>,
histograms: HashMap<String, Vec<Histogram>>,
}
const PAIRS: [((&str, &str), &str); 2] = [
(
(
"background-jobs.worker.started",
"background-jobs.worker.finished",
),
"background-jobs.worker.running",
),
(
(
"background-jobs.job.started",
"background-jobs.job.finished",
),
"background-jobs.job.running",
),
];
#[derive(Default)]
struct MergeCounter {
start: Option<Counter>,
finish: Option<Counter>,
}
impl MergeCounter {
fn merge(self) -> Option<Counter> {
match (self.start, self.finish) {
(Some(start), Some(end)) => Some(Counter {
labels: start.labels,
value: start.value.saturating_sub(end.value),
}),
(Some(only), None) => Some(only),
(None, Some(only)) => Some(Counter {
labels: only.labels,
value: 0,
}),
(None, None) => None,
}
}
}
impl Snapshot {
pub(crate) fn present(self) {
if !self.counters.is_empty() {
println!("Counters");
let mut merging = HashMap::new();
for (key, counters) in self.counters {
if let Some(((start, _), name)) = PAIRS
.iter()
.find(|((start, finish), _)| *start == key || *finish == key)
{
let entry = merging.entry(name).or_insert_with(HashMap::new);
for counter in counters {
let mut merge_counter = entry
.entry(counter.labels.clone())
.or_insert_with(MergeCounter::default);
if key == *start {
merge_counter.start = Some(counter);
} else {
merge_counter.finish = Some(counter);
}
}
continue;
}
println!("\t{}", key);
for counter in counters {
println!("\t\t{}", counter);
}
}
for (key, counters) in merging {
println!("\t{}", key);
for (_, counter) in counters {
if let Some(counter) = counter.merge() {
println!("\t\t{}", counter);
}
}
}
}
if !self.gauges.is_empty() {
println!("Gauges");
for (key, gauges) in self.gauges {
println!("\t{}", key);
for gauge in gauges {
println!("\t\t{}", gauge);
}
}
}
if !self.histograms.is_empty() {
println!("Histograms");
for (key, histograms) in self.histograms {
println!("\t{}", key);
for histogram in histograms {
println!("\t\t{}", histogram);
}
}
}
}
}
fn key_to_parts(key: &Key) -> (String, Vec<(String, String)>) {
let labels = key
.labels()
.into_iter()
.map(|label| (label.key().to_string(), label.value().to_string()))
.collect();
let name = key.name().to_string();
(name, labels)
}
impl Inner {
fn snapshot_counters(&self) -> HashMap<String, Vec<Counter>> {
let mut counters = HashMap::new();
for (key, counter) in self.registry.get_counter_handles() {
let gen = counter.get_generation();
if !self.recency.should_store_counter(&key, gen, &self.registry) {
continue;
}
let (name, labels) = key_to_parts(&key);
let value = counter.get_inner().load(Ordering::Acquire);
counters.entry(name).or_insert_with(Vec::new).push(Counter {
labels: labels.into_iter().collect(),
value,
});
}
counters
}
fn snapshot_gauges(&self) -> HashMap<String, Vec<Gauge>> {
let mut gauges = HashMap::new();
for (key, gauge) in self.registry.get_gauge_handles() {
let gen = gauge.get_generation();
if !self.recency.should_store_gauge(&key, gen, &self.registry) {
continue;
}
let (name, labels) = key_to_parts(&key);
let value = f64::from_bits(gauge.get_inner().load(Ordering::Acquire));
gauges.entry(name).or_insert_with(Vec::new).push(Gauge {
labels: labels.into_iter().collect(),
value,
})
}
gauges
}
fn snapshot_histograms(&self) -> HashMap<String, Vec<Histogram>> {
for (key, histogram) in self.registry.get_histogram_handles() {
let gen = histogram.get_generation();
let (name, labels) = key_to_parts(&key);
if !self
.recency
.should_store_histogram(&key, gen, &self.registry)
{
let mut d = self.distributions.write().unwrap();
let delete_by_name = if let Some(by_name) = d.get_mut(&name) {
by_name.remove(&labels);
by_name.is_empty()
} else {
false
};
drop(d);
if delete_by_name {
self.descriptions.write().unwrap().remove(&name);
}
continue;
}
let mut d = self.distributions.write().unwrap();
let outer_entry = d.entry(name.clone()).or_insert_with(BTreeMap::new);
let entry = outer_entry
.entry(labels)
.or_insert_with(Summary::with_defaults);
histogram.get_inner().clear_with(|samples| {
for sample in samples {
entry.add(*sample);
}
})
}
let d = self.distributions.read().unwrap().clone();
d.into_iter()
.map(|(key, value)| {
(
key,
value
.into_iter()
.map(|(labels, summary)| Histogram {
labels: labels.into_iter().collect(),
value: [0.001, 0.01, 0.05, 0.1, 0.5, 0.9, 0.99, 1.0]
.into_iter()
.map(|q| (q, summary.quantile(q)))
.collect(),
})
.collect(),
)
})
.collect()
}
fn snapshot(&self) -> Snapshot {
Snapshot {
counters: self.snapshot_counters(),
gauges: self.snapshot_gauges(),
histograms: self.snapshot_histograms(),
}
}
}
impl MemoryCollector {
pub(crate) fn new() -> Self {
MemoryCollector {
inner: Arc::new(Inner {
descriptions: Default::default(),
distributions: Default::default(),
recency: Recency::new(
Clock::new(),
MetricKindMask::ALL,
Some(Duration::from_secs(5 * DAYS)),
),
registry: Registry::new(GenerationalStorage::atomic()),
}),
}
}
pub(crate) fn snapshot(&self) -> Snapshot {
self.inner.snapshot()
}
fn add_description_if_missing(
&self,
key: &metrics::KeyName,
description: metrics::SharedString,
) {
let mut d = self.inner.descriptions.write().unwrap();
d.entry(key.as_str().to_owned()).or_insert(description);
}
pub(crate) fn install(&self) -> Result<(), SetRecorderError> {
metrics::set_boxed_recorder(Box::new(self.clone()))
}
}
impl Recorder for MemoryCollector {
fn describe_counter(
&self,
key: metrics::KeyName,
_: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.add_description_if_missing(&key, description)
}
fn describe_gauge(
&self,
key: metrics::KeyName,
_: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.add_description_if_missing(&key, description)
}
fn describe_histogram(
&self,
key: metrics::KeyName,
_: Option<metrics::Unit>,
description: metrics::SharedString,
) {
self.add_description_if_missing(&key, description)
}
fn register_counter(&self, key: &Key) -> metrics::Counter {
self.inner
.registry
.get_or_create_counter(key, |c| c.clone().into())
}
fn register_gauge(&self, key: &Key) -> metrics::Gauge {
self.inner
.registry
.get_or_create_gauge(key, |c| c.clone().into())
}
fn register_histogram(&self, key: &Key) -> metrics::Histogram {
self.inner
.registry
.get_or_create_histogram(key, |c| c.clone().into())
}
}

View File

@ -1,22 +1,24 @@
use crate::{ use crate::{
data::{ActorCache, State},
error::Error, error::Error,
extractors::{AdminConfig, XApiToken}, extractors::{AdminConfig, XApiToken},
middleware::MyVerify,
requests::Requests,
}; };
use activitystreams::{ use activitystreams::{
iri, iri,
iri_string::{ iri_string::{
format::ToDedicatedString,
resolve::FixedBaseResolver, resolve::FixedBaseResolver,
types::{IriAbsoluteString, IriFragmentStr, IriRelativeStr, IriString}, types::{IriAbsoluteString, IriFragmentStr, IriRelativeStr, IriString},
}, },
}; };
use config::Environment; use config::Environment;
use http_signature_normalization_actix::prelude::{VerifyDigest, VerifySignature}; use http_signature_normalization_actix::prelude::VerifyDigest;
use rustls::{Certificate, PrivateKey}; use rustls::{Certificate, PrivateKey};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::{io::BufReader, net::IpAddr, path::PathBuf}; use std::{
io::BufReader,
net::{IpAddr, SocketAddr},
path::PathBuf,
};
use uuid::Uuid; use uuid::Uuid;
#[derive(Clone, Debug, serde::Deserialize)] #[derive(Clone, Debug, serde::Deserialize)]
@ -31,6 +33,7 @@ pub(crate) struct ParsedConfig {
publish_blocks: bool, publish_blocks: bool,
sled_path: PathBuf, sled_path: PathBuf,
source_repo: IriString, source_repo: IriString,
repository_commit_base: String,
opentelemetry_url: Option<IriString>, opentelemetry_url: Option<IriString>,
telegram_token: Option<String>, telegram_token: Option<String>,
telegram_admin_handle: Option<String>, telegram_admin_handle: Option<String>,
@ -40,6 +43,8 @@ pub(crate) struct ParsedConfig {
footer_blurb: Option<String>, footer_blurb: Option<String>,
local_domains: Option<String>, local_domains: Option<String>,
local_blurb: Option<String>, local_blurb: Option<String>,
prometheus_addr: Option<IpAddr>,
prometheus_port: Option<u16>,
} }
#[derive(Clone)] #[derive(Clone)]
@ -62,6 +67,7 @@ pub struct Config {
footer_blurb: Option<String>, footer_blurb: Option<String>,
local_domains: Vec<String>, local_domains: Vec<String>,
local_blurb: Option<String>, local_blurb: Option<String>,
prometheus_config: Option<PrometheusConfig>,
} }
#[derive(Clone)] #[derive(Clone)]
@ -70,6 +76,12 @@ struct TlsConfig {
cert: PathBuf, cert: PathBuf,
} }
#[derive(Clone, Debug)]
struct PrometheusConfig {
addr: IpAddr,
port: u16,
}
#[derive(Debug)] #[derive(Debug)]
pub enum UrlKind { pub enum UrlKind {
Activity, Activity,
@ -94,6 +106,7 @@ pub enum AdminUrlKind {
Blocked, Blocked,
Connected, Connected,
Stats, Stats,
LastSeen,
} }
impl std::fmt::Debug for Config { impl std::fmt::Debug for Config {
@ -121,6 +134,7 @@ impl std::fmt::Debug for Config {
.field("footer_blurb", &self.footer_blurb) .field("footer_blurb", &self.footer_blurb)
.field("local_domains", &self.local_domains) .field("local_domains", &self.local_domains)
.field("local_blurb", &self.local_blurb) .field("local_blurb", &self.local_blurb)
.field("prometheus_config", &self.prometheus_config)
.finish() .finish()
} }
} }
@ -138,6 +152,7 @@ impl Config {
.set_default("publish_blocks", false)? .set_default("publish_blocks", false)?
.set_default("sled_path", "./sled/db-0-34")? .set_default("sled_path", "./sled/db-0-34")?
.set_default("source_repo", "https://git.asonix.dog/asonix/relay")? .set_default("source_repo", "https://git.asonix.dog/asonix/relay")?
.set_default("repository_commit_base", "/src/commit/")?
.set_default("opentelemetry_url", None as Option<&str>)? .set_default("opentelemetry_url", None as Option<&str>)?
.set_default("telegram_token", None as Option<&str>)? .set_default("telegram_token", None as Option<&str>)?
.set_default("telegram_admin_handle", None as Option<&str>)? .set_default("telegram_admin_handle", None as Option<&str>)?
@ -147,6 +162,8 @@ impl Config {
.set_default("footer_blurb", None as Option<&str>)? .set_default("footer_blurb", None as Option<&str>)?
.set_default("local_domains", None as Option<&str>)? .set_default("local_domains", None as Option<&str>)?
.set_default("local_blurb", None as Option<&str>)? .set_default("local_blurb", None as Option<&str>)?
.set_default("prometheus_addr", None as Option<&str>)?
.set_default("prometheus_port", None as Option<u16>)?
.add_source(Environment::default()) .add_source(Environment::default())
.build()?; .build()?;
@ -175,6 +192,29 @@ impl Config {
.map(|d| d.to_string()) .map(|d| d.to_string())
.collect(); .collect();
let prometheus_config = match (config.prometheus_addr, config.prometheus_port) {
(Some(addr), Some(port)) => Some(PrometheusConfig { addr, port }),
(Some(_), None) => {
tracing::warn!("PROMETHEUS_ADDR is set but PROMETHEUS_PORT is not set, not building Prometheus config");
None
}
(None, Some(_)) => {
tracing::warn!("PROMETHEUS_PORT is set but PROMETHEUS_ADDR is not set, not building Prometheus config");
None
}
(None, None) => None,
};
let source_url = match Self::git_hash() {
Some(hash) => format!(
"{}{}{}",
config.source_repo, config.repository_commit_base, hash
)
.parse()
.expect("constructed source URL is valid"),
None => config.source_repo.clone(),
};
Ok(Config { Ok(Config {
hostname: config.hostname, hostname: config.hostname,
addr: config.addr, addr: config.addr,
@ -185,7 +225,7 @@ impl Config {
publish_blocks: config.publish_blocks, publish_blocks: config.publish_blocks,
base_uri, base_uri,
sled_path: config.sled_path, sled_path: config.sled_path,
source_repo: config.source_repo, source_repo: source_url,
opentelemetry_url: config.opentelemetry_url, opentelemetry_url: config.opentelemetry_url,
telegram_token: config.telegram_token, telegram_token: config.telegram_token,
telegram_admin_handle: config.telegram_admin_handle, telegram_admin_handle: config.telegram_admin_handle,
@ -194,9 +234,16 @@ impl Config {
footer_blurb: config.footer_blurb, footer_blurb: config.footer_blurb,
local_domains, local_domains,
local_blurb: config.local_blurb, local_blurb: config.local_blurb,
prometheus_config,
}) })
} }
pub(crate) fn prometheus_bind_address(&self) -> Option<SocketAddr> {
let config = self.prometheus_config.as_ref()?;
Some((config.addr, config.port).into())
}
pub(crate) fn open_keys(&self) -> Result<Option<(Vec<Certificate>, PrivateKey)>, Error> { pub(crate) fn open_keys(&self) -> Result<Option<(Vec<Certificate>, PrivateKey)>, Error> {
let tls = if let Some(tls) = &self.tls { let tls = if let Some(tls) = &self.tls {
tls tls
@ -276,19 +323,6 @@ impl Config {
} }
} }
pub(crate) fn signature_middleware(
&self,
requests: Requests,
actors: ActorCache,
state: State,
) -> VerifySignature<MyVerify> {
if self.validate_signatures {
VerifySignature::new(MyVerify(requests, actors, state), Default::default())
} else {
VerifySignature::new(MyVerify(requests, actors, state), Default::default()).optional()
}
}
pub(crate) fn x_api_token(&self) -> Option<XApiToken> { pub(crate) fn x_api_token(&self) -> Option<XApiToken> {
self.api_token.clone().map(XApiToken::new) self.api_token.clone().map(XApiToken::new)
} }
@ -345,7 +379,7 @@ impl Config {
fn git_version() -> Option<String> { fn git_version() -> Option<String> {
let branch = Self::git_branch()?; let branch = Self::git_branch()?;
let hash = Self::git_hash()?; let hash = Self::git_short_hash()?;
Some(format!("{}-{}", branch, hash)) Some(format!("{}-{}", branch, hash))
} }
@ -366,6 +400,10 @@ impl Config {
option_env!("GIT_HASH") option_env!("GIT_HASH")
} }
fn git_short_hash() -> Option<&'static str> {
option_env!("GIT_SHORT_HASH")
}
pub(crate) fn user_agent(&self) -> String { pub(crate) fn user_agent(&self) -> String {
format!( format!(
"{} ({}/{}; +{})", "{} ({}/{}; +{})",
@ -395,37 +433,44 @@ impl Config {
self.do_generate_url(kind).expect("Generated valid IRI") self.do_generate_url(kind).expect("Generated valid IRI")
} }
#[tracing::instrument(level = "debug", skip_all, fields(base_uri = tracing::field::debug(&self.base_uri), kind = tracing::field::debug(&kind)))]
fn do_generate_url(&self, kind: UrlKind) -> Result<IriString, Error> { fn do_generate_url(&self, kind: UrlKind) -> Result<IriString, Error> {
let iri = match kind { let iri = match kind {
UrlKind::Activity => FixedBaseResolver::new(self.base_uri.as_ref()).try_resolve( UrlKind::Activity => FixedBaseResolver::new(self.base_uri.as_ref())
IriRelativeStr::new(&format!("activity/{}", Uuid::new_v4()))?.as_ref(), .resolve(IriRelativeStr::new(&format!("activity/{}", Uuid::new_v4()))?.as_ref())
)?, .try_to_dedicated_string()?,
UrlKind::Actor => FixedBaseResolver::new(self.base_uri.as_ref()) UrlKind::Actor => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("actor")?.as_ref())?, .resolve(IriRelativeStr::new("actor")?.as_ref())
.try_to_dedicated_string()?,
UrlKind::Followers => FixedBaseResolver::new(self.base_uri.as_ref()) UrlKind::Followers => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("followers")?.as_ref())?, .resolve(IriRelativeStr::new("followers")?.as_ref())
.try_to_dedicated_string()?,
UrlKind::Following => FixedBaseResolver::new(self.base_uri.as_ref()) UrlKind::Following => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("following")?.as_ref())?, .resolve(IriRelativeStr::new("following")?.as_ref())
.try_to_dedicated_string()?,
UrlKind::Inbox => FixedBaseResolver::new(self.base_uri.as_ref()) UrlKind::Inbox => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("inbox")?.as_ref())?, .resolve(IriRelativeStr::new("inbox")?.as_ref())
.try_to_dedicated_string()?,
UrlKind::Index => self.base_uri.clone().into(), UrlKind::Index => self.base_uri.clone().into(),
UrlKind::MainKey => { UrlKind::MainKey => {
let actor = IriRelativeStr::new("actor")?; let actor = IriRelativeStr::new("actor")?;
let fragment = IriFragmentStr::new("main-key")?; let fragment = IriFragmentStr::new("main-key")?;
let mut resolved = let mut resolved = FixedBaseResolver::new(self.base_uri.as_ref())
FixedBaseResolver::new(self.base_uri.as_ref()).try_resolve(actor.as_ref())?; .resolve(actor.as_ref())
.try_to_dedicated_string()?;
resolved.set_fragment(Some(fragment)); resolved.set_fragment(Some(fragment));
resolved resolved
} }
UrlKind::Media(uuid) => FixedBaseResolver::new(self.base_uri.as_ref()) UrlKind::Media(uuid) => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new(&format!("media/{}", uuid))?.as_ref())?, .resolve(IriRelativeStr::new(&format!("media/{}", uuid))?.as_ref())
.try_to_dedicated_string()?,
UrlKind::NodeInfo => FixedBaseResolver::new(self.base_uri.as_ref()) UrlKind::NodeInfo => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("nodeinfo/2.0.json")?.as_ref())?, .resolve(IriRelativeStr::new("nodeinfo/2.0.json")?.as_ref())
.try_to_dedicated_string()?,
UrlKind::Outbox => FixedBaseResolver::new(self.base_uri.as_ref()) UrlKind::Outbox => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("outbox")?.as_ref())?, .resolve(IriRelativeStr::new("outbox")?.as_ref())
.try_to_dedicated_string()?,
}; };
Ok(iri) Ok(iri)
@ -437,25 +482,22 @@ impl Config {
} }
fn do_generate_admin_url(&self, kind: AdminUrlKind) -> Result<IriString, Error> { fn do_generate_admin_url(&self, kind: AdminUrlKind) -> Result<IriString, Error> {
let iri = match kind { let path = match kind {
AdminUrlKind::Allow => FixedBaseResolver::new(self.base_uri.as_ref()) AdminUrlKind::Allow => "api/v1/admin/allow",
.try_resolve(IriRelativeStr::new("api/v1/admin/allow")?.as_ref())?, AdminUrlKind::Disallow => "api/v1/admin/disallow",
AdminUrlKind::Disallow => FixedBaseResolver::new(self.base_uri.as_ref()) AdminUrlKind::Block => "api/v1/admin/block",
.try_resolve(IriRelativeStr::new("api/v1/admin/disallow")?.as_ref())?, AdminUrlKind::Unblock => "api/v1/admin/unblock",
AdminUrlKind::Block => FixedBaseResolver::new(self.base_uri.as_ref()) AdminUrlKind::Allowed => "api/v1/admin/allowed",
.try_resolve(IriRelativeStr::new("api/v1/admin/block")?.as_ref())?, AdminUrlKind::Blocked => "api/v1/admin/blocked",
AdminUrlKind::Unblock => FixedBaseResolver::new(self.base_uri.as_ref()) AdminUrlKind::Connected => "api/v1/admin/connected",
.try_resolve(IriRelativeStr::new("api/v1/admin/unblock")?.as_ref())?, AdminUrlKind::Stats => "api/v1/admin/stats",
AdminUrlKind::Allowed => FixedBaseResolver::new(self.base_uri.as_ref()) AdminUrlKind::LastSeen => "api/v1/admin/last_seen",
.try_resolve(IriRelativeStr::new("api/v1/admin/allowed")?.as_ref())?,
AdminUrlKind::Blocked => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/blocked")?.as_ref())?,
AdminUrlKind::Connected => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/connected")?.as_ref())?,
AdminUrlKind::Stats => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/stats")?.as_ref())?,
}; };
let iri = FixedBaseResolver::new(self.base_uri.as_ref())
.resolve(IriRelativeStr::new(path)?.as_ref())
.try_to_dedicated_string()?;
Ok(iri) Ok(iri)
} }
} }

View File

@ -1,9 +1,11 @@
mod actor; mod actor;
mod last_online;
mod media; mod media;
mod node; mod node;
mod state; mod state;
pub(crate) use actor::ActorCache; pub(crate) use actor::ActorCache;
pub(crate) use last_online::LastOnline;
pub(crate) use media::MediaCache; pub(crate) use media::MediaCache;
pub(crate) use node::{Node, NodeCache}; pub(crate) use node::{Node, NodeCache};
pub(crate) use state::State; pub(crate) use state::State;

View File

@ -37,7 +37,7 @@ impl ActorCache {
ActorCache { db } ActorCache { db }
} }
#[tracing::instrument(level = "debug" name = "Get Actor", skip_all, fields(id = id.to_string().as_str(), requests))] #[tracing::instrument(level = "debug" name = "Get Actor", skip_all, fields(id = id.to_string().as_str()))]
pub(crate) async fn get( pub(crate) async fn get(
&self, &self,
id: &IriString, id: &IriString,
@ -56,12 +56,8 @@ impl ActorCache {
#[tracing::instrument(level = "debug", name = "Add Connection", skip(self))] #[tracing::instrument(level = "debug", name = "Add Connection", skip(self))]
pub(crate) async fn add_connection(&self, actor: Actor) -> Result<(), Error> { pub(crate) async fn add_connection(&self, actor: Actor) -> Result<(), Error> {
let add_connection = self.db.add_connection(actor.id.clone()); self.db.add_connection(actor.id.clone()).await?;
let save_actor = self.db.save_actor(actor); self.db.save_actor(actor).await
tokio::try_join!(add_connection, save_actor)?;
Ok(())
} }
#[tracing::instrument(level = "debug", name = "Remove Connection", skip(self))] #[tracing::instrument(level = "debug", name = "Remove Connection", skip(self))]
@ -69,7 +65,7 @@ impl ActorCache {
self.db.remove_connection(actor.id.clone()).await self.db.remove_connection(actor.id.clone()).await
} }
#[tracing::instrument(level = "debug", name = "Fetch remote actor", skip_all, fields(id = id.to_string().as_str(), requests))] #[tracing::instrument(level = "debug", name = "Fetch remote actor", skip_all, fields(id = id.to_string().as_str()))]
pub(crate) async fn get_no_cache( pub(crate) async fn get_no_cache(
&self, &self,
id: &IriString, id: &IriString,

28
src/data/last_online.rs Normal file
View File

@ -0,0 +1,28 @@
use activitystreams::iri_string::types::IriStr;
use std::{collections::HashMap, sync::Mutex};
use time::OffsetDateTime;
pub(crate) struct LastOnline {
domains: Mutex<HashMap<String, OffsetDateTime>>,
}
impl LastOnline {
pub(crate) fn mark_seen(&self, iri: &IriStr) {
if let Some(authority) = iri.authority_str() {
self.domains
.lock()
.unwrap()
.insert(authority.to_string(), OffsetDateTime::now_utc());
}
}
pub(crate) fn take(&self) -> HashMap<String, OffsetDateTime> {
std::mem::take(&mut *self.domains.lock().unwrap())
}
pub(crate) fn empty() -> Self {
Self {
domains: Mutex::new(HashMap::default()),
}
}
}

View File

@ -36,11 +36,9 @@ impl NodeCache {
#[tracing::instrument(level = "debug", name = "Get nodes", skip(self))] #[tracing::instrument(level = "debug", name = "Get nodes", skip(self))]
pub(crate) async fn nodes(&self) -> Result<Vec<Node>, Error> { pub(crate) async fn nodes(&self) -> Result<Vec<Node>, Error> {
let infos = self.db.connected_info(); let infos = self.db.connected_info().await?;
let instances = self.db.connected_instance(); let instances = self.db.connected_instance().await?;
let contacts = self.db.connected_contact(); let contacts = self.db.connected_contact().await?;
let (infos, instances, contacts) = tokio::try_join!(infos, instances, contacts)?;
let vec = self let vec = self
.db .db

View File

@ -10,8 +10,9 @@ use actix_web::web;
use lru::LruCache; use lru::LruCache;
use rand::thread_rng; use rand::thread_rng;
use rsa::{RsaPrivateKey, RsaPublicKey}; use rsa::{RsaPrivateKey, RsaPublicKey};
use std::sync::Arc; use std::sync::{Arc, RwLock};
use tokio::sync::RwLock;
use super::LastOnline;
#[derive(Clone)] #[derive(Clone)]
pub struct State { pub struct State {
@ -20,6 +21,7 @@ pub struct State {
object_cache: Arc<RwLock<LruCache<IriString, IriString>>>, object_cache: Arc<RwLock<LruCache<IriString, IriString>>>,
node_cache: NodeCache, node_cache: NodeCache,
breakers: Breakers, breakers: Breakers,
pub(crate) last_online: Arc<LastOnline>,
pub(crate) db: Db, pub(crate) db: Db,
} }
@ -44,6 +46,7 @@ impl State {
self.private_key.clone(), self.private_key.clone(),
config.user_agent(), config.user_agent(),
self.breakers.clone(), self.breakers.clone(),
self.last_online.clone(),
) )
} }
@ -78,12 +81,12 @@ impl State {
.collect()) .collect())
} }
pub(crate) async fn is_cached(&self, object_id: &IriString) -> bool { pub(crate) fn is_cached(&self, object_id: &IriString) -> bool {
self.object_cache.read().await.contains(object_id) self.object_cache.read().unwrap().contains(object_id)
} }
pub(crate) async fn cache(&self, object_id: IriString, actor_id: IriString) { pub(crate) fn cache(&self, object_id: IriString, actor_id: IriString) {
self.object_cache.write().await.put(object_id, actor_id); self.object_cache.write().unwrap().put(object_id, actor_id);
} }
#[tracing::instrument(level = "debug", name = "Building state", skip_all)] #[tracing::instrument(level = "debug", name = "Building state", skip_all)]
@ -115,6 +118,7 @@ impl State {
node_cache: NodeCache::new(db.clone()), node_cache: NodeCache::new(db.clone()),
breakers: Breakers::default(), breakers: Breakers::default(),
db, db,
last_online: Arc::new(LastOnline::empty()),
}; };
Ok(state) Ok(state)

View File

@ -7,8 +7,13 @@ use rsa::{
pkcs8::{DecodePrivateKey, EncodePrivateKey}, pkcs8::{DecodePrivateKey, EncodePrivateKey},
RsaPrivateKey, RsaPrivateKey,
}; };
use sled::Tree; use sled::{Batch, Tree};
use std::{collections::HashMap, sync::Arc, time::SystemTime}; use std::{
collections::{BTreeMap, HashMap},
sync::Arc,
time::SystemTime,
};
use time::OffsetDateTime;
use uuid::Uuid; use uuid::Uuid;
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -28,6 +33,7 @@ struct Inner {
actor_id_info: Tree, actor_id_info: Tree,
actor_id_instance: Tree, actor_id_instance: Tree,
actor_id_contact: Tree, actor_id_contact: Tree,
last_seen: Tree,
restricted_mode: bool, restricted_mode: bool,
} }
@ -247,6 +253,7 @@ impl Db {
actor_id_info: db.open_tree("actor-id-info")?, actor_id_info: db.open_tree("actor-id-info")?,
actor_id_instance: db.open_tree("actor-id-instance")?, actor_id_instance: db.open_tree("actor-id-instance")?,
actor_id_contact: db.open_tree("actor-id-contact")?, actor_id_contact: db.open_tree("actor-id-contact")?,
last_seen: db.open_tree("last-seen")?,
restricted_mode, restricted_mode,
}), }),
}) })
@ -254,7 +261,7 @@ impl Db {
async fn unblock<T>( async fn unblock<T>(
&self, &self,
f: impl Fn(&Inner) -> Result<T, Error> + Send + 'static, f: impl FnOnce(&Inner) -> Result<T, Error> + Send + 'static,
) -> Result<T, Error> ) -> Result<T, Error>
where where
T: Send + 'static, T: Send + 'static,
@ -266,6 +273,48 @@ impl Db {
Ok(t) Ok(t)
} }
pub(crate) async fn mark_last_seen(
&self,
nodes: HashMap<String, OffsetDateTime>,
) -> Result<(), Error> {
let mut batch = Batch::default();
for (domain, datetime) in nodes {
let datetime_string = serde_json::to_vec(&datetime)?;
batch.insert(domain.as_bytes(), datetime_string);
}
self.unblock(move |inner| inner.last_seen.apply_batch(batch).map_err(Error::from))
.await
}
pub(crate) async fn last_seen(
&self,
) -> Result<BTreeMap<String, Option<OffsetDateTime>>, Error> {
self.unblock(|inner| {
let mut map = BTreeMap::new();
for iri in inner.connected() {
let Some(authority_str) = iri.authority_str() else {
continue;
};
if let Some(datetime) = inner.last_seen.get(authority_str)? {
map.insert(
authority_str.to_string(),
Some(serde_json::from_slice(&datetime)?),
);
} else {
map.insert(authority_str.to_string(), None);
}
}
Ok(map)
})
.await
}
pub(crate) async fn connected_ids(&self) -> Result<Vec<IriString>, Error> { pub(crate) async fn connected_ids(&self) -> Result<Vec<IriString>, Error> {
self.unblock(|inner| Ok(inner.connected().collect())).await self.unblock(|inner| Ok(inner.connected().collect())).await
} }

View File

@ -10,7 +10,7 @@ use std::{convert::Infallible, fmt::Debug, io};
use tracing_error::SpanTrace; use tracing_error::SpanTrace;
pub(crate) struct Error { pub(crate) struct Error {
context: SpanTrace, context: String,
kind: ErrorKind, kind: ErrorKind,
} }
@ -26,6 +26,10 @@ impl Error {
pub(crate) fn is_bad_request(&self) -> bool { pub(crate) fn is_bad_request(&self) -> bool {
matches!(self.kind, ErrorKind::Status(_, StatusCode::BAD_REQUEST)) matches!(self.kind, ErrorKind::Status(_, StatusCode::BAD_REQUEST))
} }
pub(crate) fn is_gone(&self) -> bool {
matches!(self.kind, ErrorKind::Status(_, StatusCode::GONE))
}
} }
impl std::fmt::Debug for Error { impl std::fmt::Debug for Error {
@ -53,7 +57,7 @@ where
{ {
fn from(error: T) -> Self { fn from(error: T) -> Self {
Error { Error {
context: SpanTrace::capture(), context: SpanTrace::capture().to_string(),
kind: error.into(), kind: error.into(),
} }
} }
@ -77,10 +81,7 @@ pub(crate) enum ErrorKind {
ParseIri(#[from] activitystreams::iri_string::validate::Error), ParseIri(#[from] activitystreams::iri_string::validate::Error),
#[error("Couldn't normalize IRI, {0}")] #[error("Couldn't normalize IRI, {0}")]
NormalizeIri( NormalizeIri(#[from] std::collections::TryReserveError),
#[from]
activitystreams::iri_string::task::Error<activitystreams::iri_string::normalize::Error>,
),
#[error("Couldn't perform IO, {0}")] #[error("Couldn't perform IO, {0}")]
Io(#[from] io::Error), Io(#[from] io::Error),
@ -125,7 +126,7 @@ pub(crate) enum ErrorKind {
BadActor(String, String), BadActor(String, String),
#[error("Signature verification is required, but no signature was given")] #[error("Signature verification is required, but no signature was given")]
NoSignature(String), NoSignature(Option<String>),
#[error("Wrong ActivityPub kind, {0}")] #[error("Wrong ActivityPub kind, {0}")]
Kind(String), Kind(String),
@ -196,7 +197,8 @@ impl ResponseError for Error {
ErrorKind::Kind(_) ErrorKind::Kind(_)
| ErrorKind::MissingKind | ErrorKind::MissingKind
| ErrorKind::MissingId | ErrorKind::MissingId
| ErrorKind::ObjectCount => StatusCode::BAD_REQUEST, | ErrorKind::ObjectCount
| ErrorKind::NoSignature(_) => StatusCode::BAD_REQUEST,
_ => StatusCode::INTERNAL_SERVER_ERROR, _ => StatusCode::INTERNAL_SERVER_ERROR,
} }
} }

View File

@ -80,7 +80,7 @@ impl Admin {
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
#[error("Failed authentication")] #[error("Failed authentication")]
pub(crate) struct Error { pub(crate) struct Error {
context: SpanTrace, context: String,
#[source] #[source]
kind: ErrorKind, kind: ErrorKind,
} }
@ -88,49 +88,49 @@ pub(crate) struct Error {
impl Error { impl Error {
fn invalid() -> Self { fn invalid() -> Self {
Error { Error {
context: SpanTrace::capture(), context: SpanTrace::capture().to_string(),
kind: ErrorKind::Invalid, kind: ErrorKind::Invalid,
} }
} }
fn missing_config() -> Self { fn missing_config() -> Self {
Error { Error {
context: SpanTrace::capture(), context: SpanTrace::capture().to_string(),
kind: ErrorKind::MissingConfig, kind: ErrorKind::MissingConfig,
} }
} }
fn missing_db() -> Self { fn missing_db() -> Self {
Error { Error {
context: SpanTrace::capture(), context: SpanTrace::capture().to_string(),
kind: ErrorKind::MissingDb, kind: ErrorKind::MissingDb,
} }
} }
fn bcrypt_verify(e: BcryptError) -> Self { fn bcrypt_verify(e: BcryptError) -> Self {
Error { Error {
context: SpanTrace::capture(), context: SpanTrace::capture().to_string(),
kind: ErrorKind::BCryptVerify(e), kind: ErrorKind::BCryptVerify(e),
} }
} }
fn bcrypt_hash(e: BcryptError) -> Self { fn bcrypt_hash(e: BcryptError) -> Self {
Error { Error {
context: SpanTrace::capture(), context: SpanTrace::capture().to_string(),
kind: ErrorKind::BCryptHash(e), kind: ErrorKind::BCryptHash(e),
} }
} }
fn parse_header(e: ParseError) -> Self { fn parse_header(e: ParseError) -> Self {
Error { Error {
context: SpanTrace::capture(), context: SpanTrace::capture().to_string(),
kind: ErrorKind::ParseHeader(e), kind: ErrorKind::ParseHeader(e),
} }
} }
fn canceled(_: BlockingError) -> Self { fn canceled(_: BlockingError) -> Self {
Error { Error {
context: SpanTrace::capture(), context: SpanTrace::capture().to_string(),
kind: ErrorKind::Canceled, kind: ErrorKind::Canceled,
} }
} }

View File

@ -5,6 +5,7 @@ mod deliver_many;
mod instance; mod instance;
mod nodeinfo; mod nodeinfo;
mod process_listeners; mod process_listeners;
mod record_last_online;
pub(crate) use self::{ pub(crate) use self::{
contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany, instance::QueryInstance, contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany, instance::QueryInstance,
@ -15,7 +16,7 @@ use crate::{
config::Config, config::Config,
data::{ActorCache, MediaCache, NodeCache, State}, data::{ActorCache, MediaCache, NodeCache, State},
error::{Error, ErrorKind}, error::{Error, ErrorKind},
jobs::process_listeners::Listeners, jobs::{process_listeners::Listeners, record_last_online::RecordLastOnline},
requests::Requests, requests::Requests,
}; };
use background_jobs::{ use background_jobs::{
@ -62,6 +63,7 @@ pub(crate) fn create_workers(
.register::<QueryInstance>() .register::<QueryInstance>()
.register::<Listeners>() .register::<Listeners>()
.register::<QueryContact>() .register::<QueryContact>()
.register::<RecordLastOnline>()
.register::<apub::Announce>() .register::<apub::Announce>()
.register::<apub::Follow>() .register::<apub::Follow>()
.register::<apub::Forward>() .register::<apub::Forward>()
@ -73,6 +75,7 @@ pub(crate) fn create_workers(
.start_with_threads(parallelism); .start_with_threads(parallelism);
shared.every(Duration::from_secs(60 * 5), Listeners); shared.every(Duration::from_secs(60 * 5), Listeners);
shared.every(Duration::from_secs(60 * 10), RecordLastOnline);
let job_server = JobServer::new(shared.queue_handle().clone()); let job_server = JobServer::new(shared.queue_handle().clone());

View File

@ -36,13 +36,13 @@ async fn get_inboxes(
state.inboxes_without(&actor.inbox, &authority).await state.inboxes_without(&actor.inbox, &authority).await
} }
fn prepare_activity<T, U, V, Kind>( fn prepare_activity<T, U, V>(
mut t: T, mut t: T,
id: impl TryInto<IriString, Error = U>, id: impl TryInto<IriString, Error = U>,
to: impl TryInto<IriString, Error = V>, to: impl TryInto<IriString, Error = V>,
) -> Result<T, Error> ) -> Result<T, Error>
where where
T: ObjectExt<Kind> + BaseExt<Kind>, T: ObjectExt + BaseExt,
Error: From<U> + From<V>, Error: From<U> + From<V>,
{ {
t.set_id(id.try_into()?) t.set_id(id.try_into()?)

View File

@ -42,7 +42,7 @@ impl Announce {
.queue(DeliverMany::new(inboxes, announce)?) .queue(DeliverMany::new(inboxes, announce)?)
.await?; .await?;
state.state.cache(self.object_id, activity_id).await; state.state.cache(self.object_id, activity_id);
Ok(()) Ok(())
} }
} }

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,28 @@
use crate::{error::Error, jobs::JobState};
use background_jobs::{ActixJob, Backoff};
use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct RecordLastOnline;
impl RecordLastOnline {
#[tracing::instrument(skip(state))]
async fn perform(self, state: JobState) -> Result<(), Error> {
let nodes = state.state.last_online.take();
state.state.db.mark_last_seen(nodes).await
}
}
impl ActixJob for RecordLastOnline {
type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "relay::jobs::RecordLastOnline";
const QUEUE: &'static str = "maintenance";
const BACKOFF: Backoff = Backoff::Linear(1);
fn run(self, state: Self::State) -> Self::Future {
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
}
}

View File

@ -2,10 +2,13 @@
#![allow(clippy::needless_borrow)] #![allow(clippy::needless_borrow)]
use activitystreams::iri_string::types::IriString; use activitystreams::iri_string::types::IriString;
use actix_rt::task::JoinHandle;
use actix_web::{middleware::Compress, web, App, HttpServer}; use actix_web::{middleware::Compress, web, App, HttpServer};
use collector::MemoryCollector; use collector::{DoubleRecorder, MemoryCollector};
#[cfg(feature = "console")] #[cfg(feature = "console")]
use console_subscriber::ConsoleLayer; use console_subscriber::ConsoleLayer;
use http_signature_normalization_actix::middleware::VerifySignature;
use metrics_exporter_prometheus::PrometheusBuilder;
use opentelemetry::{sdk::Resource, KeyValue}; use opentelemetry::{sdk::Resource, KeyValue};
use opentelemetry_otlp::WithExportConfig; use opentelemetry_otlp::WithExportConfig;
use rustls::ServerConfig; use rustls::ServerConfig;
@ -35,7 +38,7 @@ use self::{
data::{ActorCache, MediaCache, State}, data::{ActorCache, MediaCache, State},
db::Db, db::Db,
jobs::create_workers, jobs::create_workers,
middleware::{DebugPayload, RelayResolver, Timings}, middleware::{DebugPayload, MyVerify, RelayResolver, Timings},
routes::{actor, inbox, index, nodeinfo, nodeinfo_meta, statics}, routes::{actor, inbox, index, nodeinfo, nodeinfo_meta, statics},
}; };
@ -94,19 +97,31 @@ fn init_subscriber(
Ok(()) Ok(())
} }
fn main() -> Result<(), anyhow::Error> { #[actix_rt::main]
async fn main() -> Result<(), anyhow::Error> {
dotenv::dotenv().ok(); dotenv::dotenv().ok();
let config = Config::build()?; let config = Config::build()?;
init_subscriber(Config::software_name(), config.opentelemetry_url())?; init_subscriber(Config::software_name(), config.opentelemetry_url())?;
let collector = MemoryCollector::new();
collector.install()?;
let args = Args::new(); let args = Args::new();
if args.any() { if args.any() {
return client_main(config, args); return client_main(config, args).await?;
}
let collector = MemoryCollector::new();
if let Some(bind_addr) = config.prometheus_bind_address() {
let (recorder, exporter) = PrometheusBuilder::new()
.with_http_listener(bind_addr)
.build()?;
actix_rt::spawn(exporter);
DoubleRecorder::new(recorder, collector.clone()).install()?;
} else {
collector.install()?;
} }
tracing::warn!("Opening DB"); tracing::warn!("Opening DB");
@ -116,16 +131,15 @@ fn main() -> Result<(), anyhow::Error> {
let actors = ActorCache::new(db.clone()); let actors = ActorCache::new(db.clone());
let media = MediaCache::new(db.clone()); let media = MediaCache::new(db.clone());
server_main(db, actors, media, collector, config)?; server_main(db, actors, media, collector, config).await??;
tracing::warn!("Application exit"); tracing::warn!("Application exit");
Ok(()) Ok(())
} }
#[actix_rt::main] fn client_main(config: Config, args: Args) -> JoinHandle<Result<(), anyhow::Error>> {
async fn client_main(config: Config, args: Args) -> Result<(), anyhow::Error> { actix_rt::spawn(do_client_main(config, args))
actix_rt::spawn(do_client_main(config, args)).await?
} }
async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error> { async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error> {
@ -142,6 +156,39 @@ async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error>
println!("Updated lists"); println!("Updated lists");
} }
if args.contacted() {
let last_seen = admin::client::last_seen(&client, &config).await?;
let mut report = String::from("Contacted:");
if !last_seen.never.is_empty() {
report += "\nNever seen:\n";
}
for domain in last_seen.never {
report += "\t";
report += &domain;
report += "\n";
}
if !last_seen.last_seen.is_empty() {
report += "\nSeen:\n";
}
for (datetime, domains) in last_seen.last_seen {
for domain in domains {
report += "\t";
report += &datetime.to_string();
report += " - ";
report += &domain;
report += "\n";
}
}
report += "\n";
println!("{report}");
}
if args.list() { if args.list() {
let (blocked, allowed, connected) = tokio::try_join!( let (blocked, allowed, connected) = tokio::try_join!(
admin::client::blocked(&client, &config), admin::client::blocked(&client, &config),
@ -174,15 +221,14 @@ async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error>
Ok(()) Ok(())
} }
#[actix_rt::main] fn server_main(
async fn server_main(
db: Db, db: Db,
actors: ActorCache, actors: ActorCache,
media: MediaCache, media: MediaCache,
collector: MemoryCollector, collector: MemoryCollector,
config: Config, config: Config,
) -> Result<(), anyhow::Error> { ) -> JoinHandle<Result<(), anyhow::Error>> {
actix_rt::spawn(do_server_main(db, actors, media, collector, config)).await? actix_rt::spawn(do_server_main(db, actors, media, collector, config))
} }
async fn do_server_main( async fn do_server_main(
@ -232,10 +278,9 @@ async fn do_server_main(
.service( .service(
web::resource("/inbox") web::resource("/inbox")
.wrap(config.digest_middleware()) .wrap(config.digest_middleware())
.wrap(config.signature_middleware( .wrap(VerifySignature::new(
state.requests(&config), MyVerify(state.requests(&config), actors.clone(), state.clone()),
actors.clone(), Default::default(),
state.clone(),
)) ))
.wrap(DebugPayload(config.debug())) .wrap(DebugPayload(config.debug()))
.route(web::post().to(inbox)), .route(web::post().to(inbox)),
@ -258,7 +303,8 @@ async fn do_server_main(
.route("/allowed", web::get().to(admin::routes::allowed)) .route("/allowed", web::get().to(admin::routes::allowed))
.route("/blocked", web::get().to(admin::routes::blocked)) .route("/blocked", web::get().to(admin::routes::blocked))
.route("/connected", web::get().to(admin::routes::connected)) .route("/connected", web::get().to(admin::routes::connected))
.route("/stats", web::get().to(admin::routes::stats)), .route("/stats", web::get().to(admin::routes::stats))
.route("/last_seen", web::get().to(admin::routes::last_seen)),
), ),
) )
}); });

View File

@ -65,11 +65,21 @@ impl MyVerify {
actor_id actor_id
} else { } else {
self.0 match self
.0
.fetch::<PublicKeyResponse>(public_key_id.as_str()) .fetch::<PublicKeyResponse>(public_key_id.as_str())
.await? .await
.actor_id() {
.ok_or(ErrorKind::MissingId)? Ok(res) => res.actor_id().ok_or(ErrorKind::MissingId),
Err(e) => {
if e.is_gone() {
tracing::warn!("Actor gone: {}", public_key_id);
return Ok(false);
} else {
return Err(e);
}
}
}?
}; };
// Previously we verified the sig from an actor's local cache // Previously we verified the sig from an actor's local cache
@ -159,10 +169,10 @@ mod tests {
use crate::apub::AcceptedActors; use crate::apub::AcceptedActors;
use rsa::{pkcs8::DecodePublicKey, RsaPublicKey}; use rsa::{pkcs8::DecodePublicKey, RsaPublicKey};
const ASONIX_DOG_ACTOR: &'static str = r#"{"@context":["https://www.w3.org/ns/activitystreams","https://w3id.org/security/v1",{"manuallyApprovesFollowers":"as:manuallyApprovesFollowers","toot":"http://joinmastodon.org/ns#","featured":{"@id":"toot:featured","@type":"@id"},"featuredTags":{"@id":"toot:featuredTags","@type":"@id"},"alsoKnownAs":{"@id":"as:alsoKnownAs","@type":"@id"},"movedTo":{"@id":"as:movedTo","@type":"@id"},"schema":"http://schema.org#","PropertyValue":"schema:PropertyValue","value":"schema:value","discoverable":"toot:discoverable","Device":"toot:Device","Ed25519Signature":"toot:Ed25519Signature","Ed25519Key":"toot:Ed25519Key","Curve25519Key":"toot:Curve25519Key","EncryptedMessage":"toot:EncryptedMessage","publicKeyBase64":"toot:publicKeyBase64","deviceId":"toot:deviceId","claim":{"@type":"@id","@id":"toot:claim"},"fingerprintKey":{"@type":"@id","@id":"toot:fingerprintKey"},"identityKey":{"@type":"@id","@id":"toot:identityKey"},"devices":{"@type":"@id","@id":"toot:devices"},"messageFranking":"toot:messageFranking","messageType":"toot:messageType","cipherText":"toot:cipherText","suspended":"toot:suspended"}],"id":"https://masto.asonix.dog/actor","type":"Application","inbox":"https://masto.asonix.dog/actor/inbox","outbox":"https://masto.asonix.dog/actor/outbox","preferredUsername":"masto.asonix.dog","url":"https://masto.asonix.dog/about/more?instance_actor=true","manuallyApprovesFollowers":true,"publicKey":{"id":"https://masto.asonix.dog/actor#main-key","owner":"https://masto.asonix.dog/actor","publicKeyPem":"-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAx8zXS0QNg9YGUBsxAOBH\nJaxIn7i6t+Z4UOpSFDVa2kP0NvQgIJsq3wzRqvaiuncRWpkyFk1fTakiRGD32xnY\nt+juuAaIBlU8eswKyANFqhcLAvFHmT3rA1848M4/YM19djvlL/PR9T53tPNHU+el\nS9MlsG3o6Zsj8YaUJtCI8RgEuJoROLHUb/V9a3oMQ7CfuIoSvF3VEz3/dRT09RW6\n0wQX7yhka9WlKuayWLWmTcB9lAIX6neBk+qKc8VSEsO7mHkzB8mRgVcS2uYZl1eA\nD8/jTT+SlpeFNDZk0Oh35GNFoOxh9qjRw3NGxu7jJCVBehDODzasOv4xDxKAhONa\njQIDAQAB\n-----END PUBLIC KEY-----\n"},"endpoints":{"sharedInbox":"https://masto.asonix.dog/inbox"}}"#; const ASONIX_DOG_ACTOR: &str = r#"{"@context":["https://www.w3.org/ns/activitystreams","https://w3id.org/security/v1",{"manuallyApprovesFollowers":"as:manuallyApprovesFollowers","toot":"http://joinmastodon.org/ns#","featured":{"@id":"toot:featured","@type":"@id"},"featuredTags":{"@id":"toot:featuredTags","@type":"@id"},"alsoKnownAs":{"@id":"as:alsoKnownAs","@type":"@id"},"movedTo":{"@id":"as:movedTo","@type":"@id"},"schema":"http://schema.org#","PropertyValue":"schema:PropertyValue","value":"schema:value","discoverable":"toot:discoverable","Device":"toot:Device","Ed25519Signature":"toot:Ed25519Signature","Ed25519Key":"toot:Ed25519Key","Curve25519Key":"toot:Curve25519Key","EncryptedMessage":"toot:EncryptedMessage","publicKeyBase64":"toot:publicKeyBase64","deviceId":"toot:deviceId","claim":{"@type":"@id","@id":"toot:claim"},"fingerprintKey":{"@type":"@id","@id":"toot:fingerprintKey"},"identityKey":{"@type":"@id","@id":"toot:identityKey"},"devices":{"@type":"@id","@id":"toot:devices"},"messageFranking":"toot:messageFranking","messageType":"toot:messageType","cipherText":"toot:cipherText","suspended":"toot:suspended"}],"id":"https://masto.asonix.dog/actor","type":"Application","inbox":"https://masto.asonix.dog/actor/inbox","outbox":"https://masto.asonix.dog/actor/outbox","preferredUsername":"masto.asonix.dog","url":"https://masto.asonix.dog/about/more?instance_actor=true","manuallyApprovesFollowers":true,"publicKey":{"id":"https://masto.asonix.dog/actor#main-key","owner":"https://masto.asonix.dog/actor","publicKeyPem":"-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAx8zXS0QNg9YGUBsxAOBH\nJaxIn7i6t+Z4UOpSFDVa2kP0NvQgIJsq3wzRqvaiuncRWpkyFk1fTakiRGD32xnY\nt+juuAaIBlU8eswKyANFqhcLAvFHmT3rA1848M4/YM19djvlL/PR9T53tPNHU+el\nS9MlsG3o6Zsj8YaUJtCI8RgEuJoROLHUb/V9a3oMQ7CfuIoSvF3VEz3/dRT09RW6\n0wQX7yhka9WlKuayWLWmTcB9lAIX6neBk+qKc8VSEsO7mHkzB8mRgVcS2uYZl1eA\nD8/jTT+SlpeFNDZk0Oh35GNFoOxh9qjRw3NGxu7jJCVBehDODzasOv4xDxKAhONa\njQIDAQAB\n-----END PUBLIC KEY-----\n"},"endpoints":{"sharedInbox":"https://masto.asonix.dog/inbox"}}"#;
const KARJALAZET_RELAY: &'static str = r#"{"@context":["https://www.w3.org/ns/activitystreams","https://pleroma.karjalazet.se/schemas/litepub-0.1.jsonld",{"@language":"und"}],"alsoKnownAs":[],"attachment":[],"capabilities":{},"discoverable":false,"endpoints":{"oauthAuthorizationEndpoint":"https://pleroma.karjalazet.se/oauth/authorize","oauthRegistrationEndpoint":"https://pleroma.karjalazet.se/api/v1/apps","oauthTokenEndpoint":"https://pleroma.karjalazet.se/oauth/token","sharedInbox":"https://pleroma.karjalazet.se/inbox","uploadMedia":"https://pleroma.karjalazet.se/api/ap/upload_media"},"featured":"https://pleroma.karjalazet.se/relay/collections/featured","followers":"https://pleroma.karjalazet.se/relay/followers","following":"https://pleroma.karjalazet.se/relay/following","id":"https://pleroma.karjalazet.se/relay","inbox":"https://pleroma.karjalazet.se/relay/inbox","manuallyApprovesFollowers":false,"name":null,"outbox":"https://pleroma.karjalazet.se/relay/outbox","preferredUsername":"relay","publicKey":{"id":"https://pleroma.karjalazet.se/relay#main-key","owner":"https://pleroma.karjalazet.se/relay","publicKeyPem":"-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAucoyCht6QpEzUPdQWP/J\nJYxObSH3MCcXBnG4d0OX78QshloeAHhl78EZ5c8I0ePmIjDg2NFK3/pG0EvSrHe2\nIZHnHaN5emgCb2ifNya5W572yfQXo1tUQy+ZXtbTUA7BWbr4LuCvd+HUavMwbx72\neraSZTiQj//ObwpbXFoZO5I/+e5avGmVnfmr/y2cG95hqFDtI3438RgZyBjY5kJM\nY1MLWoY9itGSfYmBtxRj3umlC2bPuBB+hHUJi6TvP7NO6zuUZ66m4ETyuBDi8iP6\ngnUp3Q4+1/I3nDUmhjt7OXckUcX3r5M4UHD3VVUFG0aZw6WWMEAxlyFf/07fCkhR\nBwIDAQAB\n-----END PUBLIC KEY-----\n\n"},"summary":"","tag":[],"type":"Person","url":"https://pleroma.karjalazet.se/relay"}"#; const KARJALAZET_RELAY: &str = r#"{"@context":["https://www.w3.org/ns/activitystreams","https://pleroma.karjalazet.se/schemas/litepub-0.1.jsonld",{"@language":"und"}],"alsoKnownAs":[],"attachment":[],"capabilities":{},"discoverable":false,"endpoints":{"oauthAuthorizationEndpoint":"https://pleroma.karjalazet.se/oauth/authorize","oauthRegistrationEndpoint":"https://pleroma.karjalazet.se/api/v1/apps","oauthTokenEndpoint":"https://pleroma.karjalazet.se/oauth/token","sharedInbox":"https://pleroma.karjalazet.se/inbox","uploadMedia":"https://pleroma.karjalazet.se/api/ap/upload_media"},"featured":"https://pleroma.karjalazet.se/relay/collections/featured","followers":"https://pleroma.karjalazet.se/relay/followers","following":"https://pleroma.karjalazet.se/relay/following","id":"https://pleroma.karjalazet.se/relay","inbox":"https://pleroma.karjalazet.se/relay/inbox","manuallyApprovesFollowers":false,"name":null,"outbox":"https://pleroma.karjalazet.se/relay/outbox","preferredUsername":"relay","publicKey":{"id":"https://pleroma.karjalazet.se/relay#main-key","owner":"https://pleroma.karjalazet.se/relay","publicKeyPem":"-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAucoyCht6QpEzUPdQWP/J\nJYxObSH3MCcXBnG4d0OX78QshloeAHhl78EZ5c8I0ePmIjDg2NFK3/pG0EvSrHe2\nIZHnHaN5emgCb2ifNya5W572yfQXo1tUQy+ZXtbTUA7BWbr4LuCvd+HUavMwbx72\neraSZTiQj//ObwpbXFoZO5I/+e5avGmVnfmr/y2cG95hqFDtI3438RgZyBjY5kJM\nY1MLWoY9itGSfYmBtxRj3umlC2bPuBB+hHUJi6TvP7NO6zuUZ66m4ETyuBDi8iP6\ngnUp3Q4+1/I3nDUmhjt7OXckUcX3r5M4UHD3VVUFG0aZw6WWMEAxlyFf/07fCkhR\nBwIDAQAB\n-----END PUBLIC KEY-----\n\n"},"summary":"","tag":[],"type":"Person","url":"https://pleroma.karjalazet.se/relay"}"#;
const ASONIX_DOG_KEY: &'static str = "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAx8zXS0QNg9YGUBsxAOBH\nJaxIn7i6t+Z4UOpSFDVa2kP0NvQgIJsq3wzRqvaiuncRWpkyFk1fTakiRGD32xnY\nt+juuAaIBlU8eswKyANFqhcLAvFHmT3rA1848M4/YM19djvlL/PR9T53tPNHU+el\nS9MlsG3o6Zsj8YaUJtCI8RgEuJoROLHUb/V9a3oMQ7CfuIoSvF3VEz3/dRT09RW6\n0wQX7yhka9WlKuayWLWmTcB9lAIX6neBk+qKc8VSEsO7mHkzB8mRgVcS2uYZl1eA\nD8/jTT+SlpeFNDZk0Oh35GNFoOxh9qjRw3NGxu7jJCVBehDODzasOv4xDxKAhONa\njQIDAQAB\n-----END PUBLIC KEY-----\n"; const ASONIX_DOG_KEY: &str = "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAx8zXS0QNg9YGUBsxAOBH\nJaxIn7i6t+Z4UOpSFDVa2kP0NvQgIJsq3wzRqvaiuncRWpkyFk1fTakiRGD32xnY\nt+juuAaIBlU8eswKyANFqhcLAvFHmT3rA1848M4/YM19djvlL/PR9T53tPNHU+el\nS9MlsG3o6Zsj8YaUJtCI8RgEuJoROLHUb/V9a3oMQ7CfuIoSvF3VEz3/dRT09RW6\n0wQX7yhka9WlKuayWLWmTcB9lAIX6neBk+qKc8VSEsO7mHkzB8mRgVcS2uYZl1eA\nD8/jTT+SlpeFNDZk0Oh35GNFoOxh9qjRw3NGxu7jJCVBehDODzasOv4xDxKAhONa\njQIDAQAB\n-----END PUBLIC KEY-----\n";
const KARJALAZET_KEY: &'static str = "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAucoyCht6QpEzUPdQWP/J\nJYxObSH3MCcXBnG4d0OX78QshloeAHhl78EZ5c8I0ePmIjDg2NFK3/pG0EvSrHe2\nIZHnHaN5emgCb2ifNya5W572yfQXo1tUQy+ZXtbTUA7BWbr4LuCvd+HUavMwbx72\neraSZTiQj//ObwpbXFoZO5I/+e5avGmVnfmr/y2cG95hqFDtI3438RgZyBjY5kJM\nY1MLWoY9itGSfYmBtxRj3umlC2bPuBB+hHUJi6TvP7NO6zuUZ66m4ETyuBDi8iP6\ngnUp3Q4+1/I3nDUmhjt7OXckUcX3r5M4UHD3VVUFG0aZw6WWMEAxlyFf/07fCkhR\nBwIDAQAB\n-----END PUBLIC KEY-----\n\n"; const KARJALAZET_KEY: &str = "-----BEGIN PUBLIC KEY-----\nMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAucoyCht6QpEzUPdQWP/J\nJYxObSH3MCcXBnG4d0OX78QshloeAHhl78EZ5c8I0ePmIjDg2NFK3/pG0EvSrHe2\nIZHnHaN5emgCb2ifNya5W572yfQXo1tUQy+ZXtbTUA7BWbr4LuCvd+HUavMwbx72\neraSZTiQj//ObwpbXFoZO5I/+e5avGmVnfmr/y2cG95hqFDtI3438RgZyBjY5kJM\nY1MLWoY9itGSfYmBtxRj3umlC2bPuBB+hHUJi6TvP7NO6zuUZ66m4ETyuBDi8iP6\ngnUp3Q4+1/I3nDUmhjt7OXckUcX3r5M4UHD3VVUFG0aZw6WWMEAxlyFf/07fCkhR\nBwIDAQAB\n-----END PUBLIC KEY-----\n\n";
#[test] #[test]
fn handles_masto_keys() { fn handles_masto_keys() {

View File

@ -1,4 +1,7 @@
use crate::error::{Error, ErrorKind}; use crate::{
data::LastOnline,
error::{Error, ErrorKind},
};
use activitystreams::iri_string::types::IriString; use activitystreams::iri_string::types::IriString;
use actix_web::http::header::Date; use actix_web::http::header::Date;
use awc::{error::SendRequestError, Client, ClientResponse}; use awc::{error::SendRequestError, Client, ClientResponse};
@ -146,6 +149,7 @@ pub(crate) struct Requests {
private_key: RsaPrivateKey, private_key: RsaPrivateKey,
config: Config, config: Config,
breakers: Breakers, breakers: Breakers,
last_online: Arc<LastOnline>,
} }
impl std::fmt::Debug for Requests { impl std::fmt::Debug for Requests {
@ -174,6 +178,7 @@ impl Requests {
private_key: RsaPrivateKey, private_key: RsaPrivateKey,
user_agent: String, user_agent: String,
breakers: Breakers, breakers: Breakers,
last_online: Arc<LastOnline>,
) -> Self { ) -> Self {
Requests { Requests {
client: Rc::new(RefCell::new(build_client(&user_agent))), client: Rc::new(RefCell::new(build_client(&user_agent))),
@ -184,6 +189,7 @@ impl Requests {
private_key, private_key,
config: Config::default().mastodon_compat(), config: Config::default().mastodon_compat(),
breakers, breakers,
last_online,
} }
} }
@ -233,6 +239,7 @@ impl Requests {
return Err(ErrorKind::Status(parsed_url.to_string(), res.status()).into()); return Err(ErrorKind::Status(parsed_url.to_string(), res.status()).into());
} }
self.last_online.mark_seen(&parsed_url);
self.breakers.succeed(&parsed_url); self.breakers.succeed(&parsed_url);
Ok(res) Ok(res)

View File

@ -16,7 +16,8 @@ use activitystreams::{
use actix_web::{web, HttpResponse}; use actix_web::{web, HttpResponse};
use http_signature_normalization_actix::prelude::{DigestVerified, SignatureVerified}; use http_signature_normalization_actix::prelude::{DigestVerified, SignatureVerified};
#[tracing::instrument(name = "Inbox", skip_all)] #[tracing::instrument(name = "Inbox", skip_all, fields(id = tracing::field::debug(&input.id_unchecked()), kind = tracing::field::debug(&input.kind())))]
#[allow(clippy::too_many_arguments)]
pub(crate) async fn route( pub(crate) async fn route(
state: web::Data<State>, state: web::Data<State>,
actors: web::Data<ActorCache>, actors: web::Data<ActorCache>,
@ -24,22 +25,48 @@ pub(crate) async fn route(
client: web::Data<Requests>, client: web::Data<Requests>,
jobs: web::Data<JobServer>, jobs: web::Data<JobServer>,
input: web::Json<AcceptedActivities>, input: web::Json<AcceptedActivities>,
verified: Option<(SignatureVerified, DigestVerified)>, digest_verified: Option<DigestVerified>,
signature_verified: Option<SignatureVerified>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let input = input.into_inner(); let input = input.into_inner();
let actor = actors let kind = input.kind().ok_or(ErrorKind::MissingKind)?;
.get(
input.actor()?.as_single_id().ok_or(ErrorKind::MissingId)?,
&client,
)
.await?
.into_inner();
let is_allowed = state.db.is_allowed(actor.id.clone()); if digest_verified.is_some() && signature_verified.is_none() && *kind == ValidTypes::Delete {
let is_connected = state.db.is_connected(actor.id.clone()); return Ok(accepted(serde_json::json!({})));
} else if config.validate_signatures()
&& (digest_verified.is_none() || signature_verified.is_none())
{
return Err(ErrorKind::NoSignature(None).into());
}
let (is_allowed, is_connected) = tokio::try_join!(is_allowed, is_connected)?; let actor_id = if input.id_unchecked().is_some() {
input.actor()?.as_single_id().ok_or(ErrorKind::MissingId)?
} else {
input
.actor_unchecked()
.as_single_id()
.ok_or(ErrorKind::MissingId)?
};
let actor = actors.get(actor_id, &client).await?.into_inner();
if let Some(verified) = signature_verified {
if actor.public_key_id.as_str() != verified.key_id() {
tracing::error!("Actor signed with wrong key");
return Err(ErrorKind::BadActor(
actor.public_key_id.to_string(),
verified.key_id().to_owned(),
)
.into());
}
} else if config.validate_signatures() {
tracing::error!("This case should never be reachable, since I handle signature checks earlier in the flow. If you see this in a log it means I did it wrong");
return Err(ErrorKind::NoSignature(Some(actor.public_key_id.to_string())).into());
}
let is_allowed = state.db.is_allowed(actor.id.clone()).await?;
let is_connected = state.db.is_connected(actor.id.clone()).await?;
if !is_allowed { if !is_allowed {
return Err(ErrorKind::NotAllowed(actor.id.to_string()).into()); return Err(ErrorKind::NotAllowed(actor.id.to_string()).into());
@ -49,29 +76,16 @@ pub(crate) async fn route(
return Err(ErrorKind::NotSubscribed(actor.id.to_string()).into()); return Err(ErrorKind::NotSubscribed(actor.id.to_string()).into());
} }
if config.validate_signatures() && verified.is_none() { match kind {
return Err(ErrorKind::NoSignature(actor.public_key_id.to_string()).into());
} else if config.validate_signatures() {
if let Some((verified, _)) = verified {
if actor.public_key_id.as_str() != verified.key_id() {
tracing::error!("Bad actor, more info: {:?}", input);
return Err(ErrorKind::BadActor(
actor.public_key_id.to_string(),
verified.key_id().to_owned(),
)
.into());
}
}
}
match input.kind().ok_or(ErrorKind::MissingKind)? {
ValidTypes::Accept => handle_accept(&config, input).await?, ValidTypes::Accept => handle_accept(&config, input).await?,
ValidTypes::Reject => handle_reject(&config, &jobs, input, actor).await?, ValidTypes::Reject => handle_reject(&config, &jobs, input, actor).await?,
ValidTypes::Announce | ValidTypes::Create => { ValidTypes::Announce | ValidTypes::Create => {
handle_announce(&state, &jobs, input, actor).await? handle_announce(&state, &jobs, input, actor).await?
} }
ValidTypes::Follow => handle_follow(&config, &jobs, input, actor).await?, ValidTypes::Follow => handle_follow(&config, &jobs, input, actor).await?,
ValidTypes::Delete | ValidTypes::Update => handle_forward(&jobs, input, actor).await?, ValidTypes::Add | ValidTypes::Delete | ValidTypes::Remove | ValidTypes::Update => {
handle_forward(&jobs, input, actor).await?
}
ValidTypes::Undo => handle_undo(&config, &jobs, input, actor, is_connected).await?, ValidTypes::Undo => handle_undo(&config, &jobs, input, actor, is_connected).await?,
}; };
@ -203,7 +217,7 @@ async fn handle_announce(
.as_single_id() .as_single_id()
.ok_or(ErrorKind::MissingId)?; .ok_or(ErrorKind::MissingId)?;
if state.is_cached(object_id).await { if state.is_cached(object_id) {
return Err(ErrorKind::Duplicate.into()); return Err(ErrorKind::Duplicate.into());
} }

View File

@ -24,18 +24,18 @@ struct Links {
links: Vec<Link>, links: Vec<Link>,
} }
#[tracing::instrument(name = "NodeInfo")] #[tracing::instrument(name = "NodeInfo", skip_all)]
pub(crate) async fn route( pub(crate) async fn route(
config: web::Data<Config>, config: web::Data<Config>,
state: web::Data<State>, state: web::Data<State>,
) -> web::Json<NodeInfo> { ) -> web::Json<NodeInfo> {
let (inboxes, blocks) = tokio::join!(state.db.inboxes(), async { let inboxes = state.db.inboxes().await;
if config.publish_blocks() {
Some(state.db.blocks().await.unwrap_or_default()) let blocks = if config.publish_blocks() {
} else { Some(state.db.blocks().await.unwrap_or_default())
None } else {
} None
}); };
let peers = inboxes let peers = inboxes
.unwrap_or_default() .unwrap_or_default()

View File

@ -5,7 +5,7 @@ use actix_web::{
}; };
#[allow(clippy::async_yields_async)] #[allow(clippy::async_yields_async)]
#[tracing::instrument(name = "Statistics")] #[tracing::instrument(name = "Statics")]
pub(crate) async fn route(filename: web::Path<String>) -> HttpResponse { pub(crate) async fn route(filename: web::Path<String>) -> HttpResponse {
if let Some(data) = StaticFile::get(&filename.into_inner()) { if let Some(data) = StaticFile::get(&filename.into_inner()) {
HttpResponse::Ok() HttpResponse::Ok()

View File

@ -0,0 +1,15 @@
[Unit]
Description=Activitypub Relay
Documentation=https://git.asonix.dog/asonix/relay
Wants=network.target
After=network.target
[Install]
WantedBy=multi-user.target
[Service]
Type=simple
EnvironmentFile=/etc/systemd/system/example-relay.service.env
ExecStart=/path/to/relay
Restart=always

View File

@ -0,0 +1,19 @@
HOSTNAME='relay.example.com'
ADDR='0.0.0.0'
PORT='8080'
RESTRICTED_MODE='true'
VALIDATE_SIGNATURES='true'
HTTPS='true'
PRETTY_LOG='false'
PUBLISH_BLOCKS='true'
DEBUG='false'
SLED_PATH='/opt/sled'
TELEGRAM_ADMIN_HANDLE='myhandle'
RUST_BACKTRACE='full'
FOOTER_BLURB='Contact <a href="https://masto.example.com/@example">@example</a> for inquiries.'
LOCAL_DOMAINS='masto.example.com'
LOCAL_BLURB='<p>An ActivityPub relay for servers. Currently running somewhere. Let me know if you want to join!</p>'
OPENTELEMETRY_URL='http://otel.example.com:4317'
API_TOKEN='blahblahblahblahblahblahblah'
TELEGRAM_TOKEN='blahblahblahblahblahblahblah'

View File

@ -0,0 +1,11 @@
[Unit]
Description=Activitypub Relay Socket
Before=multi-user.target
After=network.target
[Socket]
Service=example-relay.service
ListenStream=8080
[Install]
WantedBy=sockets.target

View File

@ -6,7 +6,7 @@
<div class="admin"> <div class="admin">
<div class="left"> <div class="left">
<figure class="avatar"> <figure class="avatar">
<img src="@contact.avatar" alt="@contact.display_name's avatar"> <img loading="lazy" src="@contact.avatar" alt="@contact.display_name's avatar">
</figure> </figure>
</div> </div>
<div class="right"> <div class="right">