Compare commits

..

No commits in common. "48080344ea18855ef0f71f8a46576bdaa914c950" and "9433f36cc55ed02555eaba6609b37cd98ecb4133" have entirely different histories.

30 changed files with 1139 additions and 1653 deletions

1984
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,11 +1,11 @@
[package]
name = "ap-relay"
description = "A simple activitypub relay"
version = "0.3.108"
version = "0.3.104"
authors = ["asonix <asonix@asonix.dog>"]
license = "AGPL-3.0"
readme = "README.md"
repository = "https://git.asonix.dog/asonix/relay"
repository = "https://git.asonix.dog/asonix/ap-relay"
keywords = ["activitypub", "relay"]
edition = "2021"
build = "src/build.rs"
@ -15,55 +15,51 @@ name = "relay"
path = "src/main.rs"
[features]
console = ["dep:console-subscriber"]
console = ["console-subscriber"]
default = []
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
actix-web = { version = "4.4.0", default-features = false, features = ["compress-brotli", "compress-gzip", "rustls-0_22"] }
anyhow = "1.0"
actix-rt = "2.7.0"
actix-web = { version = "4.4.0", default-features = false, features = ["compress-brotli", "compress-gzip", "rustls-0_21"] }
actix-webfinger = { version = "0.5.0", default-features = false }
activitystreams = "0.7.0-alpha.25"
activitystreams-ext = "0.1.0-alpha.3"
ammonia = "3.1.0"
async-cpupool = "0.2.0"
bcrypt = "0.15"
base64 = "0.21"
clap = { version = "4.0.0", features = ["derive"] }
color-eyre = "0.6.2"
config = { version = "0.14.0", default-features = false, features = ["toml", "json", "yaml"] }
console-subscriber = { version = "0.2", optional = true }
config = "0.13.0"
console-subscriber = { version = "0.1", optional = true }
dashmap = "5.1.0"
dotenv = "0.15.0"
flume = "0.11.0"
lru = "0.12.0"
metrics = "0.22.0"
metrics-exporter-prometheus = { version = "0.13.0", default-features = false, features = [
lru = "0.11.0"
metrics = "0.21.0"
metrics-exporter-prometheus = { version = "0.12.0", default-features = false, features = [
"http-listener",
] }
metrics-util = "0.16.0"
metrics-util = "0.15.0"
mime = "0.3.16"
minify-html = "0.15.0"
opentelemetry = "0.21"
opentelemetry_sdk = { version = "0.21", features = ["rt-tokio"] }
opentelemetry-otlp = "0.14"
minify-html = "0.11.0"
opentelemetry = { version = "0.20", features = ["rt-tokio"] }
opentelemetry-otlp = "0.13"
pin-project-lite = "0.2.9"
# pinned to metrics-util
quanta = "0.12.0"
quanta = "0.11.0"
rand = "0.8"
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls", "stream"]}
reqwest-middleware = "0.2"
reqwest-tracing = "0.4.5"
ring = "0.17.5"
ring = "0.16.20"
rsa = { version = "0.9" }
rsa-magic-public-key = "0.8.0"
rustls = "0.22.0"
rustls-channel-resolver = "0.2.0"
rustls-pemfile = "2"
rustls = "0.21.0"
rustls-pemfile = "1.0.1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
sled = "0.34.7"
streem = "0.2.0"
teloxide = { version = "0.12.0", default-features = false, features = [
"ctrlc_handler",
"macros",
@ -73,39 +69,41 @@ thiserror = "1.0"
time = { version = "0.3.17", features = ["serde"] }
tracing = "0.1"
tracing-error = "0.2"
tracing-log = "0.2"
tracing-opentelemetry = "0.22"
tracing-futures = "0.2"
tracing-log = "0.1"
tracing-opentelemetry = "0.21"
tracing-subscriber = { version = "0.3", features = [
"ansi",
"env-filter",
"fmt",
] }
tokio = { version = "1", features = ["full", "tracing"] }
tokio = { version = "1", features = ["macros", "sync"] }
uuid = { version = "1", features = ["v4", "serde"] }
streem = "0.1.0"
[dependencies.background-jobs]
version = "0.18.0"
version = "0.15.0"
default-features = false
features = ["error-logging", "metrics", "tokio"]
features = ["background-jobs-actix", "error-logging"]
[dependencies.http-signature-normalization-actix]
version = "0.11.0"
version = "0.10.1"
default-features = false
features = ["server", "ring"]
[dependencies.http-signature-normalization-reqwest]
version = "0.11.0"
version = "0.10.0"
default-features = false
features = ["middleware", "ring"]
[dependencies.tracing-actix-web]
version = "0.7.9"
version = "0.7.6"
[build-dependencies]
color-eyre = "0.6.2"
anyhow = "1.0"
dotenv = "0.15.0"
ructe = { version = "0.17.0", features = ["sass", "mime03"] }
toml = "0.8.0"
toml = "0.7.0"
[profile.dev.package.rsa]
opt-level = 3

View File

@ -5,11 +5,11 @@
"systems": "systems"
},
"locked": {
"lastModified": 1701680307,
"narHash": "sha256-kAuep2h5ajznlPMD9rnQyffWG8EM/C73lejGofXvdM8=",
"lastModified": 1692799911,
"narHash": "sha256-3eihraek4qL744EvQXsK1Ha6C3CR7nnT8X2qWap4RNk=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "4022d587cbbfd70fe950c1e2083a02621806a725",
"rev": "f9e7cf818399d17d347f847525c5a5a8032e4e44",
"type": "github"
},
"original": {
@ -20,11 +20,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1705133751,
"narHash": "sha256-rCIsyE80jgiOU78gCWN3A0wE0tR2GI5nH6MlS+HaaSQ=",
"lastModified": 1693003285,
"narHash": "sha256-5nm4yrEHKupjn62MibENtfqlP6pWcRTuSKrMiH9bLkc=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "9b19f5e77dd906cb52dade0b7bd280339d2a1f3d",
"rev": "5690c4271f2998c304a45c91a0aeb8fb69feaea7",
"type": "github"
},
"original": {

View File

@ -1,14 +1,17 @@
{ lib
, nixosTests
, protobuf
, rustPlatform
}:
rustPlatform.buildRustPackage {
pname = "relay";
version = "0.3.108";
version = "0.3.104";
src = ./.;
cargoLock.lockFile = ./Cargo.lock;
PROTOC = "${protobuf}/bin/protoc";
PROTOC_INCLUDE = "${protobuf}/include";
RUSTFLAGS = "--cfg tokio_unstable";
nativeBuildInputs = [ ];

View File

@ -21,7 +21,7 @@ fn git_info() {
}
}
fn version_info() -> color_eyre::Result<()> {
fn version_info() -> Result<(), anyhow::Error> {
let cargo_toml = Path::new(&std::env::var("CARGO_MANIFEST_DIR")?).join("Cargo.toml");
let mut file = File::open(cargo_toml)?;
@ -42,7 +42,7 @@ fn version_info() -> color_eyre::Result<()> {
Ok(())
}
fn main() -> color_eyre::Result<()> {
fn main() -> Result<(), anyhow::Error> {
dotenv::dotenv().ok();
git_info();

View File

@ -1,4 +1,4 @@
use metrics::{Key, Metadata, Recorder, SetRecorderError};
use metrics::{Key, Recorder, SetRecorderError};
use metrics_util::{
registry::{AtomicStorage, GenerationalStorage, Recency, Registry},
MetricKindMask, Summary,
@ -289,7 +289,7 @@ impl Inner {
}
let mut d = self.distributions.write().unwrap();
let outer_entry = d.entry(name.clone()).or_default();
let outer_entry = d.entry(name.clone()).or_insert_with(BTreeMap::new);
let entry = outer_entry
.entry(labels)
@ -360,8 +360,8 @@ impl MemoryCollector {
d.entry(key.as_str().to_owned()).or_insert(description);
}
pub(crate) fn install(&self) -> Result<(), SetRecorderError<Self>> {
metrics::set_global_recorder(self.clone())
pub(crate) fn install(&self) -> Result<(), SetRecorderError> {
metrics::set_boxed_recorder(Box::new(self.clone()))
}
}
@ -393,19 +393,19 @@ impl Recorder for MemoryCollector {
self.add_description_if_missing(&key, description)
}
fn register_counter(&self, key: &Key, _: &Metadata<'_>) -> metrics::Counter {
fn register_counter(&self, key: &Key) -> metrics::Counter {
self.inner
.registry
.get_or_create_counter(key, |c| c.clone().into())
}
fn register_gauge(&self, key: &Key, _: &Metadata<'_>) -> metrics::Gauge {
fn register_gauge(&self, key: &Key) -> metrics::Gauge {
self.inner
.registry
.get_or_create_gauge(key, |c| c.clone().into())
}
fn register_histogram(&self, key: &Key, _: &Metadata<'_>) -> metrics::Histogram {
fn register_histogram(&self, key: &Key) -> metrics::Histogram {
self.inner
.registry
.get_or_create_histogram(key, |c| c.clone().into())

View File

@ -12,8 +12,9 @@ use activitystreams::{
};
use config::Environment;
use http_signature_normalization_actix::{digest::ring::Sha256, prelude::VerifyDigest};
use rustls::sign::CertifiedKey;
use rustls::{Certificate, PrivateKey};
use std::{
io::BufReader,
net::{IpAddr, SocketAddr},
path::PathBuf,
};
@ -311,34 +312,43 @@ impl Config {
Some((config.addr, config.port).into())
}
pub(crate) async fn open_keys(&self) -> Result<Option<CertifiedKey>, Error> {
pub(crate) fn open_keys(&self) -> Result<Option<(Vec<Certificate>, PrivateKey)>, Error> {
let tls = if let Some(tls) = &self.tls {
tls
} else {
tracing::info!("No TLS config present");
tracing::warn!("No TLS config present");
return Ok(None);
};
let certs_bytes = tokio::fs::read(&tls.cert).await?;
let certs =
rustls_pemfile::certs(&mut certs_bytes.as_slice()).collect::<Result<Vec<_>, _>>()?;
let mut certs_reader = BufReader::new(std::fs::File::open(&tls.cert)?);
let certs = rustls_pemfile::certs(&mut certs_reader)?;
if certs.is_empty() {
tracing::warn!("No certs read from certificate file");
return Ok(None);
}
let key_bytes = tokio::fs::read(&tls.key).await?;
let key = if let Some(key) = rustls_pemfile::private_key(&mut key_bytes.as_slice())? {
key
let mut key_reader = BufReader::new(std::fs::File::open(&tls.key)?);
let key = rustls_pemfile::read_one(&mut key_reader)?;
let certs = certs.into_iter().map(Certificate).collect();
let key = if let Some(key) = key {
match key {
rustls_pemfile::Item::RSAKey(der) => PrivateKey(der),
rustls_pemfile::Item::PKCS8Key(der) => PrivateKey(der),
rustls_pemfile::Item::ECKey(der) => PrivateKey(der),
_ => {
tracing::warn!("Unknown key format: {:?}", key);
return Ok(None);
}
}
} else {
tracing::warn!("Failed to read private key");
return Ok(None);
};
let key = rustls::crypto::ring::sign::any_supported_type(&key)?;
Ok(Some(CertifiedKey::new(certs, key)))
Ok(Some((certs, key)))
}
pub(crate) fn footer_blurb(&self) -> Option<crate::templates::Html<String>> {

View File

@ -750,11 +750,6 @@ mod tests {
{
let db =
Db::build_inner(true, sled::Config::new().temporary(true).open().unwrap()).unwrap();
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on((f)(db));
actix_rt::System::new().block_on((f)(db));
}
}

View File

@ -1,85 +1,57 @@
use activitystreams::checked::CheckError;
use actix_rt::task::JoinError;
use actix_web::{
error::{BlockingError, ResponseError},
http::StatusCode,
HttpResponse,
};
use background_jobs::BoxError;
use color_eyre::eyre::Error as Report;
use http_signature_normalization_reqwest::SignError;
use std::{convert::Infallible, io, sync::Arc};
use tokio::task::JoinError;
#[derive(Clone)]
struct ArcKind {
kind: Arc<ErrorKind>,
}
impl std::fmt::Debug for ArcKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.kind.fmt(f)
}
}
impl std::fmt::Display for ArcKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.kind.fmt(f)
}
}
impl std::error::Error for ArcKind {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.kind.source()
}
}
use std::{convert::Infallible, fmt::Debug, io};
use tracing_error::SpanTrace;
pub(crate) struct Error {
kind: ArcKind,
display: Box<str>,
debug: Box<str>,
context: String,
kind: ErrorKind,
}
impl Error {
fn kind(&self) -> &ErrorKind {
&self.kind.kind
}
pub(crate) fn is_breaker(&self) -> bool {
matches!(self.kind(), ErrorKind::Breaker)
matches!(self.kind, ErrorKind::Breaker)
}
pub(crate) fn is_not_found(&self) -> bool {
matches!(self.kind(), ErrorKind::Status(_, StatusCode::NOT_FOUND))
matches!(self.kind, ErrorKind::Status(_, StatusCode::NOT_FOUND))
}
pub(crate) fn is_bad_request(&self) -> bool {
matches!(self.kind(), ErrorKind::Status(_, StatusCode::BAD_REQUEST))
matches!(self.kind, ErrorKind::Status(_, StatusCode::BAD_REQUEST))
}
pub(crate) fn is_gone(&self) -> bool {
matches!(self.kind(), ErrorKind::Status(_, StatusCode::GONE))
matches!(self.kind, ErrorKind::Status(_, StatusCode::GONE))
}
pub(crate) fn is_malformed_json(&self) -> bool {
matches!(self.kind(), ErrorKind::Json(_))
matches!(self.kind, ErrorKind::Json(_))
}
}
impl std::fmt::Debug for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.debug)
writeln!(f, "{:?}", self.kind)
}
}
impl std::fmt::Display for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.display)
writeln!(f, "{}", self.kind)?;
std::fmt::Display::fmt(&self.context, f)
}
}
impl std::error::Error for Error {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
self.kind().source()
self.kind.source()
}
}
@ -88,36 +60,25 @@ where
ErrorKind: From<T>,
{
fn from(error: T) -> Self {
let kind = ArcKind {
kind: Arc::new(ErrorKind::from(error)),
};
let report = Report::new(kind.clone());
let display = format!("{report}");
let debug = format!("{report:?}");
Error {
kind,
display: Box::from(display),
debug: Box::from(debug),
context: SpanTrace::capture().to_string(),
kind: error.into(),
}
}
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum ErrorKind {
#[error("Error in extractor")]
Extractor(#[from] crate::extractors::ErrorKind),
#[error("Error queueing job, {0}")]
Queue(anyhow::Error),
#[error("Error queueing job")]
Queue(#[from] BoxError),
#[error("Error in configuration")]
#[error("Error in configuration, {0}")]
Config(#[from] config::ConfigError),
#[error("Couldn't parse key")]
#[error("Couldn't parse key, {0}")]
Pkcs8(#[from] rsa::pkcs8::Error),
#[error("Couldn't encode public key")]
#[error("Couldn't encode public key, {0}")]
Spki(#[from] rsa::pkcs8::spki::Error),
#[error("Couldn't sign request")]
@ -126,36 +87,33 @@ pub(crate) enum ErrorKind {
#[error("Couldn't make request")]
Reqwest(#[from] reqwest::Error),
#[error("Couldn't make request")]
#[error("Couldn't build client")]
ReqwestMiddleware(#[from] reqwest_middleware::Error),
#[error("Couldn't parse IRI")]
#[error("Couldn't parse IRI, {0}")]
ParseIri(#[from] activitystreams::iri_string::validate::Error),
#[error("Couldn't normalize IRI")]
#[error("Couldn't normalize IRI, {0}")]
NormalizeIri(#[from] std::collections::TryReserveError),
#[error("Couldn't perform IO")]
#[error("Couldn't perform IO, {0}")]
Io(#[from] io::Error),
#[error("Couldn't sign string, {0}")]
Rsa(rsa::errors::Error),
#[error("Couldn't use db")]
#[error("Couldn't use db, {0}")]
Sled(#[from] sled::Error),
#[error("Couldn't do the json thing")]
#[error("Couldn't do the json thing, {0}")]
Json(#[from] serde_json::Error),
#[error("Couldn't sign request")]
#[error("Couldn't sign request, {0}")]
Sign(#[from] SignError),
#[error("Couldn't sign digest")]
Signature(#[from] rsa::signature::Error),
#[error("Couldn't prepare TLS private key")]
PrepareKey(#[from] rustls::Error),
#[error("Couldn't verify signature")]
VerifySignature,
@ -186,10 +144,10 @@ pub(crate) enum ErrorKind {
#[error("Wrong ActivityPub kind, {0}")]
Kind(String),
#[error("Too many CPUs")]
#[error("Too many CPUs, {0}")]
CpuCount(#[from] std::num::TryFromIntError),
#[error("Host mismatch")]
#[error("{0}")]
HostMismatch(#[from] CheckError),
#[error("Couldn't flush buffer")]
@ -243,7 +201,7 @@ pub(crate) enum ErrorKind {
impl ResponseError for Error {
fn status_code(&self) -> StatusCode {
match self.kind() {
match self.kind {
ErrorKind::NotAllowed(_) | ErrorKind::WrongActor(_) | ErrorKind::BadActor(_, _) => {
StatusCode::FORBIDDEN
}
@ -263,7 +221,7 @@ impl ResponseError for Error {
.insert_header(("Content-Type", "application/activity+json"))
.body(
serde_json::to_string(&serde_json::json!({
"error": self.kind().to_string(),
"error": self.kind.to_string(),
}))
.unwrap_or_else(|_| "{}".to_string()),
)

View File

@ -1,15 +1,19 @@
use actix_web::{
dev::Payload,
error::ParseError,
http::header::{from_one_raw_str, Header, HeaderName, HeaderValue, TryIntoHeaderValue},
http::{
header::{from_one_raw_str, Header, HeaderName, HeaderValue, TryIntoHeaderValue},
StatusCode,
},
web::Data,
FromRequest, HttpMessage, HttpRequest,
FromRequest, HttpMessage, HttpRequest, HttpResponse, ResponseError,
};
use bcrypt::{BcryptError, DEFAULT_COST};
use http_signature_normalization_actix::{prelude::InvalidHeaderValue, Canceled, Spawn};
use std::{convert::Infallible, str::FromStr, time::Instant};
use tracing_error::SpanTrace;
use crate::{db::Db, error::Error, future::LocalBoxFuture, spawner::Spawner};
use crate::{db::Db, future::LocalBoxFuture, spawner::Spawner};
#[derive(Clone)]
pub(crate) struct AdminConfig {
@ -24,7 +28,7 @@ impl AdminConfig {
}
fn verify(&self, token: XApiToken) -> Result<bool, Error> {
bcrypt::verify(token.0, &self.hashed_api_token).map_err(Error::bcrypt_verify)
bcrypt::verify(&token.0, &self.hashed_api_token).map_err(Error::bcrypt_verify)
}
}
@ -79,42 +83,74 @@ impl Admin {
}
}
#[derive(Debug, thiserror::Error)]
#[error("Failed authentication")]
pub(crate) struct Error {
context: String,
#[source]
kind: ErrorKind,
}
impl Error {
fn invalid() -> Self {
Error::from(ErrorKind::Invalid)
Error {
context: SpanTrace::capture().to_string(),
kind: ErrorKind::Invalid,
}
}
fn missing_config() -> Self {
Error::from(ErrorKind::MissingConfig)
Error {
context: SpanTrace::capture().to_string(),
kind: ErrorKind::MissingConfig,
}
}
fn missing_db() -> Self {
Error::from(ErrorKind::MissingDb)
Error {
context: SpanTrace::capture().to_string(),
kind: ErrorKind::MissingDb,
}
}
fn missing_spawner() -> Self {
Error::from(ErrorKind::MissingSpawner)
Error {
context: SpanTrace::capture().to_string(),
kind: ErrorKind::MissingSpawner,
}
}
fn bcrypt_verify(e: BcryptError) -> Self {
Error::from(ErrorKind::BCryptVerify(e))
Error {
context: SpanTrace::capture().to_string(),
kind: ErrorKind::BCryptVerify(e),
}
}
fn bcrypt_hash(e: BcryptError) -> Self {
Error::from(ErrorKind::BCryptHash(e))
Error {
context: SpanTrace::capture().to_string(),
kind: ErrorKind::BCryptHash(e),
}
}
fn parse_header(e: ParseError) -> Self {
Error::from(ErrorKind::ParseHeader(e))
Error {
context: SpanTrace::capture().to_string(),
kind: ErrorKind::ParseHeader(e),
}
}
fn canceled(_: Canceled) -> Self {
Error::from(ErrorKind::Canceled)
Error {
context: SpanTrace::capture().to_string(),
kind: ErrorKind::Canceled,
}
}
}
#[derive(Debug, thiserror::Error)]
pub(crate) enum ErrorKind {
enum ErrorKind {
#[error("Invalid API Token")]
Invalid,
@ -140,6 +176,20 @@ pub(crate) enum ErrorKind {
ParseHeader(#[source] ParseError),
}
impl ResponseError for Error {
fn status_code(&self) -> StatusCode {
match self.kind {
ErrorKind::Invalid | ErrorKind::ParseHeader(_) => StatusCode::BAD_REQUEST,
_ => StatusCode::INTERNAL_SERVER_ERROR,
}
}
fn error_response(&self) -> HttpResponse {
HttpResponse::build(self.status_code())
.json(serde_json::json!({ "msg": self.kind.to_string() }))
}
}
impl FromRequest for Admin {
type Error = Error;
type Future = LocalBoxFuture<'static, Result<Self, Self::Error>>;
@ -150,8 +200,10 @@ impl FromRequest for Admin {
Box::pin(async move {
let (db, c, s, t) = res?;
Self::verify(c, s, t).await?;
metrics::histogram!("relay.admin.verify")
.record(now.elapsed().as_micros() as f64 / 1_000_000_f64);
metrics::histogram!(
"relay.admin.verify",
now.elapsed().as_micros() as f64 / 1_000_000_f64
);
Ok(Admin { db })
})
}

View File

@ -1,4 +1,3 @@
use std::{future::Future, pin::Pin};
pub(crate) type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
pub(crate) type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

View File

@ -19,10 +19,8 @@ use crate::{
jobs::{process_listeners::Listeners, record_last_online::RecordLastOnline},
};
use background_jobs::{
memory_storage::{Storage, TokioTimer},
metrics::MetricsStorage,
tokio::{QueueHandle, WorkerConfig},
Job,
memory_storage::{ActixTimer, Storage},
Job, QueueHandle, WorkerConfig,
};
use std::time::Duration;
@ -45,12 +43,10 @@ pub(crate) fn create_workers(
actors: ActorCache,
media: MediaCache,
config: Config,
) -> std::io::Result<JobServer> {
) -> JobServer {
let deliver_concurrency = config.deliver_concurrency();
let queue_handle = WorkerConfig::new(
MetricsStorage::wrap(Storage::new(TokioTimer)),
move |queue_handle| {
let queue_handle = WorkerConfig::new(Storage::new(ActixTimer), move |queue_handle| {
JobState::new(
state.clone(),
actors.clone(),
@ -58,8 +54,7 @@ pub(crate) fn create_workers(
media.clone(),
config.clone(),
)
},
)
})
.register::<Deliver>()
.register::<DeliverMany>()
.register::<QueryNodeinfo>()
@ -75,12 +70,12 @@ pub(crate) fn create_workers(
.set_worker_count("maintenance", 2)
.set_worker_count("apub", 2)
.set_worker_count("deliver", deliver_concurrency)
.start()?;
.start();
queue_handle.every(Duration::from_secs(60 * 5), Listeners)?;
queue_handle.every(Duration::from_secs(60 * 10), RecordLastOnline)?;
queue_handle.every(Duration::from_secs(60 * 5), Listeners);
queue_handle.every(Duration::from_secs(60 * 10), RecordLastOnline);
Ok(JobServer::new(queue_handle))
JobServer::new(queue_handle)
}
#[derive(Clone, Debug)]

View File

@ -2,14 +2,14 @@ use crate::{
config::{Config, UrlKind},
db::Actor,
error::Error,
future::BoxFuture,
jobs::{
apub::{get_inboxes, prepare_activity},
DeliverMany, JobState,
},
};
use activitystreams::{activity::Announce as AsAnnounce, iri_string::types::IriString};
use background_jobs::Job;
use background_jobs::ActixJob;
use std::{future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct Announce {
@ -62,15 +62,14 @@ fn generate_announce(
)
}
impl Job for Announce {
impl ActixJob for Announce {
type State = JobState;
type Error = Error;
type Future = BoxFuture<'static, Result<(), Self::Error>>;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "relay::jobs::apub::Announce";
const QUEUE: &'static str = "apub";
fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state))
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
}
}

View File

@ -3,7 +3,6 @@ use crate::{
config::{Config, UrlKind},
db::Actor,
error::{Error, ErrorKind},
future::BoxFuture,
jobs::{apub::prepare_activity, Deliver, JobState, QueryInstance, QueryNodeinfo},
};
use activitystreams::{
@ -11,7 +10,8 @@ use activitystreams::{
iri_string::types::IriString,
prelude::*,
};
use background_jobs::Job;
use background_jobs::ActixJob;
use std::{future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct Follow {
@ -111,15 +111,14 @@ fn generate_accept_follow(
)
}
impl Job for Follow {
impl ActixJob for Follow {
type State = JobState;
type Error = Error;
type Future = BoxFuture<'static, Result<(), Self::Error>>;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "relay::jobs::apub::Follow";
const QUEUE: &'static str = "apub";
fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state))
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
}
}

View File

@ -2,11 +2,11 @@ use crate::{
apub::AcceptedActivities,
db::Actor,
error::{Error, ErrorKind},
future::BoxFuture,
jobs::{apub::get_inboxes, DeliverMany, JobState},
};
use activitystreams::prelude::*;
use background_jobs::Job;
use background_jobs::ActixJob;
use std::{future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct Forward {
@ -47,15 +47,14 @@ impl Forward {
}
}
impl Job for Forward {
impl ActixJob for Forward {
type State = JobState;
type Error = Error;
type Future = BoxFuture<'static, Result<(), Self::Error>>;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "relay::jobs::apub::Forward";
const QUEUE: &'static str = "apub";
fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state))
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
}
}

View File

@ -2,10 +2,10 @@ use crate::{
config::UrlKind,
db::Actor,
error::Error,
future::BoxFuture,
jobs::{apub::generate_undo_follow, Deliver, JobState},
};
use background_jobs::Job;
use background_jobs::ActixJob;
use std::{future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct Reject(pub(crate) Actor);
@ -33,15 +33,14 @@ impl Reject {
}
}
impl Job for Reject {
impl ActixJob for Reject {
type State = JobState;
type Error = Error;
type Future = BoxFuture<'static, Result<(), Self::Error>>;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "relay::jobs::apub::Reject";
const QUEUE: &'static str = "apub";
fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state))
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
}
}

View File

@ -3,11 +3,11 @@ use crate::{
config::UrlKind,
db::Actor,
error::Error,
future::BoxFuture,
jobs::{apub::generate_undo_follow, Deliver, JobState},
};
use activitystreams::prelude::BaseExt;
use background_jobs::Job;
use background_jobs::ActixJob;
use std::{future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct Undo {
@ -48,15 +48,14 @@ impl Undo {
}
}
impl Job for Undo {
impl ActixJob for Undo {
type State = JobState;
type Error = Error;
type Future = BoxFuture<'static, Result<(), Self::Error>>;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "relay::jobs::apub::Undo";
const QUEUE: &'static str = "apub";
fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state))
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
}
}

View File

@ -1,12 +1,12 @@
use crate::{
apub::AcceptedActors,
error::{Error, ErrorKind},
future::BoxFuture,
jobs::JobState,
requests::BreakerStrategy,
};
use activitystreams::{iri_string::types::IriString, object::Image, prelude::*};
use background_jobs::Job;
use background_jobs::ActixJob;
use std::{future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct QueryContact {
@ -85,16 +85,15 @@ fn to_contact(contact: AcceptedActors) -> Option<(String, String, IriString, Iri
Some((username, display_name, url, avatar))
}
impl Job for QueryContact {
impl ActixJob for QueryContact {
type State = JobState;
type Error = Error;
type Future = BoxFuture<'static, Result<(), Self::Error>>;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "relay::jobs::QueryContact";
const QUEUE: &'static str = "maintenance";
fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state))
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
}
}

View File

@ -1,11 +1,11 @@
use crate::{
error::Error,
future::BoxFuture,
jobs::{debug_object, JobState},
requests::BreakerStrategy,
};
use activitystreams::iri_string::types::IriString;
use background_jobs::{Backoff, Job};
use background_jobs::{ActixJob, Backoff};
use std::{future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct Deliver {
@ -35,7 +35,7 @@ impl Deliver {
}
#[tracing::instrument(name = "Deliver", skip(state))]
async fn perform(self, state: JobState) -> Result<(), Error> {
async fn permform(self, state: JobState) -> Result<(), Error> {
if let Err(e) = state
.state
.requests
@ -56,16 +56,15 @@ impl Deliver {
}
}
impl Job for Deliver {
impl ActixJob for Deliver {
type State = JobState;
type Error = Error;
type Future = BoxFuture<'static, Result<(), Self::Error>>;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "relay::jobs::Deliver";
const QUEUE: &'static str = "deliver";
const BACKOFF: Backoff = Backoff::Exponential(8);
fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state))
Box::pin(async move { self.permform(state).await.map_err(Into::into) })
}
}

View File

@ -1,10 +1,10 @@
use crate::{
error::Error,
future::BoxFuture,
future::LocalBoxFuture,
jobs::{debug_object, Deliver, JobState},
};
use activitystreams::iri_string::types::IriString;
use background_jobs::Job;
use background_jobs::ActixJob;
#[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct DeliverMany {
@ -45,15 +45,14 @@ impl DeliverMany {
}
}
impl Job for DeliverMany {
impl ActixJob for DeliverMany {
type State = JobState;
type Error = Error;
type Future = BoxFuture<'static, Result<(), Self::Error>>;
type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>;
const NAME: &'static str = "relay::jobs::DeliverMany";
const QUEUE: &'static str = "deliver";
fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state))
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
}
}

View File

@ -1,12 +1,12 @@
use crate::{
config::UrlKind,
error::{Error, ErrorKind},
future::BoxFuture,
jobs::{Boolish, JobState},
requests::BreakerStrategy,
};
use activitystreams::{iri, iri_string::types::IriString};
use background_jobs::Job;
use background_jobs::ActixJob;
use std::{future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct QueryInstance {
@ -165,16 +165,15 @@ impl QueryInstance {
}
}
impl Job for QueryInstance {
impl ActixJob for QueryInstance {
type State = JobState;
type Error = Error;
type Future = BoxFuture<'static, Result<(), Self::Error>>;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "relay::jobs::QueryInstance";
const QUEUE: &'static str = "maintenance";
fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state))
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
}
}

View File

@ -1,18 +1,18 @@
use crate::{
error::{Error, ErrorKind},
future::BoxFuture,
jobs::{Boolish, JobState, QueryContact},
requests::BreakerStrategy,
};
use activitystreams::{iri, iri_string::types::IriString, primitives::OneOrMany};
use background_jobs::Job;
use background_jobs::ActixJob;
use std::{fmt::Debug, future::Future, pin::Pin};
#[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct QueryNodeinfo {
actor_id: IriString,
}
impl std::fmt::Debug for QueryNodeinfo {
impl Debug for QueryNodeinfo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("QueryNodeinfo")
.field("actor_id", &self.actor_id.to_string())
@ -92,7 +92,7 @@ impl QueryNodeinfo {
.metadata
.and_then(|meta| meta.into_iter().next().and_then(|meta| meta.staff_accounts))
{
if let Some(contact_id) = accounts.first() {
if let Some(contact_id) = accounts.get(0) {
state
.job_server
.queue(QueryContact::new(self.actor_id, contact_id.clone()))
@ -104,16 +104,15 @@ impl QueryNodeinfo {
}
}
impl Job for QueryNodeinfo {
impl ActixJob for QueryNodeinfo {
type State = JobState;
type Error = Error;
type Future = BoxFuture<'static, Result<(), Self::Error>>;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "relay::jobs::QueryNodeinfo";
const QUEUE: &'static str = "maintenance";
fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state))
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
}
}

View File

@ -1,9 +1,9 @@
use crate::{
error::Error,
future::BoxFuture,
jobs::{instance::QueryInstance, nodeinfo::QueryNodeinfo, JobState},
};
use background_jobs::Job;
use background_jobs::ActixJob;
use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct Listeners;
@ -23,15 +23,14 @@ impl Listeners {
}
}
impl Job for Listeners {
impl ActixJob for Listeners {
type State = JobState;
type Error = Error;
type Future = BoxFuture<'static, Result<(), Self::Error>>;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "relay::jobs::Listeners";
const QUEUE: &'static str = "maintenance";
fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state))
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
}
}

View File

@ -1,5 +1,6 @@
use crate::{error::Error, future::BoxFuture, jobs::JobState};
use background_jobs::{Backoff, Job};
use crate::{error::Error, jobs::JobState};
use background_jobs::{ActixJob, Backoff};
use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct RecordLastOnline;
@ -13,16 +14,15 @@ impl RecordLastOnline {
}
}
impl Job for RecordLastOnline {
impl ActixJob for RecordLastOnline {
type State = JobState;
type Error = Error;
type Future = BoxFuture<'static, Result<(), Self::Error>>;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "relay::jobs::RecordLastOnline";
const QUEUE: &'static str = "maintenance";
const BACKOFF: Backoff = Backoff::Linear(1);
fn run(self, state: Self::State) -> Self::Future {
Box::pin(self.perform(state))
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
}
}

View File

@ -4,6 +4,7 @@
use std::time::Duration;
use activitystreams::iri_string::types::IriString;
use actix_rt::task::JoinHandle;
use actix_web::{middleware::Compress, web, App, HttpServer};
use collector::MemoryCollector;
#[cfg(feature = "console")]
@ -12,16 +13,14 @@ use error::Error;
use http_signature_normalization_actix::middleware::VerifySignature;
use metrics_exporter_prometheus::PrometheusBuilder;
use metrics_util::layers::FanoutBuilder;
use opentelemetry::KeyValue;
use opentelemetry::{sdk::Resource, KeyValue};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::Resource;
use reqwest_middleware::ClientWithMiddleware;
use rustls::ServerConfig;
use tokio::task::JoinHandle;
use tracing_actix_web::TracingLogger;
use tracing_error::ErrorLayer;
use tracing_log::LogTracer;
use tracing_subscriber::{filter::Targets, layer::SubscriberExt, Layer};
use tracing_subscriber::{filter::Targets, fmt::format::FmtSpan, layer::SubscriberExt, Layer};
mod admin;
mod apub;
@ -56,15 +55,16 @@ use self::{
fn init_subscriber(
software_name: &'static str,
opentelemetry_url: Option<&IriString>,
) -> color_eyre::Result<()> {
) -> Result<(), anyhow::Error> {
LogTracer::init()?;
color_eyre::install()?;
let targets: Targets = std::env::var("RUST_LOG")
.unwrap_or_else(|_| "info".into())
.unwrap_or_else(|_| "warn,actix_web=debug,actix_server=debug,tracing_actix_web=info".into())
.parse()?;
let format_layer = tracing_subscriber::fmt::layer().with_filter(targets.clone());
let format_layer = tracing_subscriber::fmt::layer()
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
.with_filter(targets.clone());
#[cfg(feature = "console")]
let console_layer = ConsoleLayer::builder()
@ -81,19 +81,18 @@ fn init_subscriber(
let subscriber = subscriber.with(console_layer);
if let Some(url) = opentelemetry_url {
let tracer = opentelemetry_otlp::new_pipeline()
let tracer =
opentelemetry_otlp::new_pipeline()
.tracing()
.with_trace_config(
opentelemetry_sdk::trace::config().with_resource(Resource::new(vec![
KeyValue::new("service.name", software_name),
])),
)
.with_trace_config(opentelemetry::sdk::trace::config().with_resource(
Resource::new(vec![KeyValue::new("service.name", software_name)]),
))
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint(url.as_str()),
)
.install_batch(opentelemetry_sdk::runtime::Tokio)?;
.install_batch(opentelemetry::runtime::Tokio)?;
let otel_layer = tracing_opentelemetry::layer()
.with_tracer(tracer)
@ -140,8 +139,8 @@ fn build_client(
Ok(client_with_middleware)
}
#[tokio::main]
async fn main() -> color_eyre::Result<()> {
#[actix_rt::main]
async fn main() -> Result<(), anyhow::Error> {
dotenv::dotenv().ok();
let config = Config::build()?;
@ -151,8 +150,7 @@ async fn main() -> color_eyre::Result<()> {
let args = Args::new();
if args.any() {
client_main(config, args).await??;
return Ok(());
return client_main(config, args).await?;
}
let collector = MemoryCollector::new();
@ -162,35 +160,35 @@ async fn main() -> color_eyre::Result<()> {
.with_http_listener(bind_addr)
.build()?;
tokio::spawn(exporter);
actix_rt::spawn(exporter);
let recorder = FanoutBuilder::default()
.add_recorder(recorder)
.add_recorder(collector.clone())
.build();
metrics::set_global_recorder(recorder).map_err(|e| color_eyre::eyre::eyre!("{e}"))?;
metrics::set_boxed_recorder(Box::new(recorder))?;
} else {
collector.install()?;
}
tracing::info!("Opening DB");
tracing::warn!("Opening DB");
let db = Db::build(&config)?;
tracing::info!("Building caches");
tracing::warn!("Building caches");
let actors = ActorCache::new(db.clone());
let media = MediaCache::new(db.clone());
server_main(db, actors, media, collector, config).await?;
server_main(db, actors, media, collector, config).await??;
tracing::info!("Application exit");
tracing::warn!("Application exit");
Ok(())
}
fn client_main(config: Config, args: Args) -> JoinHandle<color_eyre::Result<()>> {
tokio::spawn(do_client_main(config, args))
fn client_main(config: Config, args: Args) -> JoinHandle<Result<(), anyhow::Error>> {
actix_rt::spawn(do_client_main(config, args))
}
async fn do_client_main(config: Config, args: Args) -> color_eyre::Result<()> {
async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error> {
let client = build_client(
&config.user_agent(),
config.client_timeout(),
@ -273,22 +271,32 @@ async fn do_client_main(config: Config, args: Args) -> color_eyre::Result<()> {
Ok(())
}
const VERIFY_RATIO: usize = 7;
async fn server_main(
fn server_main(
db: Db,
actors: ActorCache,
media: MediaCache,
collector: MemoryCollector,
config: Config,
) -> color_eyre::Result<()> {
) -> JoinHandle<Result<(), anyhow::Error>> {
actix_rt::spawn(do_server_main(db, actors, media, collector, config))
}
const VERIFY_RATIO: usize = 7;
async fn do_server_main(
db: Db,
actors: ActorCache,
media: MediaCache,
collector: MemoryCollector,
config: Config,
) -> Result<(), anyhow::Error> {
let client = build_client(
&config.user_agent(),
config.client_timeout(),
config.proxy_config(),
)?;
tracing::info!("Creating state");
tracing::warn!("Creating state");
let (signature_threads, verify_threads) = match config.signature_threads() {
0 | 1 => (1, 1),
@ -301,30 +309,23 @@ async fn server_main(
}
};
let verify_spawner = Spawner::build("verify-cpu", verify_threads.try_into()?)?;
let sign_spawner = Spawner::build("sign-cpu", signature_threads.try_into()?)?;
let verify_spawner = Spawner::build("verify-cpu", verify_threads)?;
let sign_spawner = Spawner::build("sign-cpu", signature_threads)?;
let key_id = config.generate_url(UrlKind::MainKey).to_string();
let state = State::build(db.clone(), key_id, sign_spawner.clone(), client).await?;
let state = State::build(db.clone(), key_id, sign_spawner, client).await?;
if let Some((token, admin_handle)) = config.telegram_info() {
tracing::info!("Creating telegram handler");
tracing::warn!("Creating telegram handler");
telegram::start(admin_handle.to_owned(), db.clone(), token);
}
let cert_resolver = config
.open_keys()
.await?
.map(rustls_channel_resolver::channel::<32>);
let keys = config.open_keys()?;
let bind_address = config.bind_address();
let sign_spawner2 = sign_spawner.clone();
let verify_spawner2 = verify_spawner.clone();
let config2 = config.clone();
let server = HttpServer::new(move || {
let job_server =
create_workers(state.clone(), actors.clone(), media.clone(), config.clone())
.expect("Failed to create job server");
create_workers(state.clone(), actors.clone(), media.clone(), config.clone());
let app = App::new()
.app_data(web::Data::new(db.clone()))
@ -390,42 +391,24 @@ async fn server_main(
)
});
if let Some((cert_tx, cert_rx)) = cert_resolver {
let handle = tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(30));
interval.tick().await;
loop {
interval.tick().await;
match config2.open_keys().await {
Ok(Some(key)) => cert_tx.update(key),
Ok(None) => tracing::warn!("Missing TLS keys"),
Err(e) => tracing::error!("Failed to read TLS keys {e}"),
}
}
});
tracing::info!("Binding to {}:{} with TLS", bind_address.0, bind_address.1);
if let Some((certs, key)) = keys {
tracing::warn!("Binding to {}:{} with TLS", bind_address.0, bind_address.1);
let server_config = ServerConfig::builder()
.with_safe_default_cipher_suites()
.with_safe_default_kx_groups()
.with_safe_default_protocol_versions()?
.with_no_client_auth()
.with_cert_resolver(cert_rx);
.with_single_cert(certs, key)?;
server
.bind_rustls_0_22(bind_address, server_config)?
.bind_rustls_021(bind_address, server_config)?
.run()
.await?;
handle.abort();
let _ = handle.await;
} else {
tracing::info!("Binding to {}:{}", bind_address.0, bind_address.1);
tracing::warn!("Binding to {}:{}", bind_address.0, bind_address.1);
server.bind(bind_address)?.run().await?;
}
sign_spawner2.close().await;
verify_spawner2.close().await;
tracing::info!("Server closed");
tracing::warn!("Server closed");
Ok(())
}

View File

@ -40,7 +40,7 @@ impl Drop for LogOnDrop {
fn drop(&mut self) {
if self.arm {
let duration = self.begin.elapsed();
metrics::histogram!("relay.request.complete", "path" => self.path.clone(), "method" => self.method.clone()).record(duration);
metrics::histogram!("relay.request.complete", duration, "path" => self.path.clone(), "method" => self.method.clone());
}
}
}

View File

@ -432,7 +432,7 @@ struct Signer {
impl Signer {
fn sign(&self, signing_string: &str) -> Result<String, Error> {
let mut signature = vec![0; self.private_key.public().modulus_len()];
let mut signature = vec![0; self.private_key.public_modulus_len()];
self.private_key
.sign(

View File

@ -14,11 +14,10 @@ const MINIFY_CONFIG: minify_html::Cfg = minify_html::Cfg {
keep_html_and_head_opening_tags: false,
keep_spaces_between_attributes: true,
keep_comments: false,
keep_input_type_text_attr: true,
keep_ssi_comments: false,
preserve_brace_template_syntax: false,
preserve_chevron_percent_template_syntax: false,
minify_css: true,
minify_css_level_1: true,
minify_css_level_2: false,
minify_css_level_3: false,
minify_js: true,
remove_bangs: true,
remove_processing_instructions: true,

View File

@ -1,30 +1,107 @@
use async_cpupool::CpuPool;
use http_signature_normalization_actix::{Canceled, Spawn};
use std::time::Duration;
use std::{
panic::AssertUnwindSafe,
sync::Arc,
thread::JoinHandle,
time::{Duration, Instant},
};
#[derive(Clone)]
fn spawner_thread(
receiver: flume::Receiver<Box<dyn FnOnce() + Send>>,
name: &'static str,
id: usize,
) {
let guard = MetricsGuard::guard(name, id);
while let Ok(f) = receiver.recv() {
let start = Instant::now();
metrics::increment_counter!(format!("relay.{name}.operation.start"), "id" => id.to_string());
let res = std::panic::catch_unwind(AssertUnwindSafe(f));
metrics::increment_counter!(format!("relay.{name}.operation.end"), "complete" => res.is_ok().to_string(), "id" => id.to_string());
metrics::histogram!(format!("relay.{name}.operation.duration"), start.elapsed().as_secs_f64(), "complete" => res.is_ok().to_string(), "id" => id.to_string());
if let Err(e) = res {
tracing::warn!("{name} fn panicked: {e:?}");
}
}
guard.disarm();
}
#[derive(Clone, Debug)]
pub(crate) struct Spawner {
pool: CpuPool,
name: &'static str,
sender: Option<flume::Sender<Box<dyn FnOnce() + Send>>>,
threads: Option<Arc<Vec<JoinHandle<()>>>>,
}
struct MetricsGuard {
name: &'static str,
id: usize,
start: Instant,
armed: bool,
}
impl MetricsGuard {
fn guard(name: &'static str, id: usize) -> Self {
metrics::increment_counter!(format!("relay.{name}.launched"), "id" => id.to_string());
Self {
name,
id,
start: Instant::now(),
armed: true,
}
}
fn disarm(mut self) {
self.armed = false;
}
}
impl Drop for MetricsGuard {
fn drop(&mut self) {
metrics::increment_counter!(format!("relay.{}.closed", self.name), "clean" => (!self.armed).to_string(), "id" => self.id.to_string());
metrics::histogram!(format!("relay.{}.duration", self.name), self.start.elapsed().as_secs_f64(), "clean" => (!self.armed).to_string(), "id" => self.id.to_string());
tracing::warn!("Stopping {} - {}", self.name, self.id);
}
}
impl Spawner {
pub(crate) fn build(name: &'static str, threads: u16) -> color_eyre::Result<Self> {
let pool = CpuPool::configure()
.name(name)
.max_threads(threads)
.build()?;
pub(crate) fn build(name: &'static str, threads: usize) -> std::io::Result<Self> {
let (sender, receiver) = flume::bounded(8);
Ok(Spawner { pool })
}
tracing::warn!("Launching {threads} {name}s");
pub(crate) async fn close(self) {
self.pool.close().await;
let threads = (0..threads)
.map(|i| {
let receiver = receiver.clone();
std::thread::Builder::new()
.name(format!("{name}-{i}"))
.spawn(move || {
spawner_thread(receiver, name, i);
})
})
.collect::<Result<Vec<_>, _>>()?;
Ok(Spawner {
name,
sender: Some(sender),
threads: Some(Arc::new(threads)),
})
}
}
impl std::fmt::Debug for Spawner {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Spawner").finish()
impl Drop for Spawner {
fn drop(&mut self) {
self.sender.take();
if let Some(threads) = self.threads.take().and_then(Arc::into_inner) {
tracing::warn!("Joining {}s", self.name);
for thread in threads {
let _ = thread.join();
}
}
}
}
@ -34,9 +111,9 @@ where
{
let id = uuid::Uuid::new_v4();
metrics::counter!("relay.spawner.wait-timer.start").increment(1);
metrics::increment_counter!("relay.spawner.wait-timer.start");
let mut interval = tokio::time::interval(Duration::from_secs(5));
let mut interval = actix_rt::time::interval(Duration::from_secs(5));
// pass the first tick (instant)
interval.tick().await;
@ -47,12 +124,12 @@ where
loop {
tokio::select! {
out = &mut fut => {
metrics::counter!("relay.spawner.wait-timer.end").increment(1);
metrics::increment_counter!("relay.spawner.wait-timer.end");
return out;
}
_ = interval.tick() => {
counter += 1;
metrics::counter!("relay.spawner.wait-timer.pending").increment(1);
metrics::increment_counter!("relay.spawner.wait-timer.pending");
tracing::warn!("Blocking operation {id} is taking a long time, {} seconds", counter * 5);
}
}
@ -67,9 +144,22 @@ impl Spawn for Spawner {
Func: FnOnce() -> Out + Send + 'static,
Out: Send + 'static,
{
let pool = self.pool.clone();
let sender = self.sender.as_ref().expect("Sender exists").clone();
Box::pin(async move { timer(pool.spawn(func)).await.map_err(|_| Canceled) })
Box::pin(async move {
let (tx, rx) = flume::bounded(1);
let _ = sender
.send_async(Box::new(move || {
if tx.try_send((func)()).is_err() {
tracing::warn!("Requestor hung up");
metrics::increment_counter!("relay.spawner.disconnected");
}
}))
.await;
timer(rx.recv_async()).await.map_err(|_| Canceled)
})
}
}
@ -81,10 +171,21 @@ impl http_signature_normalization_reqwest::Spawn for Spawner {
Func: FnOnce() -> Out + Send + 'static,
Out: Send + 'static,
{
let pool = self.pool.clone();
let sender = self.sender.as_ref().expect("Sender exists").clone();
Box::pin(async move {
timer(pool.spawn(func))
let (tx, rx) = flume::bounded(1);
let _ = sender
.send_async(Box::new(move || {
if tx.try_send((func)()).is_err() {
tracing::warn!("Requestor hung up");
metrics::increment_counter!("relay.spawner.disconnected");
}
}))
.await;
timer(rx.recv_async())
.await
.map_err(|_| http_signature_normalization_reqwest::Canceled)
})

View File

@ -46,7 +46,7 @@ pub(crate) fn start(admin_handle: String, db: Db, token: &str) {
let bot = Bot::new(token);
let admin_handle = Arc::new(admin_handle);
tokio::spawn(async move {
actix_rt::spawn(async move {
let command_handler = teloxide::filter_command::<Command, _>().endpoint(
move |bot: Bot, msg: Message, cmd: Command| {
let admin_handle = admin_handle.clone();