Compare commits

..

No commits in common. "48080344ea18855ef0f71f8a46576bdaa914c950" and "9433f36cc55ed02555eaba6609b37cd98ecb4133" have entirely different histories.

30 changed files with 1139 additions and 1653 deletions

1984
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,11 +1,11 @@
[package] [package]
name = "ap-relay" name = "ap-relay"
description = "A simple activitypub relay" description = "A simple activitypub relay"
version = "0.3.108" version = "0.3.104"
authors = ["asonix <asonix@asonix.dog>"] authors = ["asonix <asonix@asonix.dog>"]
license = "AGPL-3.0" license = "AGPL-3.0"
readme = "README.md" readme = "README.md"
repository = "https://git.asonix.dog/asonix/relay" repository = "https://git.asonix.dog/asonix/ap-relay"
keywords = ["activitypub", "relay"] keywords = ["activitypub", "relay"]
edition = "2021" edition = "2021"
build = "src/build.rs" build = "src/build.rs"
@ -15,55 +15,51 @@ name = "relay"
path = "src/main.rs" path = "src/main.rs"
[features] [features]
console = ["dep:console-subscriber"] console = ["console-subscriber"]
default = [] default = []
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
actix-web = { version = "4.4.0", default-features = false, features = ["compress-brotli", "compress-gzip", "rustls-0_22"] } anyhow = "1.0"
actix-rt = "2.7.0"
actix-web = { version = "4.4.0", default-features = false, features = ["compress-brotli", "compress-gzip", "rustls-0_21"] }
actix-webfinger = { version = "0.5.0", default-features = false } actix-webfinger = { version = "0.5.0", default-features = false }
activitystreams = "0.7.0-alpha.25" activitystreams = "0.7.0-alpha.25"
activitystreams-ext = "0.1.0-alpha.3" activitystreams-ext = "0.1.0-alpha.3"
ammonia = "3.1.0" ammonia = "3.1.0"
async-cpupool = "0.2.0"
bcrypt = "0.15" bcrypt = "0.15"
base64 = "0.21" base64 = "0.21"
clap = { version = "4.0.0", features = ["derive"] } clap = { version = "4.0.0", features = ["derive"] }
color-eyre = "0.6.2" config = "0.13.0"
config = { version = "0.14.0", default-features = false, features = ["toml", "json", "yaml"] } console-subscriber = { version = "0.1", optional = true }
console-subscriber = { version = "0.2", optional = true }
dashmap = "5.1.0" dashmap = "5.1.0"
dotenv = "0.15.0" dotenv = "0.15.0"
flume = "0.11.0" flume = "0.11.0"
lru = "0.12.0" lru = "0.11.0"
metrics = "0.22.0" metrics = "0.21.0"
metrics-exporter-prometheus = { version = "0.13.0", default-features = false, features = [ metrics-exporter-prometheus = { version = "0.12.0", default-features = false, features = [
"http-listener", "http-listener",
] } ] }
metrics-util = "0.16.0" metrics-util = "0.15.0"
mime = "0.3.16" mime = "0.3.16"
minify-html = "0.15.0" minify-html = "0.11.0"
opentelemetry = "0.21" opentelemetry = { version = "0.20", features = ["rt-tokio"] }
opentelemetry_sdk = { version = "0.21", features = ["rt-tokio"] } opentelemetry-otlp = "0.13"
opentelemetry-otlp = "0.14"
pin-project-lite = "0.2.9" pin-project-lite = "0.2.9"
# pinned to metrics-util quanta = "0.11.0"
quanta = "0.12.0"
rand = "0.8" rand = "0.8"
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls", "stream"]} reqwest = { version = "0.11", default-features = false, features = ["rustls-tls", "stream"]}
reqwest-middleware = "0.2" reqwest-middleware = "0.2"
reqwest-tracing = "0.4.5" reqwest-tracing = "0.4.5"
ring = "0.17.5" ring = "0.16.20"
rsa = { version = "0.9" } rsa = { version = "0.9" }
rsa-magic-public-key = "0.8.0" rsa-magic-public-key = "0.8.0"
rustls = "0.22.0" rustls = "0.21.0"
rustls-channel-resolver = "0.2.0" rustls-pemfile = "1.0.1"
rustls-pemfile = "2"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
sled = "0.34.7" sled = "0.34.7"
streem = "0.2.0"
teloxide = { version = "0.12.0", default-features = false, features = [ teloxide = { version = "0.12.0", default-features = false, features = [
"ctrlc_handler", "ctrlc_handler",
"macros", "macros",
@ -73,39 +69,41 @@ thiserror = "1.0"
time = { version = "0.3.17", features = ["serde"] } time = { version = "0.3.17", features = ["serde"] }
tracing = "0.1" tracing = "0.1"
tracing-error = "0.2" tracing-error = "0.2"
tracing-log = "0.2" tracing-futures = "0.2"
tracing-opentelemetry = "0.22" tracing-log = "0.1"
tracing-opentelemetry = "0.21"
tracing-subscriber = { version = "0.3", features = [ tracing-subscriber = { version = "0.3", features = [
"ansi", "ansi",
"env-filter", "env-filter",
"fmt", "fmt",
] } ] }
tokio = { version = "1", features = ["full", "tracing"] } tokio = { version = "1", features = ["macros", "sync"] }
uuid = { version = "1", features = ["v4", "serde"] } uuid = { version = "1", features = ["v4", "serde"] }
streem = "0.1.0"
[dependencies.background-jobs] [dependencies.background-jobs]
version = "0.18.0" version = "0.15.0"
default-features = false default-features = false
features = ["error-logging", "metrics", "tokio"] features = ["background-jobs-actix", "error-logging"]
[dependencies.http-signature-normalization-actix] [dependencies.http-signature-normalization-actix]
version = "0.11.0" version = "0.10.1"
default-features = false default-features = false
features = ["server", "ring"] features = ["server", "ring"]
[dependencies.http-signature-normalization-reqwest] [dependencies.http-signature-normalization-reqwest]
version = "0.11.0" version = "0.10.0"
default-features = false default-features = false
features = ["middleware", "ring"] features = ["middleware", "ring"]
[dependencies.tracing-actix-web] [dependencies.tracing-actix-web]
version = "0.7.9" version = "0.7.6"
[build-dependencies] [build-dependencies]
color-eyre = "0.6.2" anyhow = "1.0"
dotenv = "0.15.0" dotenv = "0.15.0"
ructe = { version = "0.17.0", features = ["sass", "mime03"] } ructe = { version = "0.17.0", features = ["sass", "mime03"] }
toml = "0.8.0" toml = "0.7.0"
[profile.dev.package.rsa] [profile.dev.package.rsa]
opt-level = 3 opt-level = 3

View File

@ -5,11 +5,11 @@
"systems": "systems" "systems": "systems"
}, },
"locked": { "locked": {
"lastModified": 1701680307, "lastModified": 1692799911,
"narHash": "sha256-kAuep2h5ajznlPMD9rnQyffWG8EM/C73lejGofXvdM8=", "narHash": "sha256-3eihraek4qL744EvQXsK1Ha6C3CR7nnT8X2qWap4RNk=",
"owner": "numtide", "owner": "numtide",
"repo": "flake-utils", "repo": "flake-utils",
"rev": "4022d587cbbfd70fe950c1e2083a02621806a725", "rev": "f9e7cf818399d17d347f847525c5a5a8032e4e44",
"type": "github" "type": "github"
}, },
"original": { "original": {
@ -20,11 +20,11 @@
}, },
"nixpkgs": { "nixpkgs": {
"locked": { "locked": {
"lastModified": 1705133751, "lastModified": 1693003285,
"narHash": "sha256-rCIsyE80jgiOU78gCWN3A0wE0tR2GI5nH6MlS+HaaSQ=", "narHash": "sha256-5nm4yrEHKupjn62MibENtfqlP6pWcRTuSKrMiH9bLkc=",
"owner": "NixOS", "owner": "NixOS",
"repo": "nixpkgs", "repo": "nixpkgs",
"rev": "9b19f5e77dd906cb52dade0b7bd280339d2a1f3d", "rev": "5690c4271f2998c304a45c91a0aeb8fb69feaea7",
"type": "github" "type": "github"
}, },
"original": { "original": {

View File

@ -1,14 +1,17 @@
{ lib { lib
, nixosTests , nixosTests
, protobuf
, rustPlatform , rustPlatform
}: }:
rustPlatform.buildRustPackage { rustPlatform.buildRustPackage {
pname = "relay"; pname = "relay";
version = "0.3.108"; version = "0.3.104";
src = ./.; src = ./.;
cargoLock.lockFile = ./Cargo.lock; cargoLock.lockFile = ./Cargo.lock;
PROTOC = "${protobuf}/bin/protoc";
PROTOC_INCLUDE = "${protobuf}/include";
RUSTFLAGS = "--cfg tokio_unstable"; RUSTFLAGS = "--cfg tokio_unstable";
nativeBuildInputs = [ ]; nativeBuildInputs = [ ];

View File

@ -21,7 +21,7 @@ fn git_info() {
} }
} }
fn version_info() -> color_eyre::Result<()> { fn version_info() -> Result<(), anyhow::Error> {
let cargo_toml = Path::new(&std::env::var("CARGO_MANIFEST_DIR")?).join("Cargo.toml"); let cargo_toml = Path::new(&std::env::var("CARGO_MANIFEST_DIR")?).join("Cargo.toml");
let mut file = File::open(cargo_toml)?; let mut file = File::open(cargo_toml)?;
@ -42,7 +42,7 @@ fn version_info() -> color_eyre::Result<()> {
Ok(()) Ok(())
} }
fn main() -> color_eyre::Result<()> { fn main() -> Result<(), anyhow::Error> {
dotenv::dotenv().ok(); dotenv::dotenv().ok();
git_info(); git_info();

View File

@ -1,4 +1,4 @@
use metrics::{Key, Metadata, Recorder, SetRecorderError}; use metrics::{Key, Recorder, SetRecorderError};
use metrics_util::{ use metrics_util::{
registry::{AtomicStorage, GenerationalStorage, Recency, Registry}, registry::{AtomicStorage, GenerationalStorage, Recency, Registry},
MetricKindMask, Summary, MetricKindMask, Summary,
@ -289,7 +289,7 @@ impl Inner {
} }
let mut d = self.distributions.write().unwrap(); let mut d = self.distributions.write().unwrap();
let outer_entry = d.entry(name.clone()).or_default(); let outer_entry = d.entry(name.clone()).or_insert_with(BTreeMap::new);
let entry = outer_entry let entry = outer_entry
.entry(labels) .entry(labels)
@ -360,8 +360,8 @@ impl MemoryCollector {
d.entry(key.as_str().to_owned()).or_insert(description); d.entry(key.as_str().to_owned()).or_insert(description);
} }
pub(crate) fn install(&self) -> Result<(), SetRecorderError<Self>> { pub(crate) fn install(&self) -> Result<(), SetRecorderError> {
metrics::set_global_recorder(self.clone()) metrics::set_boxed_recorder(Box::new(self.clone()))
} }
} }
@ -393,19 +393,19 @@ impl Recorder for MemoryCollector {
self.add_description_if_missing(&key, description) self.add_description_if_missing(&key, description)
} }
fn register_counter(&self, key: &Key, _: &Metadata<'_>) -> metrics::Counter { fn register_counter(&self, key: &Key) -> metrics::Counter {
self.inner self.inner
.registry .registry
.get_or_create_counter(key, |c| c.clone().into()) .get_or_create_counter(key, |c| c.clone().into())
} }
fn register_gauge(&self, key: &Key, _: &Metadata<'_>) -> metrics::Gauge { fn register_gauge(&self, key: &Key) -> metrics::Gauge {
self.inner self.inner
.registry .registry
.get_or_create_gauge(key, |c| c.clone().into()) .get_or_create_gauge(key, |c| c.clone().into())
} }
fn register_histogram(&self, key: &Key, _: &Metadata<'_>) -> metrics::Histogram { fn register_histogram(&self, key: &Key) -> metrics::Histogram {
self.inner self.inner
.registry .registry
.get_or_create_histogram(key, |c| c.clone().into()) .get_or_create_histogram(key, |c| c.clone().into())

View File

@ -12,8 +12,9 @@ use activitystreams::{
}; };
use config::Environment; use config::Environment;
use http_signature_normalization_actix::{digest::ring::Sha256, prelude::VerifyDigest}; use http_signature_normalization_actix::{digest::ring::Sha256, prelude::VerifyDigest};
use rustls::sign::CertifiedKey; use rustls::{Certificate, PrivateKey};
use std::{ use std::{
io::BufReader,
net::{IpAddr, SocketAddr}, net::{IpAddr, SocketAddr},
path::PathBuf, path::PathBuf,
}; };
@ -311,34 +312,43 @@ impl Config {
Some((config.addr, config.port).into()) Some((config.addr, config.port).into())
} }
pub(crate) async fn open_keys(&self) -> Result<Option<CertifiedKey>, Error> { pub(crate) fn open_keys(&self) -> Result<Option<(Vec<Certificate>, PrivateKey)>, Error> {
let tls = if let Some(tls) = &self.tls { let tls = if let Some(tls) = &self.tls {
tls tls
} else { } else {
tracing::info!("No TLS config present"); tracing::warn!("No TLS config present");
return Ok(None); return Ok(None);
}; };
let certs_bytes = tokio::fs::read(&tls.cert).await?; let mut certs_reader = BufReader::new(std::fs::File::open(&tls.cert)?);
let certs = let certs = rustls_pemfile::certs(&mut certs_reader)?;
rustls_pemfile::certs(&mut certs_bytes.as_slice()).collect::<Result<Vec<_>, _>>()?;
if certs.is_empty() { if certs.is_empty() {
tracing::warn!("No certs read from certificate file"); tracing::warn!("No certs read from certificate file");
return Ok(None); return Ok(None);
} }
let key_bytes = tokio::fs::read(&tls.key).await?; let mut key_reader = BufReader::new(std::fs::File::open(&tls.key)?);
let key = if let Some(key) = rustls_pemfile::private_key(&mut key_bytes.as_slice())? { let key = rustls_pemfile::read_one(&mut key_reader)?;
key
let certs = certs.into_iter().map(Certificate).collect();
let key = if let Some(key) = key {
match key {
rustls_pemfile::Item::RSAKey(der) => PrivateKey(der),
rustls_pemfile::Item::PKCS8Key(der) => PrivateKey(der),
rustls_pemfile::Item::ECKey(der) => PrivateKey(der),
_ => {
tracing::warn!("Unknown key format: {:?}", key);
return Ok(None);
}
}
} else { } else {
tracing::warn!("Failed to read private key"); tracing::warn!("Failed to read private key");
return Ok(None); return Ok(None);
}; };
let key = rustls::crypto::ring::sign::any_supported_type(&key)?; Ok(Some((certs, key)))
Ok(Some(CertifiedKey::new(certs, key)))
} }
pub(crate) fn footer_blurb(&self) -> Option<crate::templates::Html<String>> { pub(crate) fn footer_blurb(&self) -> Option<crate::templates::Html<String>> {

View File

@ -750,11 +750,6 @@ mod tests {
{ {
let db = let db =
Db::build_inner(true, sled::Config::new().temporary(true).open().unwrap()).unwrap(); Db::build_inner(true, sled::Config::new().temporary(true).open().unwrap()).unwrap();
actix_rt::System::new().block_on((f)(db));
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on((f)(db));
} }
} }

View File

@ -1,85 +1,57 @@
use activitystreams::checked::CheckError; use activitystreams::checked::CheckError;
use actix_rt::task::JoinError;
use actix_web::{ use actix_web::{
error::{BlockingError, ResponseError}, error::{BlockingError, ResponseError},
http::StatusCode, http::StatusCode,
HttpResponse, HttpResponse,
}; };
use background_jobs::BoxError;
use color_eyre::eyre::Error as Report;
use http_signature_normalization_reqwest::SignError; use http_signature_normalization_reqwest::SignError;
use std::{convert::Infallible, io, sync::Arc}; use std::{convert::Infallible, fmt::Debug, io};
use tokio::task::JoinError; use tracing_error::SpanTrace;
#[derive(Clone)]
struct ArcKind {
kind: Arc<ErrorKind>,
}
impl std::fmt::Debug for ArcKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.kind.fmt(f)
}
}
impl std::fmt::Display for ArcKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.kind.fmt(f)
}
}
impl std::error::Error for ArcKind {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.kind.source()
}
}
pub(crate) struct Error { pub(crate) struct Error {
kind: ArcKind, context: String,
display: Box<str>, kind: ErrorKind,
debug: Box<str>,
} }
impl Error { impl Error {
fn kind(&self) -> &ErrorKind {
&self.kind.kind
}
pub(crate) fn is_breaker(&self) -> bool { pub(crate) fn is_breaker(&self) -> bool {
matches!(self.kind(), ErrorKind::Breaker) matches!(self.kind, ErrorKind::Breaker)
} }
pub(crate) fn is_not_found(&self) -> bool { pub(crate) fn is_not_found(&self) -> bool {
matches!(self.kind(), ErrorKind::Status(_, StatusCode::NOT_FOUND)) matches!(self.kind, ErrorKind::Status(_, StatusCode::NOT_FOUND))
} }
pub(crate) fn is_bad_request(&self) -> bool { pub(crate) fn is_bad_request(&self) -> bool {
matches!(self.kind(), ErrorKind::Status(_, StatusCode::BAD_REQUEST)) matches!(self.kind, ErrorKind::Status(_, StatusCode::BAD_REQUEST))
} }
pub(crate) fn is_gone(&self) -> bool { pub(crate) fn is_gone(&self) -> bool {
matches!(self.kind(), ErrorKind::Status(_, StatusCode::GONE)) matches!(self.kind, ErrorKind::Status(_, StatusCode::GONE))
} }
pub(crate) fn is_malformed_json(&self) -> bool { pub(crate) fn is_malformed_json(&self) -> bool {
matches!(self.kind(), ErrorKind::Json(_)) matches!(self.kind, ErrorKind::Json(_))
} }
} }
impl std::fmt::Debug for Error { impl std::fmt::Debug for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.debug) writeln!(f, "{:?}", self.kind)
} }
} }
impl std::fmt::Display for Error { impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.display) writeln!(f, "{}", self.kind)?;
std::fmt::Display::fmt(&self.context, f)
} }
} }
impl std::error::Error for Error { impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.kind().source() self.kind.source()
} }
} }
@ -88,36 +60,25 @@ where
ErrorKind: From<T>, ErrorKind: From<T>,
{ {
fn from(error: T) -> Self { fn from(error: T) -> Self {
let kind = ArcKind {
kind: Arc::new(ErrorKind::from(error)),
};
let report = Report::new(kind.clone());
let display = format!("{report}");
let debug = format!("{report:?}");
Error { Error {
kind, context: SpanTrace::capture().to_string(),
display: Box::from(display), kind: error.into(),
debug: Box::from(debug),
} }
} }
} }
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub(crate) enum ErrorKind { pub(crate) enum ErrorKind {
#[error("Error in extractor")] #[error("Error queueing job, {0}")]
Extractor(#[from] crate::extractors::ErrorKind), Queue(anyhow::Error),
#[error("Error queueing job")] #[error("Error in configuration, {0}")]
Queue(#[from] BoxError),
#[error("Error in configuration")]
Config(#[from] config::ConfigError), Config(#[from] config::ConfigError),
#[error("Couldn't parse key")] #[error("Couldn't parse key, {0}")]
Pkcs8(#[from] rsa::pkcs8::Error), Pkcs8(#[from] rsa::pkcs8::Error),
#[error("Couldn't encode public key")] #[error("Couldn't encode public key, {0}")]
Spki(#[from] rsa::pkcs8::spki::Error), Spki(#[from] rsa::pkcs8::spki::Error),
#[error("Couldn't sign request")] #[error("Couldn't sign request")]
@ -126,36 +87,33 @@ pub(crate) enum ErrorKind {
#[error("Couldn't make request")] #[error("Couldn't make request")]
Reqwest(#[from] reqwest::Error), Reqwest(#[from] reqwest::Error),
#[error("Couldn't make request")] #[error("Couldn't build client")]
ReqwestMiddleware(#[from] reqwest_middleware::Error), ReqwestMiddleware(#[from] reqwest_middleware::Error),
#[error("Couldn't parse IRI")] #[error("Couldn't parse IRI, {0}")]
ParseIri(#[from] activitystreams::iri_string::validate::Error), ParseIri(#[from] activitystreams::iri_string::validate::Error),
#[error("Couldn't normalize IRI")] #[error("Couldn't normalize IRI, {0}")]
NormalizeIri(#[from] std::collections::TryReserveError), NormalizeIri(#[from] std::collections::TryReserveError),
#[error("Couldn't perform IO")] #[error("Couldn't perform IO, {0}")]
Io(#[from] io::Error), Io(#[from] io::Error),
#[error("Couldn't sign string, {0}")] #[error("Couldn't sign string, {0}")]
Rsa(rsa::errors::Error), Rsa(rsa::errors::Error),
#[error("Couldn't use db")] #[error("Couldn't use db, {0}")]
Sled(#[from] sled::Error), Sled(#[from] sled::Error),
#[error("Couldn't do the json thing")] #[error("Couldn't do the json thing, {0}")]
Json(#[from] serde_json::Error), Json(#[from] serde_json::Error),
#[error("Couldn't sign request")] #[error("Couldn't sign request, {0}")]
Sign(#[from] SignError), Sign(#[from] SignError),
#[error("Couldn't sign digest")] #[error("Couldn't sign digest")]
Signature(#[from] rsa::signature::Error), Signature(#[from] rsa::signature::Error),
#[error("Couldn't prepare TLS private key")]
PrepareKey(#[from] rustls::Error),
#[error("Couldn't verify signature")] #[error("Couldn't verify signature")]
VerifySignature, VerifySignature,
@ -186,10 +144,10 @@ pub(crate) enum ErrorKind {
#[error("Wrong ActivityPub kind, {0}")] #[error("Wrong ActivityPub kind, {0}")]
Kind(String), Kind(String),
#[error("Too many CPUs")] #[error("Too many CPUs, {0}")]
CpuCount(#[from] std::num::TryFromIntError), CpuCount(#[from] std::num::TryFromIntError),
#[error("Host mismatch")] #[error("{0}")]
HostMismatch(#[from] CheckError), HostMismatch(#[from] CheckError),
#[error("Couldn't flush buffer")] #[error("Couldn't flush buffer")]
@ -243,7 +201,7 @@ pub(crate) enum ErrorKind {
impl ResponseError for Error { impl ResponseError for Error {
fn status_code(&self) -> StatusCode { fn status_code(&self) -> StatusCode {
match self.kind() { match self.kind {
ErrorKind::NotAllowed(_) | ErrorKind::WrongActor(_) | ErrorKind::BadActor(_, _) => { ErrorKind::NotAllowed(_) | ErrorKind::WrongActor(_) | ErrorKind::BadActor(_, _) => {
StatusCode::FORBIDDEN StatusCode::FORBIDDEN
} }
@ -263,7 +221,7 @@ impl ResponseError for Error {
.insert_header(("Content-Type", "application/activity+json")) .insert_header(("Content-Type", "application/activity+json"))
.body( .body(
serde_json::to_string(&serde_json::json!({ serde_json::to_string(&serde_json::json!({
"error": self.kind().to_string(), "error": self.kind.to_string(),
})) }))
.unwrap_or_else(|_| "{}".to_string()), .unwrap_or_else(|_| "{}".to_string()),
) )

View File

@ -1,15 +1,19 @@
use actix_web::{ use actix_web::{
dev::Payload, dev::Payload,
error::ParseError, error::ParseError,
http::header::{from_one_raw_str, Header, HeaderName, HeaderValue, TryIntoHeaderValue}, http::{
header::{from_one_raw_str, Header, HeaderName, HeaderValue, TryIntoHeaderValue},
StatusCode,
},
web::Data, web::Data,
FromRequest, HttpMessage, HttpRequest, FromRequest, HttpMessage, HttpRequest, HttpResponse, ResponseError,
}; };
use bcrypt::{BcryptError, DEFAULT_COST}; use bcrypt::{BcryptError, DEFAULT_COST};
use http_signature_normalization_actix::{prelude::InvalidHeaderValue, Canceled, Spawn}; use http_signature_normalization_actix::{prelude::InvalidHeaderValue, Canceled, Spawn};
use std::{convert::Infallible, str::FromStr, time::Instant}; use std::{convert::Infallible, str::FromStr, time::Instant};
use tracing_error::SpanTrace;
use crate::{db::Db, error::Error, future::LocalBoxFuture, spawner::Spawner}; use crate::{db::Db, future::LocalBoxFuture, spawner::Spawner};
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct AdminConfig { pub(crate) struct AdminConfig {
@ -24,7 +28,7 @@ impl AdminConfig {
} }
fn verify(&self, token: XApiToken) -> Result<bool, Error> { fn verify(&self, token: XApiToken) -> Result<bool, Error> {
bcrypt::verify(token.0, &self.hashed_api_token).map_err(Error::bcrypt_verify) bcrypt::verify(&token.0, &self.hashed_api_token).map_err(Error::bcrypt_verify)
} }
} }
@ -79,42 +83,74 @@ impl Admin {
} }
} }
#[derive(Debug, thiserror::Error)]
#[error("Failed authentication")]
pub(crate) struct Error {
context: String,
#[source]
kind: ErrorKind,
}
impl Error { impl Error {
fn invalid() -> Self { fn invalid() -> Self {
Error::from(ErrorKind::Invalid) Error {
context: SpanTrace::capture().to_string(),
kind: ErrorKind::Invalid,
}
} }
fn missing_config() -> Self { fn missing_config() -> Self {
Error::from(ErrorKind::MissingConfig) Error {
context: SpanTrace::capture().to_string(),
kind: ErrorKind::MissingConfig,
}
} }
fn missing_db() -> Self { fn missing_db() -> Self {
Error::from(ErrorKind::MissingDb) Error {
context: SpanTrace::capture().to_string(),
kind: ErrorKind::MissingDb,
}
} }
fn missing_spawner() -> Self { fn missing_spawner() -> Self {
Error::from(ErrorKind::MissingSpawner) Error {
context: SpanTrace::capture().to_string(),
kind: ErrorKind::MissingSpawner,
}
} }
fn bcrypt_verify(e: BcryptError) -> Self { fn bcrypt_verify(e: BcryptError) -> Self {
Error::from(ErrorKind::BCryptVerify(e)) Error {
context: SpanTrace::capture().to_string(),
kind: ErrorKind::BCryptVerify(e),
}
} }
fn bcrypt_hash(e: BcryptError) -> Self { fn bcrypt_hash(e: BcryptError) -> Self {
Error::from(ErrorKind::BCryptHash(e)) Error {
context: SpanTrace::capture().to_string(),
kind: ErrorKind::BCryptHash(e),
}
} }
fn parse_header(e: ParseError) -> Self { fn parse_header(e: ParseError) -> Self {
Error::from(ErrorKind::ParseHeader(e)) Error {
context: SpanTrace::capture().to_string(),
kind: ErrorKind::ParseHeader(e),
}
} }
fn canceled(_: Canceled) -> Self { fn canceled(_: Canceled) -> Self {
Error::from(ErrorKind::Canceled) Error {
context: SpanTrace::capture().to_string(),
kind: ErrorKind::Canceled,
}
} }
} }
#[derive(Debug, thiserror::Error)] #[derive(Debug, thiserror::Error)]
pub(crate) enum ErrorKind { enum ErrorKind {
#[error("Invalid API Token")] #[error("Invalid API Token")]
Invalid, Invalid,
@ -140,6 +176,20 @@ pub(crate) enum ErrorKind {
ParseHeader(#[source] ParseError), ParseHeader(#[source] ParseError),
} }
impl ResponseError for Error {
fn status_code(&self) -> StatusCode {
match self.kind {
ErrorKind::Invalid | ErrorKind::ParseHeader(_) => StatusCode::BAD_REQUEST,
_ => StatusCode::INTERNAL_SERVER_ERROR,
}
}
fn error_response(&self) -> HttpResponse {
HttpResponse::build(self.status_code())
.json(serde_json::json!({ "msg": self.kind.to_string() }))
}
}
impl FromRequest for Admin { impl FromRequest for Admin {
type Error = Error; type Error = Error;
type Future = LocalBoxFuture<'static, Result<Self, Self::Error>>; type Future = LocalBoxFuture<'static, Result<Self, Self::Error>>;
@ -150,8 +200,10 @@ impl FromRequest for Admin {
Box::pin(async move { Box::pin(async move {
let (db, c, s, t) = res?; let (db, c, s, t) = res?;
Self::verify(c, s, t).await?; Self::verify(c, s, t).await?;
metrics::histogram!("relay.admin.verify") metrics::histogram!(
.record(now.elapsed().as_micros() as f64 / 1_000_000_f64); "relay.admin.verify",
now.elapsed().as_micros() as f64 / 1_000_000_f64
);
Ok(Admin { db }) Ok(Admin { db })
}) })
} }

View File

@ -1,4 +1,3 @@
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
pub(crate) type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>; pub(crate) type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
pub(crate) type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

View File

@ -19,10 +19,8 @@ use crate::{
jobs::{process_listeners::Listeners, record_last_online::RecordLastOnline}, jobs::{process_listeners::Listeners, record_last_online::RecordLastOnline},
}; };
use background_jobs::{ use background_jobs::{
memory_storage::{Storage, TokioTimer}, memory_storage::{ActixTimer, Storage},
metrics::MetricsStorage, Job, QueueHandle, WorkerConfig,
tokio::{QueueHandle, WorkerConfig},
Job,
}; };
use std::time::Duration; use std::time::Duration;
@ -45,21 +43,18 @@ pub(crate) fn create_workers(
actors: ActorCache, actors: ActorCache,
media: MediaCache, media: MediaCache,
config: Config, config: Config,
) -> std::io::Result<JobServer> { ) -> JobServer {
let deliver_concurrency = config.deliver_concurrency(); let deliver_concurrency = config.deliver_concurrency();
let queue_handle = WorkerConfig::new( let queue_handle = WorkerConfig::new(Storage::new(ActixTimer), move |queue_handle| {
MetricsStorage::wrap(Storage::new(TokioTimer)), JobState::new(
move |queue_handle| { state.clone(),
JobState::new( actors.clone(),
state.clone(), JobServer::new(queue_handle),
actors.clone(), media.clone(),
JobServer::new(queue_handle), config.clone(),
media.clone(), )
config.clone(), })
)
},
)
.register::<Deliver>() .register::<Deliver>()
.register::<DeliverMany>() .register::<DeliverMany>()
.register::<QueryNodeinfo>() .register::<QueryNodeinfo>()
@ -75,12 +70,12 @@ pub(crate) fn create_workers(
.set_worker_count("maintenance", 2) .set_worker_count("maintenance", 2)
.set_worker_count("apub", 2) .set_worker_count("apub", 2)
.set_worker_count("deliver", deliver_concurrency) .set_worker_count("deliver", deliver_concurrency)
.start()?; .start();
queue_handle.every(Duration::from_secs(60 * 5), Listeners)?; queue_handle.every(Duration::from_secs(60 * 5), Listeners);
queue_handle.every(Duration::from_secs(60 * 10), RecordLastOnline)?; queue_handle.every(Duration::from_secs(60 * 10), RecordLastOnline);
Ok(JobServer::new(queue_handle)) JobServer::new(queue_handle)
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]

View File

@ -2,14 +2,14 @@ use crate::{
config::{Config, UrlKind}, config::{Config, UrlKind},
db::Actor, db::Actor,
error::Error, error::Error,
future::BoxFuture,
jobs::{ jobs::{
apub::{get_inboxes, prepare_activity}, apub::{get_inboxes, prepare_activity},
DeliverMany, JobState, DeliverMany, JobState,
}, },
}; };
use activitystreams::{activity::Announce as AsAnnounce, iri_string::types::IriString}; use activitystreams::{activity::Announce as AsAnnounce, iri_string::types::IriString};
use background_jobs::Job; use background_jobs::ActixJob;
use std::{future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)] #[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct Announce { pub(crate) struct Announce {
@ -62,15 +62,14 @@ fn generate_announce(
) )
} }
impl Job for Announce { impl ActixJob for Announce {
type State = JobState; type State = JobState;
type Error = Error; type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
type Future = BoxFuture<'static, Result<(), Self::Error>>;
const NAME: &'static str = "relay::jobs::apub::Announce"; const NAME: &'static str = "relay::jobs::apub::Announce";
const QUEUE: &'static str = "apub"; const QUEUE: &'static str = "apub";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(async move { self.perform(state).await.map_err(Into::into) })
} }
} }

View File

@ -3,7 +3,6 @@ use crate::{
config::{Config, UrlKind}, config::{Config, UrlKind},
db::Actor, db::Actor,
error::{Error, ErrorKind}, error::{Error, ErrorKind},
future::BoxFuture,
jobs::{apub::prepare_activity, Deliver, JobState, QueryInstance, QueryNodeinfo}, jobs::{apub::prepare_activity, Deliver, JobState, QueryInstance, QueryNodeinfo},
}; };
use activitystreams::{ use activitystreams::{
@ -11,7 +10,8 @@ use activitystreams::{
iri_string::types::IriString, iri_string::types::IriString,
prelude::*, prelude::*,
}; };
use background_jobs::Job; use background_jobs::ActixJob;
use std::{future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)] #[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct Follow { pub(crate) struct Follow {
@ -111,15 +111,14 @@ fn generate_accept_follow(
) )
} }
impl Job for Follow { impl ActixJob for Follow {
type State = JobState; type State = JobState;
type Error = Error; type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
type Future = BoxFuture<'static, Result<(), Self::Error>>;
const NAME: &'static str = "relay::jobs::apub::Follow"; const NAME: &'static str = "relay::jobs::apub::Follow";
const QUEUE: &'static str = "apub"; const QUEUE: &'static str = "apub";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(async move { self.perform(state).await.map_err(Into::into) })
} }
} }

View File

@ -2,11 +2,11 @@ use crate::{
apub::AcceptedActivities, apub::AcceptedActivities,
db::Actor, db::Actor,
error::{Error, ErrorKind}, error::{Error, ErrorKind},
future::BoxFuture,
jobs::{apub::get_inboxes, DeliverMany, JobState}, jobs::{apub::get_inboxes, DeliverMany, JobState},
}; };
use activitystreams::prelude::*; use activitystreams::prelude::*;
use background_jobs::Job; use background_jobs::ActixJob;
use std::{future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)] #[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct Forward { pub(crate) struct Forward {
@ -47,15 +47,14 @@ impl Forward {
} }
} }
impl Job for Forward { impl ActixJob for Forward {
type State = JobState; type State = JobState;
type Error = Error; type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
type Future = BoxFuture<'static, Result<(), Self::Error>>;
const NAME: &'static str = "relay::jobs::apub::Forward"; const NAME: &'static str = "relay::jobs::apub::Forward";
const QUEUE: &'static str = "apub"; const QUEUE: &'static str = "apub";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(async move { self.perform(state).await.map_err(Into::into) })
} }
} }

View File

@ -2,10 +2,10 @@ use crate::{
config::UrlKind, config::UrlKind,
db::Actor, db::Actor,
error::Error, error::Error,
future::BoxFuture,
jobs::{apub::generate_undo_follow, Deliver, JobState}, jobs::{apub::generate_undo_follow, Deliver, JobState},
}; };
use background_jobs::Job; use background_jobs::ActixJob;
use std::{future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)] #[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct Reject(pub(crate) Actor); pub(crate) struct Reject(pub(crate) Actor);
@ -33,15 +33,14 @@ impl Reject {
} }
} }
impl Job for Reject { impl ActixJob for Reject {
type State = JobState; type State = JobState;
type Error = Error; type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
type Future = BoxFuture<'static, Result<(), Self::Error>>;
const NAME: &'static str = "relay::jobs::apub::Reject"; const NAME: &'static str = "relay::jobs::apub::Reject";
const QUEUE: &'static str = "apub"; const QUEUE: &'static str = "apub";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(async move { self.perform(state).await.map_err(Into::into) })
} }
} }

View File

@ -3,11 +3,11 @@ use crate::{
config::UrlKind, config::UrlKind,
db::Actor, db::Actor,
error::Error, error::Error,
future::BoxFuture,
jobs::{apub::generate_undo_follow, Deliver, JobState}, jobs::{apub::generate_undo_follow, Deliver, JobState},
}; };
use activitystreams::prelude::BaseExt; use activitystreams::prelude::BaseExt;
use background_jobs::Job; use background_jobs::ActixJob;
use std::{future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)] #[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct Undo { pub(crate) struct Undo {
@ -48,15 +48,14 @@ impl Undo {
} }
} }
impl Job for Undo { impl ActixJob for Undo {
type State = JobState; type State = JobState;
type Error = Error; type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
type Future = BoxFuture<'static, Result<(), Self::Error>>;
const NAME: &'static str = "relay::jobs::apub::Undo"; const NAME: &'static str = "relay::jobs::apub::Undo";
const QUEUE: &'static str = "apub"; const QUEUE: &'static str = "apub";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(async move { self.perform(state).await.map_err(Into::into) })
} }
} }

View File

@ -1,12 +1,12 @@
use crate::{ use crate::{
apub::AcceptedActors, apub::AcceptedActors,
error::{Error, ErrorKind}, error::{Error, ErrorKind},
future::BoxFuture,
jobs::JobState, jobs::JobState,
requests::BreakerStrategy, requests::BreakerStrategy,
}; };
use activitystreams::{iri_string::types::IriString, object::Image, prelude::*}; use activitystreams::{iri_string::types::IriString, object::Image, prelude::*};
use background_jobs::Job; use background_jobs::ActixJob;
use std::{future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)] #[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct QueryContact { pub(crate) struct QueryContact {
@ -85,16 +85,15 @@ fn to_contact(contact: AcceptedActors) -> Option<(String, String, IriString, Iri
Some((username, display_name, url, avatar)) Some((username, display_name, url, avatar))
} }
impl Job for QueryContact { impl ActixJob for QueryContact {
type State = JobState; type State = JobState;
type Error = Error; type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
type Future = BoxFuture<'static, Result<(), Self::Error>>;
const NAME: &'static str = "relay::jobs::QueryContact"; const NAME: &'static str = "relay::jobs::QueryContact";
const QUEUE: &'static str = "maintenance"; const QUEUE: &'static str = "maintenance";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(async move { self.perform(state).await.map_err(Into::into) })
} }
} }

View File

@ -1,11 +1,11 @@
use crate::{ use crate::{
error::Error, error::Error,
future::BoxFuture,
jobs::{debug_object, JobState}, jobs::{debug_object, JobState},
requests::BreakerStrategy, requests::BreakerStrategy,
}; };
use activitystreams::iri_string::types::IriString; use activitystreams::iri_string::types::IriString;
use background_jobs::{Backoff, Job}; use background_jobs::{ActixJob, Backoff};
use std::{future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)] #[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct Deliver { pub(crate) struct Deliver {
@ -35,7 +35,7 @@ impl Deliver {
} }
#[tracing::instrument(name = "Deliver", skip(state))] #[tracing::instrument(name = "Deliver", skip(state))]
async fn perform(self, state: JobState) -> Result<(), Error> { async fn permform(self, state: JobState) -> Result<(), Error> {
if let Err(e) = state if let Err(e) = state
.state .state
.requests .requests
@ -56,16 +56,15 @@ impl Deliver {
} }
} }
impl Job for Deliver { impl ActixJob for Deliver {
type State = JobState; type State = JobState;
type Error = Error; type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
type Future = BoxFuture<'static, Result<(), Self::Error>>;
const NAME: &'static str = "relay::jobs::Deliver"; const NAME: &'static str = "relay::jobs::Deliver";
const QUEUE: &'static str = "deliver"; const QUEUE: &'static str = "deliver";
const BACKOFF: Backoff = Backoff::Exponential(8); const BACKOFF: Backoff = Backoff::Exponential(8);
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(async move { self.permform(state).await.map_err(Into::into) })
} }
} }

View File

@ -1,10 +1,10 @@
use crate::{ use crate::{
error::Error, error::Error,
future::BoxFuture, future::LocalBoxFuture,
jobs::{debug_object, Deliver, JobState}, jobs::{debug_object, Deliver, JobState},
}; };
use activitystreams::iri_string::types::IriString; use activitystreams::iri_string::types::IriString;
use background_jobs::Job; use background_jobs::ActixJob;
#[derive(Clone, serde::Deserialize, serde::Serialize)] #[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct DeliverMany { pub(crate) struct DeliverMany {
@ -45,15 +45,14 @@ impl DeliverMany {
} }
} }
impl Job for DeliverMany { impl ActixJob for DeliverMany {
type State = JobState; type State = JobState;
type Error = Error; type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>;
type Future = BoxFuture<'static, Result<(), Self::Error>>;
const NAME: &'static str = "relay::jobs::DeliverMany"; const NAME: &'static str = "relay::jobs::DeliverMany";
const QUEUE: &'static str = "deliver"; const QUEUE: &'static str = "deliver";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(async move { self.perform(state).await.map_err(Into::into) })
} }
} }

View File

@ -1,12 +1,12 @@
use crate::{ use crate::{
config::UrlKind, config::UrlKind,
error::{Error, ErrorKind}, error::{Error, ErrorKind},
future::BoxFuture,
jobs::{Boolish, JobState}, jobs::{Boolish, JobState},
requests::BreakerStrategy, requests::BreakerStrategy,
}; };
use activitystreams::{iri, iri_string::types::IriString}; use activitystreams::{iri, iri_string::types::IriString};
use background_jobs::Job; use background_jobs::ActixJob;
use std::{future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)] #[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct QueryInstance { pub(crate) struct QueryInstance {
@ -165,16 +165,15 @@ impl QueryInstance {
} }
} }
impl Job for QueryInstance { impl ActixJob for QueryInstance {
type State = JobState; type State = JobState;
type Error = Error; type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
type Future = BoxFuture<'static, Result<(), Self::Error>>;
const NAME: &'static str = "relay::jobs::QueryInstance"; const NAME: &'static str = "relay::jobs::QueryInstance";
const QUEUE: &'static str = "maintenance"; const QUEUE: &'static str = "maintenance";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(async move { self.perform(state).await.map_err(Into::into) })
} }
} }

View File

@ -1,18 +1,18 @@
use crate::{ use crate::{
error::{Error, ErrorKind}, error::{Error, ErrorKind},
future::BoxFuture,
jobs::{Boolish, JobState, QueryContact}, jobs::{Boolish, JobState, QueryContact},
requests::BreakerStrategy, requests::BreakerStrategy,
}; };
use activitystreams::{iri, iri_string::types::IriString, primitives::OneOrMany}; use activitystreams::{iri, iri_string::types::IriString, primitives::OneOrMany};
use background_jobs::Job; use background_jobs::ActixJob;
use std::{fmt::Debug, future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)] #[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct QueryNodeinfo { pub(crate) struct QueryNodeinfo {
actor_id: IriString, actor_id: IriString,
} }
impl std::fmt::Debug for QueryNodeinfo { impl Debug for QueryNodeinfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("QueryNodeinfo") f.debug_struct("QueryNodeinfo")
.field("actor_id", &self.actor_id.to_string()) .field("actor_id", &self.actor_id.to_string())
@ -92,7 +92,7 @@ impl QueryNodeinfo {
.metadata .metadata
.and_then(|meta| meta.into_iter().next().and_then(|meta| meta.staff_accounts)) .and_then(|meta| meta.into_iter().next().and_then(|meta| meta.staff_accounts))
{ {
if let Some(contact_id) = accounts.first() { if let Some(contact_id) = accounts.get(0) {
state state
.job_server .job_server
.queue(QueryContact::new(self.actor_id, contact_id.clone())) .queue(QueryContact::new(self.actor_id, contact_id.clone()))
@ -104,16 +104,15 @@ impl QueryNodeinfo {
} }
} }
impl Job for QueryNodeinfo { impl ActixJob for QueryNodeinfo {
type State = JobState; type State = JobState;
type Error = Error; type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
type Future = BoxFuture<'static, Result<(), Self::Error>>;
const NAME: &'static str = "relay::jobs::QueryNodeinfo"; const NAME: &'static str = "relay::jobs::QueryNodeinfo";
const QUEUE: &'static str = "maintenance"; const QUEUE: &'static str = "maintenance";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(async move { self.perform(state).await.map_err(Into::into) })
} }
} }

View File

@ -1,9 +1,9 @@
use crate::{ use crate::{
error::Error, error::Error,
future::BoxFuture,
jobs::{instance::QueryInstance, nodeinfo::QueryNodeinfo, JobState}, jobs::{instance::QueryInstance, nodeinfo::QueryNodeinfo, JobState},
}; };
use background_jobs::Job; use background_jobs::ActixJob;
use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct Listeners; pub(crate) struct Listeners;
@ -23,15 +23,14 @@ impl Listeners {
} }
} }
impl Job for Listeners { impl ActixJob for Listeners {
type State = JobState; type State = JobState;
type Error = Error; type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
type Future = BoxFuture<'static, Result<(), Self::Error>>;
const NAME: &'static str = "relay::jobs::Listeners"; const NAME: &'static str = "relay::jobs::Listeners";
const QUEUE: &'static str = "maintenance"; const QUEUE: &'static str = "maintenance";
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(async move { self.perform(state).await.map_err(Into::into) })
} }
} }

View File

@ -1,5 +1,6 @@
use crate::{error::Error, future::BoxFuture, jobs::JobState}; use crate::{error::Error, jobs::JobState};
use background_jobs::{Backoff, Job}; use background_jobs::{ActixJob, Backoff};
use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct RecordLastOnline; pub(crate) struct RecordLastOnline;
@ -13,16 +14,15 @@ impl RecordLastOnline {
} }
} }
impl Job for RecordLastOnline { impl ActixJob for RecordLastOnline {
type State = JobState; type State = JobState;
type Error = Error; type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
type Future = BoxFuture<'static, Result<(), Self::Error>>;
const NAME: &'static str = "relay::jobs::RecordLastOnline"; const NAME: &'static str = "relay::jobs::RecordLastOnline";
const QUEUE: &'static str = "maintenance"; const QUEUE: &'static str = "maintenance";
const BACKOFF: Backoff = Backoff::Linear(1); const BACKOFF: Backoff = Backoff::Linear(1);
fn run(self, state: Self::State) -> Self::Future { fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state)) Box::pin(async move { self.perform(state).await.map_err(Into::into) })
} }
} }

View File

@ -4,6 +4,7 @@
use std::time::Duration; use std::time::Duration;
use activitystreams::iri_string::types::IriString; use activitystreams::iri_string::types::IriString;
use actix_rt::task::JoinHandle;
use actix_web::{middleware::Compress, web, App, HttpServer}; use actix_web::{middleware::Compress, web, App, HttpServer};
use collector::MemoryCollector; use collector::MemoryCollector;
#[cfg(feature = "console")] #[cfg(feature = "console")]
@ -12,16 +13,14 @@ use error::Error;
use http_signature_normalization_actix::middleware::VerifySignature; use http_signature_normalization_actix::middleware::VerifySignature;
use metrics_exporter_prometheus::PrometheusBuilder; use metrics_exporter_prometheus::PrometheusBuilder;
use metrics_util::layers::FanoutBuilder; use metrics_util::layers::FanoutBuilder;
use opentelemetry::KeyValue; use opentelemetry::{sdk::Resource, KeyValue};
use opentelemetry_otlp::WithExportConfig; use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::Resource;
use reqwest_middleware::ClientWithMiddleware; use reqwest_middleware::ClientWithMiddleware;
use rustls::ServerConfig; use rustls::ServerConfig;
use tokio::task::JoinHandle;
use tracing_actix_web::TracingLogger; use tracing_actix_web::TracingLogger;
use tracing_error::ErrorLayer; use tracing_error::ErrorLayer;
use tracing_log::LogTracer; use tracing_log::LogTracer;
use tracing_subscriber::{filter::Targets, layer::SubscriberExt, Layer}; use tracing_subscriber::{filter::Targets, fmt::format::FmtSpan, layer::SubscriberExt, Layer};
mod admin; mod admin;
mod apub; mod apub;
@ -56,15 +55,16 @@ use self::{
fn init_subscriber( fn init_subscriber(
software_name: &'static str, software_name: &'static str,
opentelemetry_url: Option<&IriString>, opentelemetry_url: Option<&IriString>,
) -> color_eyre::Result<()> { ) -> Result<(), anyhow::Error> {
LogTracer::init()?; LogTracer::init()?;
color_eyre::install()?;
let targets: Targets = std::env::var("RUST_LOG") let targets: Targets = std::env::var("RUST_LOG")
.unwrap_or_else(|_| "info".into()) .unwrap_or_else(|_| "warn,actix_web=debug,actix_server=debug,tracing_actix_web=info".into())
.parse()?; .parse()?;
let format_layer = tracing_subscriber::fmt::layer().with_filter(targets.clone()); let format_layer = tracing_subscriber::fmt::layer()
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
.with_filter(targets.clone());
#[cfg(feature = "console")] #[cfg(feature = "console")]
let console_layer = ConsoleLayer::builder() let console_layer = ConsoleLayer::builder()
@ -81,19 +81,18 @@ fn init_subscriber(
let subscriber = subscriber.with(console_layer); let subscriber = subscriber.with(console_layer);
if let Some(url) = opentelemetry_url { if let Some(url) = opentelemetry_url {
let tracer = opentelemetry_otlp::new_pipeline() let tracer =
.tracing() opentelemetry_otlp::new_pipeline()
.with_trace_config( .tracing()
opentelemetry_sdk::trace::config().with_resource(Resource::new(vec![ .with_trace_config(opentelemetry::sdk::trace::config().with_resource(
KeyValue::new("service.name", software_name), Resource::new(vec![KeyValue::new("service.name", software_name)]),
])), ))
) .with_exporter(
.with_exporter( opentelemetry_otlp::new_exporter()
opentelemetry_otlp::new_exporter() .tonic()
.tonic() .with_endpoint(url.as_str()),
.with_endpoint(url.as_str()), )
) .install_batch(opentelemetry::runtime::Tokio)?;
.install_batch(opentelemetry_sdk::runtime::Tokio)?;
let otel_layer = tracing_opentelemetry::layer() let otel_layer = tracing_opentelemetry::layer()
.with_tracer(tracer) .with_tracer(tracer)
@ -140,8 +139,8 @@ fn build_client(
Ok(client_with_middleware) Ok(client_with_middleware)
} }
#[tokio::main] #[actix_rt::main]
async fn main() -> color_eyre::Result<()> { async fn main() -> Result<(), anyhow::Error> {
dotenv::dotenv().ok(); dotenv::dotenv().ok();
let config = Config::build()?; let config = Config::build()?;
@ -151,8 +150,7 @@ async fn main() -> color_eyre::Result<()> {
let args = Args::new(); let args = Args::new();
if args.any() { if args.any() {
client_main(config, args).await??; return client_main(config, args).await?;
return Ok(());
} }
let collector = MemoryCollector::new(); let collector = MemoryCollector::new();
@ -162,35 +160,35 @@ async fn main() -> color_eyre::Result<()> {
.with_http_listener(bind_addr) .with_http_listener(bind_addr)
.build()?; .build()?;
tokio::spawn(exporter); actix_rt::spawn(exporter);
let recorder = FanoutBuilder::default() let recorder = FanoutBuilder::default()
.add_recorder(recorder) .add_recorder(recorder)
.add_recorder(collector.clone()) .add_recorder(collector.clone())
.build(); .build();
metrics::set_global_recorder(recorder).map_err(|e| color_eyre::eyre::eyre!("{e}"))?; metrics::set_boxed_recorder(Box::new(recorder))?;
} else { } else {
collector.install()?; collector.install()?;
} }
tracing::info!("Opening DB"); tracing::warn!("Opening DB");
let db = Db::build(&config)?; let db = Db::build(&config)?;
tracing::info!("Building caches"); tracing::warn!("Building caches");
let actors = ActorCache::new(db.clone()); let actors = ActorCache::new(db.clone());
let media = MediaCache::new(db.clone()); let media = MediaCache::new(db.clone());
server_main(db, actors, media, collector, config).await?; server_main(db, actors, media, collector, config).await??;
tracing::info!("Application exit"); tracing::warn!("Application exit");
Ok(()) Ok(())
} }
fn client_main(config: Config, args: Args) -> JoinHandle<color_eyre::Result<()>> { fn client_main(config: Config, args: Args) -> JoinHandle<Result<(), anyhow::Error>> {
tokio::spawn(do_client_main(config, args)) actix_rt::spawn(do_client_main(config, args))
} }
async fn do_client_main(config: Config, args: Args) -> color_eyre::Result<()> { async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error> {
let client = build_client( let client = build_client(
&config.user_agent(), &config.user_agent(),
config.client_timeout(), config.client_timeout(),
@ -273,22 +271,32 @@ async fn do_client_main(config: Config, args: Args) -> color_eyre::Result<()> {
Ok(()) Ok(())
} }
const VERIFY_RATIO: usize = 7; fn server_main(
async fn server_main(
db: Db, db: Db,
actors: ActorCache, actors: ActorCache,
media: MediaCache, media: MediaCache,
collector: MemoryCollector, collector: MemoryCollector,
config: Config, config: Config,
) -> color_eyre::Result<()> { ) -> JoinHandle<Result<(), anyhow::Error>> {
actix_rt::spawn(do_server_main(db, actors, media, collector, config))
}
const VERIFY_RATIO: usize = 7;
async fn do_server_main(
db: Db,
actors: ActorCache,
media: MediaCache,
collector: MemoryCollector,
config: Config,
) -> Result<(), anyhow::Error> {
let client = build_client( let client = build_client(
&config.user_agent(), &config.user_agent(),
config.client_timeout(), config.client_timeout(),
config.proxy_config(), config.proxy_config(),
)?; )?;
tracing::info!("Creating state"); tracing::warn!("Creating state");
let (signature_threads, verify_threads) = match config.signature_threads() { let (signature_threads, verify_threads) = match config.signature_threads() {
0 | 1 => (1, 1), 0 | 1 => (1, 1),
@ -301,30 +309,23 @@ async fn server_main(
} }
}; };
let verify_spawner = Spawner::build("verify-cpu", verify_threads.try_into()?)?; let verify_spawner = Spawner::build("verify-cpu", verify_threads)?;
let sign_spawner = Spawner::build("sign-cpu", signature_threads.try_into()?)?; let sign_spawner = Spawner::build("sign-cpu", signature_threads)?;
let key_id = config.generate_url(UrlKind::MainKey).to_string(); let key_id = config.generate_url(UrlKind::MainKey).to_string();
let state = State::build(db.clone(), key_id, sign_spawner.clone(), client).await?; let state = State::build(db.clone(), key_id, sign_spawner, client).await?;
if let Some((token, admin_handle)) = config.telegram_info() { if let Some((token, admin_handle)) = config.telegram_info() {
tracing::info!("Creating telegram handler"); tracing::warn!("Creating telegram handler");
telegram::start(admin_handle.to_owned(), db.clone(), token); telegram::start(admin_handle.to_owned(), db.clone(), token);
} }
let cert_resolver = config let keys = config.open_keys()?;
.open_keys()
.await?
.map(rustls_channel_resolver::channel::<32>);
let bind_address = config.bind_address(); let bind_address = config.bind_address();
let sign_spawner2 = sign_spawner.clone();
let verify_spawner2 = verify_spawner.clone();
let config2 = config.clone();
let server = HttpServer::new(move || { let server = HttpServer::new(move || {
let job_server = let job_server =
create_workers(state.clone(), actors.clone(), media.clone(), config.clone()) create_workers(state.clone(), actors.clone(), media.clone(), config.clone());
.expect("Failed to create job server");
let app = App::new() let app = App::new()
.app_data(web::Data::new(db.clone())) .app_data(web::Data::new(db.clone()))
@ -390,42 +391,24 @@ async fn server_main(
) )
}); });
if let Some((cert_tx, cert_rx)) = cert_resolver { if let Some((certs, key)) = keys {
let handle = tokio::spawn(async move { tracing::warn!("Binding to {}:{} with TLS", bind_address.0, bind_address.1);
let mut interval = tokio::time::interval(Duration::from_secs(30));
interval.tick().await;
loop {
interval.tick().await;
match config2.open_keys().await {
Ok(Some(key)) => cert_tx.update(key),
Ok(None) => tracing::warn!("Missing TLS keys"),
Err(e) => tracing::error!("Failed to read TLS keys {e}"),
}
}
});
tracing::info!("Binding to {}:{} with TLS", bind_address.0, bind_address.1);
let server_config = ServerConfig::builder() let server_config = ServerConfig::builder()
.with_safe_default_cipher_suites()
.with_safe_default_kx_groups()
.with_safe_default_protocol_versions()?
.with_no_client_auth() .with_no_client_auth()
.with_cert_resolver(cert_rx); .with_single_cert(certs, key)?;
server server
.bind_rustls_0_22(bind_address, server_config)? .bind_rustls_021(bind_address, server_config)?
.run() .run()
.await?; .await?;
handle.abort();
let _ = handle.await;
} else { } else {
tracing::info!("Binding to {}:{}", bind_address.0, bind_address.1); tracing::warn!("Binding to {}:{}", bind_address.0, bind_address.1);
server.bind(bind_address)?.run().await?; server.bind(bind_address)?.run().await?;
} }
sign_spawner2.close().await; tracing::warn!("Server closed");
verify_spawner2.close().await;
tracing::info!("Server closed");
Ok(()) Ok(())
} }

View File

@ -40,7 +40,7 @@ impl Drop for LogOnDrop {
fn drop(&mut self) { fn drop(&mut self) {
if self.arm { if self.arm {
let duration = self.begin.elapsed(); let duration = self.begin.elapsed();
metrics::histogram!("relay.request.complete", "path" => self.path.clone(), "method" => self.method.clone()).record(duration); metrics::histogram!("relay.request.complete", duration, "path" => self.path.clone(), "method" => self.method.clone());
} }
} }
} }

View File

@ -432,7 +432,7 @@ struct Signer {
impl Signer { impl Signer {
fn sign(&self, signing_string: &str) -> Result<String, Error> { fn sign(&self, signing_string: &str) -> Result<String, Error> {
let mut signature = vec![0; self.private_key.public().modulus_len()]; let mut signature = vec![0; self.private_key.public_modulus_len()];
self.private_key self.private_key
.sign( .sign(

View File

@ -14,11 +14,10 @@ const MINIFY_CONFIG: minify_html::Cfg = minify_html::Cfg {
keep_html_and_head_opening_tags: false, keep_html_and_head_opening_tags: false,
keep_spaces_between_attributes: true, keep_spaces_between_attributes: true,
keep_comments: false, keep_comments: false,
keep_input_type_text_attr: true,
keep_ssi_comments: false,
preserve_brace_template_syntax: false,
preserve_chevron_percent_template_syntax: false,
minify_css: true, minify_css: true,
minify_css_level_1: true,
minify_css_level_2: false,
minify_css_level_3: false,
minify_js: true, minify_js: true,
remove_bangs: true, remove_bangs: true,
remove_processing_instructions: true, remove_processing_instructions: true,

View File

@ -1,30 +1,107 @@
use async_cpupool::CpuPool;
use http_signature_normalization_actix::{Canceled, Spawn}; use http_signature_normalization_actix::{Canceled, Spawn};
use std::time::Duration; use std::{
panic::AssertUnwindSafe,
sync::Arc,
thread::JoinHandle,
time::{Duration, Instant},
};
#[derive(Clone)] fn spawner_thread(
receiver: flume::Receiver<Box<dyn FnOnce() + Send>>,
name: &'static str,
id: usize,
) {
let guard = MetricsGuard::guard(name, id);
while let Ok(f) = receiver.recv() {
let start = Instant::now();
metrics::increment_counter!(format!("relay.{name}.operation.start"), "id" => id.to_string());
let res = std::panic::catch_unwind(AssertUnwindSafe(f));
metrics::increment_counter!(format!("relay.{name}.operation.end"), "complete" => res.is_ok().to_string(), "id" => id.to_string());
metrics::histogram!(format!("relay.{name}.operation.duration"), start.elapsed().as_secs_f64(), "complete" => res.is_ok().to_string(), "id" => id.to_string());
if let Err(e) = res {
tracing::warn!("{name} fn panicked: {e:?}");
}
}
guard.disarm();
}
#[derive(Clone, Debug)]
pub(crate) struct Spawner { pub(crate) struct Spawner {
pool: CpuPool, name: &'static str,
sender: Option<flume::Sender<Box<dyn FnOnce() + Send>>>,
threads: Option<Arc<Vec<JoinHandle<()>>>>,
}
struct MetricsGuard {
name: &'static str,
id: usize,
start: Instant,
armed: bool,
}
impl MetricsGuard {
fn guard(name: &'static str, id: usize) -> Self {
metrics::increment_counter!(format!("relay.{name}.launched"), "id" => id.to_string());
Self {
name,
id,
start: Instant::now(),
armed: true,
}
}
fn disarm(mut self) {
self.armed = false;
}
}
impl Drop for MetricsGuard {
fn drop(&mut self) {
metrics::increment_counter!(format!("relay.{}.closed", self.name), "clean" => (!self.armed).to_string(), "id" => self.id.to_string());
metrics::histogram!(format!("relay.{}.duration", self.name), self.start.elapsed().as_secs_f64(), "clean" => (!self.armed).to_string(), "id" => self.id.to_string());
tracing::warn!("Stopping {} - {}", self.name, self.id);
}
} }
impl Spawner { impl Spawner {
pub(crate) fn build(name: &'static str, threads: u16) -> color_eyre::Result<Self> { pub(crate) fn build(name: &'static str, threads: usize) -> std::io::Result<Self> {
let pool = CpuPool::configure() let (sender, receiver) = flume::bounded(8);
.name(name)
.max_threads(threads)
.build()?;
Ok(Spawner { pool }) tracing::warn!("Launching {threads} {name}s");
}
pub(crate) async fn close(self) { let threads = (0..threads)
self.pool.close().await; .map(|i| {
let receiver = receiver.clone();
std::thread::Builder::new()
.name(format!("{name}-{i}"))
.spawn(move || {
spawner_thread(receiver, name, i);
})
})
.collect::<Result<Vec<_>, _>>()?;
Ok(Spawner {
name,
sender: Some(sender),
threads: Some(Arc::new(threads)),
})
} }
} }
impl std::fmt::Debug for Spawner { impl Drop for Spawner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn drop(&mut self) {
f.debug_struct("Spawner").finish() self.sender.take();
if let Some(threads) = self.threads.take().and_then(Arc::into_inner) {
tracing::warn!("Joining {}s", self.name);
for thread in threads {
let _ = thread.join();
}
}
} }
} }
@ -34,9 +111,9 @@ where
{ {
let id = uuid::Uuid::new_v4(); let id = uuid::Uuid::new_v4();
metrics::counter!("relay.spawner.wait-timer.start").increment(1); metrics::increment_counter!("relay.spawner.wait-timer.start");
let mut interval = tokio::time::interval(Duration::from_secs(5)); let mut interval = actix_rt::time::interval(Duration::from_secs(5));
// pass the first tick (instant) // pass the first tick (instant)
interval.tick().await; interval.tick().await;
@ -47,12 +124,12 @@ where
loop { loop {
tokio::select! { tokio::select! {
out = &mut fut => { out = &mut fut => {
metrics::counter!("relay.spawner.wait-timer.end").increment(1); metrics::increment_counter!("relay.spawner.wait-timer.end");
return out; return out;
} }
_ = interval.tick() => { _ = interval.tick() => {
counter += 1; counter += 1;
metrics::counter!("relay.spawner.wait-timer.pending").increment(1); metrics::increment_counter!("relay.spawner.wait-timer.pending");
tracing::warn!("Blocking operation {id} is taking a long time, {} seconds", counter * 5); tracing::warn!("Blocking operation {id} is taking a long time, {} seconds", counter * 5);
} }
} }
@ -67,9 +144,22 @@ impl Spawn for Spawner {
Func: FnOnce() -> Out + Send + 'static, Func: FnOnce() -> Out + Send + 'static,
Out: Send + 'static, Out: Send + 'static,
{ {
let pool = self.pool.clone(); let sender = self.sender.as_ref().expect("Sender exists").clone();
Box::pin(async move { timer(pool.spawn(func)).await.map_err(|_| Canceled) }) Box::pin(async move {
let (tx, rx) = flume::bounded(1);
let _ = sender
.send_async(Box::new(move || {
if tx.try_send((func)()).is_err() {
tracing::warn!("Requestor hung up");
metrics::increment_counter!("relay.spawner.disconnected");
}
}))
.await;
timer(rx.recv_async()).await.map_err(|_| Canceled)
})
} }
} }
@ -81,10 +171,21 @@ impl http_signature_normalization_reqwest::Spawn for Spawner {
Func: FnOnce() -> Out + Send + 'static, Func: FnOnce() -> Out + Send + 'static,
Out: Send + 'static, Out: Send + 'static,
{ {
let pool = self.pool.clone(); let sender = self.sender.as_ref().expect("Sender exists").clone();
Box::pin(async move { Box::pin(async move {
timer(pool.spawn(func)) let (tx, rx) = flume::bounded(1);
let _ = sender
.send_async(Box::new(move || {
if tx.try_send((func)()).is_err() {
tracing::warn!("Requestor hung up");
metrics::increment_counter!("relay.spawner.disconnected");
}
}))
.await;
timer(rx.recv_async())
.await .await
.map_err(|_| http_signature_normalization_reqwest::Canceled) .map_err(|_| http_signature_normalization_reqwest::Canceled)
}) })

View File

@ -46,7 +46,7 @@ pub(crate) fn start(admin_handle: String, db: Db, token: &str) {
let bot = Bot::new(token); let bot = Bot::new(token);
let admin_handle = Arc::new(admin_handle); let admin_handle = Arc::new(admin_handle);
tokio::spawn(async move { actix_rt::spawn(async move {
let command_handler = teloxide::filter_command::<Command, _>().endpoint( let command_handler = teloxide::filter_command::<Command, _>().endpoint(
move |bot: Bot, msg: Message, cmd: Command| { move |bot: Bot, msg: Message, cmd: Command| {
let admin_handle = admin_handle.clone(); let admin_handle = admin_handle.clone();