From 261805004b33650fb1a2f87cc8bfa18301f761b6 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 19 Nov 2022 14:45:09 -0600 Subject: [PATCH 01/60] Update background-jobs --- Cargo.lock | 42 ++++++++++++++++++++++++++++++++++++------ Cargo.toml | 2 +- 2 files changed, 37 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e38d3f2..7a2f470 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -485,9 +485,9 @@ dependencies = [ [[package]] name = "background-jobs" -version = "0.13.0" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "793a813f9145c5f3a27b8dcd834c0927de68bbd60d53a369e5894f3cc5759020" +checksum = "62dc7cfc967d6714768097a876ca2941a54a26976c6d3c95ea6da48974890970" dependencies = [ "background-jobs-actix", "background-jobs-core", @@ -495,15 +495,16 @@ dependencies = [ [[package]] name = "background-jobs-actix" -version = "0.13.1" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47263ad9c5679419347dae655c2fa2cba078b0eaa51ac758d4f0e9690c06910b" +checksum = "ea433afb3fbbb6dc2614bad9f6671517f6cffcd523981e568cdb595dc7fa6399" dependencies = [ "actix-rt", "anyhow", "async-mutex", "async-trait", "background-jobs-core", + "metrics", "serde", "serde_json", "thiserror", @@ -515,14 +516,15 @@ dependencies = [ [[package]] name = "background-jobs-core" -version = "0.13.0" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48e78e842fe2ae461319e3d1843c12e301630e65650332b02032ac70b0dfc66f" +checksum = "b3071bf7a5e46638085076957dc189b2b0b147cd279eb09510b7af54e95085ef" dependencies = [ "actix-rt", "anyhow", "async-trait", "event-listener", + "metrics", "serde", "serde_json", "thiserror", @@ -1563,6 +1565,28 @@ dependencies = [ "autocfg", ] +[[package]] +name = "metrics" +version = "0.20.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b9b8653cec6897f73b519a43fba5ee3d50f62fe9af80b428accdcc093b4a849" +dependencies = [ + "ahash", + "metrics-macros", + "portable-atomic", +] + +[[package]] +name = "metrics-macros" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "731f8ecebd9f3a4aa847dfe75455e4757a45da40a7793d2f0b1f9b6ed18b23f3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "mime" version = "0.3.16" @@ -2065,6 +2089,12 @@ dependencies = [ "spki", ] +[[package]] +name = "portable-atomic" +version = "0.3.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15eb2c6e362923af47e13c23ca5afb859e83d54452c55b0b9ac763b8f7c1ac16" + [[package]] name = "ppv-lite86" version = "0.2.17" diff --git a/Cargo.toml b/Cargo.toml index dcddf46..b310b9a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -70,7 +70,7 @@ tokio = { version = "1", features = ["macros", "sync"] } uuid = { version = "1", features = ["v4", "serde"] } [dependencies.background-jobs] -version = "0.13.0" +version = "0.14.0" default-features = false features = ["background-jobs-actix", "error-logging"] From 902ce5d3c28945045d408526ea1557be3a4efd64 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 19 Nov 2022 14:47:32 -0600 Subject: [PATCH 02/60] New module structure --- src/{data/mod.rs => data.rs} | 0 src/{jobs/mod.rs => jobs.rs} | 0 src/jobs/{apub/mod.rs => apub.rs} | 0 src/{middleware/mod.rs => middleware.rs} | 0 src/{routes/mod.rs => routes.rs} | 0 5 files changed, 0 insertions(+), 0 deletions(-) rename src/{data/mod.rs => data.rs} (100%) rename src/{jobs/mod.rs => jobs.rs} (100%) rename src/jobs/{apub/mod.rs => apub.rs} (100%) rename src/{middleware/mod.rs => middleware.rs} (100%) rename src/{routes/mod.rs => routes.rs} (100%) diff --git a/src/data/mod.rs b/src/data.rs similarity index 100% rename from src/data/mod.rs rename to src/data.rs diff --git a/src/jobs/mod.rs b/src/jobs.rs similarity index 100% rename from src/jobs/mod.rs rename to src/jobs.rs diff --git a/src/jobs/apub/mod.rs b/src/jobs/apub.rs similarity index 100% rename from src/jobs/apub/mod.rs rename to src/jobs/apub.rs diff --git a/src/middleware/mod.rs b/src/middleware.rs similarity index 100% rename from src/middleware/mod.rs rename to src/middleware.rs diff --git a/src/routes/mod.rs b/src/routes.rs similarity index 100% rename from src/routes/mod.rs rename to src/routes.rs From c8b81bb9aa378575bf00c41ecaab669e82dde68f Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 19 Nov 2022 14:47:47 -0600 Subject: [PATCH 03/60] Add metrics dependencies --- Cargo.lock | 107 ++++++++++++++++++++++++++++++++++++++++++++++++++++- Cargo.toml | 2 + 2 files changed, 107 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7a2f470..f7e33ec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -300,6 +300,8 @@ dependencies = [ "futures-util", "http-signature-normalization-actix", "lru", + "metrics", + "metrics-util", "mime", "opentelemetry", "opentelemetry-otlp", @@ -931,6 +933,12 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "endian-type" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c34f04666d835ff5d62e058c3995147c06f42fe86ff053337632bca83e42702d" + [[package]] name = "erasable" version = "1.2.1" @@ -1123,7 +1131,7 @@ checksum = "c05aeb6a22b8f62540c194aac980f2115af067bfe15a0734d7277a768d396b31" dependencies = [ "cfg-if", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", ] [[package]] @@ -1509,6 +1517,15 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c41e0c4fef86961ac6d6f8a82609f55f31b05e4fce149ac5710e439df7619ba4" +[[package]] +name = "mach" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b823e83b2affd8f40a9ee8c29dbc56404c1e34cd2710921f2801e2cf29527afa" +dependencies = [ + "libc", +] + [[package]] name = "maplit" version = "1.0.2" @@ -1587,6 +1604,27 @@ dependencies = [ "syn", ] +[[package]] +name = "metrics-util" +version = "0.14.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7d24dc2dbae22bff6f1f9326ffce828c9f07ef9cc1e8002e5279f845432a30a" +dependencies = [ + "aho-corasick", + "crossbeam-epoch", + "crossbeam-utils", + "hashbrown", + "indexmap", + "metrics", + "num_cpus", + "ordered-float", + "parking_lot 0.12.1", + "portable-atomic", + "quanta", + "radix_trie", + "sketches-ddsketch", +] + [[package]] name = "mime" version = "0.3.16" @@ -1626,7 +1664,7 @@ checksum = "e5d732bc30207a6423068df043e3d02e0735b155ad7ce1a6f76fe2baa5b158de" dependencies = [ "libc", "log", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys", ] @@ -1648,6 +1686,15 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4a24736216ec316047a1fc4252e27dabb04218aa4a3f37c6e7ddbf1f9782b54" +[[package]] +name = "nibble_vec" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a5d83df9f36fe23f0c3648c6bbb8b0298bb5f1939c8f2704431371f4b84d43" +dependencies = [ + "smallvec", +] + [[package]] name = "nom" version = "7.1.1" @@ -1846,6 +1893,15 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "ordered-float" +version = "2.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7940cf2ca942593318d07fcf2596cdca60a85c9e7fab408a5e21a4f9dcd40d87" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-multimap" version = "0.4.3" @@ -2205,6 +2261,22 @@ dependencies = [ "prost", ] +[[package]] +name = "quanta" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7e31331286705f455e56cca62e0e717158474ff02b7936c1fa596d983f4ae27" +dependencies = [ + "crossbeam-utils", + "libc", + "mach", + "once_cell", + "raw-cpuid", + "wasi 0.10.2+wasi-snapshot-preview1", + "web-sys", + "winapi", +] + [[package]] name = "quote" version = "1.0.21" @@ -2214,6 +2286,16 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "radix_trie" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c069c179fcdc6a2fe24d8d18305cf085fdbd4f922c041943e203685d6a1c58fd" +dependencies = [ + "endian-type", + "nibble_vec", +] + [[package]] name = "rand" version = "0.8.5" @@ -2244,6 +2326,15 @@ dependencies = [ "getrandom", ] +[[package]] +name = "raw-cpuid" +version = "10.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6823ea29436221176fe662da99998ad3b4db2c7f31e7b6f5fe43adccd6320bb" +dependencies = [ + "bitflags", +] + [[package]] name = "rc-box" version = "1.2.0" @@ -2609,6 +2700,12 @@ version = "0.3.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7bd3e3206899af3f8b12af284fafc038cc1dc2b41d1b89dd17297221c5d225de" +[[package]] +name = "sketches-ddsketch" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ceb945e54128e09c43d8e4f1277851bd5044c6fc540bbaa2ad888f60b3da9ae7" + [[package]] name = "slab" version = "0.4.7" @@ -3317,6 +3414,12 @@ dependencies = [ "try-lock", ] +[[package]] +name = "wasi" +version = "0.10.2+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" diff --git a/Cargo.toml b/Cargo.toml index b310b9a..334600f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,6 +38,8 @@ dashmap = "5.1.0" dotenv = "0.15.0" futures-util = "0.3.17" lru = "0.8.0" +metrics = "0.20.1" +metrics-util = "0.14.0" mime = "0.3.16" opentelemetry = { version = "0.18", features = ["rt-tokio"] } opentelemetry-otlp = "0.11" From c322798ba35fb32cf6ae5caeaf83f10feb81947c Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 19 Nov 2022 17:28:15 -0600 Subject: [PATCH 04/60] Add metrics collector, admin route --- Cargo.lock | 2 + Cargo.toml | 2 + src/admin/routes.rs | 13 +- src/collector.rs | 369 ++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 9 +- 5 files changed, 392 insertions(+), 3 deletions(-) create mode 100644 src/collector.rs diff --git a/Cargo.lock b/Cargo.lock index f7e33ec..694caf4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -299,12 +299,14 @@ dependencies = [ "dotenv", "futures-util", "http-signature-normalization-actix", + "indexmap", "lru", "metrics", "metrics-util", "mime", "opentelemetry", "opentelemetry-otlp", + "quanta", "rand", "rsa", "rsa-magic-public-key", diff --git a/Cargo.toml b/Cargo.toml index 334600f..05753fc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -37,12 +37,14 @@ console-subscriber = { version = "0.1", optional = true } dashmap = "5.1.0" dotenv = "0.15.0" futures-util = "0.3.17" +indexmap = "1.9.2" lru = "0.8.0" metrics = "0.20.1" metrics-util = "0.14.0" mime = "0.3.16" opentelemetry = { version = "0.18", features = ["rt-tokio"] } opentelemetry-otlp = "0.11" +quanta = "0.10.1" rand = "0.8" rsa = "0.7" rsa-magic-public-key = "0.6.0" diff --git a/src/admin/routes.rs b/src/admin/routes.rs index c33efca..6578bfd 100644 --- a/src/admin/routes.rs +++ b/src/admin/routes.rs @@ -1,9 +1,13 @@ use crate::{ admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains}, + collector::{MemoryCollector, Snapshot}, error::Error, extractors::Admin, }; -use actix_web::{web::Json, HttpResponse}; +use actix_web::{ + web::{Data, Json}, + HttpResponse, +}; pub(crate) async fn allow( admin: Admin, @@ -58,3 +62,10 @@ pub(crate) async fn connected(admin: Admin) -> Result, Err Ok(Json(ConnectedActors { connected_actors })) } + +pub(crate) async fn stats( + _admin: Admin, + collector: Data, +) -> Result, Error> { + Ok(Json(collector.snapshot())) +} diff --git a/src/collector.rs b/src/collector.rs new file mode 100644 index 0000000..401307b --- /dev/null +++ b/src/collector.rs @@ -0,0 +1,369 @@ +use dashmap::DashMap; +use indexmap::IndexMap; +use metrics::{Key, Recorder, SetRecorderError}; +use metrics_util::{ + registry::{AtomicStorage, GenerationalStorage, Recency, Registry}, + MetricKindMask, Summary, +}; +use quanta::Clock; +use std::{ + collections::HashMap, + sync::{atomic::Ordering, Arc}, + time::Duration, +}; + +const SECONDS: u64 = 1; +const MINUTES: u64 = 60 * SECONDS; +const HOURS: u64 = 60 * MINUTES; +const DAYS: u64 = 24 * HOURS; + +#[derive(Clone)] +pub struct MemoryCollector { + inner: Arc, +} + +struct Inner { + descriptions: DashMap, + distributions: DashMap, Summary>>, + recency: Recency, + registry: Registry>, +} + +#[derive(serde::Serialize)] +struct Counter { + labels: Vec<(String, String)>, + value: u64, +} + +#[derive(serde::Serialize)] +struct Gauge { + labels: Vec<(String, String)>, + value: f64, +} + +#[derive(serde::Serialize)] +struct Histogram { + labels: Vec<(String, String)>, + value: Vec<(f64, Option)>, +} + +#[derive(serde::Serialize)] +pub(crate) struct Snapshot { + counters: HashMap>, + gauges: HashMap>, + histograms: HashMap>, +} + +fn key_to_parts(key: &Key) -> (String, Vec<(String, String)>) { + let labels = key + .labels() + .into_iter() + .map(|label| (label.key().to_string(), label.value().to_string())) + .collect(); + let name = key.name().to_string(); + (name, labels) +} + +impl Inner { + fn snapshot_counters(&self) -> HashMap> { + let mut counters = HashMap::new(); + + for (key, counter) in self.registry.get_counter_handles() { + let gen = counter.get_generation(); + if !self.recency.should_store_counter(&key, gen, &self.registry) { + continue; + } + + let (name, labels) = key_to_parts(&key); + let value = counter.get_inner().load(Ordering::Acquire); + counters + .entry(name) + .or_insert_with(Vec::new) + .push(Counter { labels, value }); + } + + counters + } + + fn snapshot_gauges(&self) -> HashMap> { + let mut gauges = HashMap::new(); + + for (key, gauge) in self.registry.get_gauge_handles() { + let gen = gauge.get_generation(); + if !self.recency.should_store_gauge(&key, gen, &self.registry) { + continue; + } + + let (name, labels) = key_to_parts(&key); + let value = f64::from_bits(gauge.get_inner().load(Ordering::Acquire)); + gauges + .entry(name) + .or_insert_with(Vec::new) + .push(Gauge { labels, value }) + } + + gauges + } + + fn snapshot_histograms(&self) -> HashMap> { + for (key, histogram) in self.registry.get_histogram_handles() { + let gen = histogram.get_generation(); + let (name, labels) = key_to_parts(&key); + + if !self + .recency + .should_store_histogram(&key, gen, &self.registry) + { + let delete_by_name = if let Some(mut by_name) = self.distributions.get_mut(&name) { + by_name.remove(&labels); + by_name.is_empty() + } else { + false + }; + + if delete_by_name { + self.descriptions.remove(&name); + } + + continue; + } + + let mut outer_entry = self + .distributions + .entry(name.clone()) + .or_insert_with(IndexMap::new); + + let entry = outer_entry + .entry(labels) + .or_insert_with(Summary::with_defaults); + + histogram.get_inner().clear_with(|samples| { + for sample in samples { + entry.add(*sample); + } + }) + } + + self.distributions + .iter() + .map(|entry| { + ( + entry.key().clone(), + entry + .value() + .iter() + .map(|(labels, summary)| Histogram { + labels: labels.clone(), + value: [0.001, 0.01, 0.05, 0.1, 0.5, 0.9, 0.99, 1.0] + .into_iter() + .map(|q| (q, summary.quantile(q))) + .collect(), + }) + .collect(), + ) + }) + .collect() + } + + fn snapshot(&self) -> Snapshot { + Snapshot { + counters: self.snapshot_counters(), + gauges: self.snapshot_gauges(), + histograms: self.snapshot_histograms(), + } + } +} + +impl MemoryCollector { + pub(crate) fn new() -> Self { + MemoryCollector { + inner: Arc::new(Inner { + descriptions: Default::default(), + distributions: Default::default(), + recency: Recency::new( + Clock::new(), + MetricKindMask::ALL, + Some(Duration::from_secs(5 * DAYS)), + ), + registry: Registry::new(GenerationalStorage::atomic()), + }), + } + } + + pub(crate) fn install(&self) -> Result<(), SetRecorderError> { + metrics::set_boxed_recorder(Box::new(self.clone())) + } + + pub(crate) fn snapshot(&self) -> Snapshot { + self.inner.snapshot() + } + + fn add_description_if_missing( + &self, + key: &metrics::KeyName, + description: metrics::SharedString, + ) { + self.inner + .descriptions + .entry(key.as_str().to_owned()) + .or_insert(description); + } +} + +impl Recorder for MemoryCollector { + fn describe_counter( + &self, + key: metrics::KeyName, + _: Option, + description: metrics::SharedString, + ) { + self.add_description_if_missing(&key, description) + } + + fn describe_gauge( + &self, + key: metrics::KeyName, + _: Option, + description: metrics::SharedString, + ) { + self.add_description_if_missing(&key, description) + } + + fn describe_histogram( + &self, + key: metrics::KeyName, + _: Option, + description: metrics::SharedString, + ) { + self.add_description_if_missing(&key, description) + } + + fn register_counter(&self, key: &Key) -> metrics::Counter { + self.inner + .registry + .get_or_create_counter(key, |c| c.clone().into()) + } + + fn register_gauge(&self, key: &Key) -> metrics::Gauge { + self.inner + .registry + .get_or_create_gauge(key, |c| c.clone().into()) + } + + fn register_histogram(&self, key: &Key) -> metrics::Histogram { + self.inner + .registry + .get_or_create_histogram(key, |c| c.clone().into()) + } +} + +/* +struct Bucket { + begin: Instant, + summary: Summary, +} + +pub(crate) struct RollingSummary { + buckets: Vec, + bucket_duration: Duration, + expire_after: Duration, + count: usize, +} + +impl Default for RollingSummary { + fn default() -> Self { + Self::new( + Duration::from_secs(5 * MINUTES), + Duration::from_secs(1 * DAYS), + ) + } +} + +impl RollingSummary { + fn new(bucket_duration: Duration, expire_after: Duration) -> Self { + Self { + buckets: Vec::new(), + bucket_duration, + expire_after, + count: 0, + } + } + + fn add(&mut self, value: f64, now: Instant) { + self.count += 1; + + // try adding to existing bucket + for bucket in &mut self.buckets { + let end = bucket.begin + self.bucket_duration; + + if now >= end { + break; + } + + if now >= bucket.begin { + bucket.summary.add(value); + return; + } + } + + // if we're adding a new bucket, clean old buckets first + if let Some(cutoff) = now.checked_sub(self.expire_after) { + self.buckets.retain(|b| b.begin > cutoff); + } + + let mut summary = Summary::with_defaults(); + summary.add(value); + + // if there's no buckets, make one and return + if self.buckets.is_empty() { + self.buckets.push(Bucket { + summary, + begin: now, + }); + return; + } + + let mut begin = self.buckets[0].begin; + + // there are buckets, but none can hold our value, see why + if now < self.buckets[0].begin { + // create an old bucket + + while now < begin { + begin -= self.bucket_duration; + } + + self.buckets.push(Bucket { begin, summary }); + self.buckets.sort_unstable_by(|a, b| b.begin.cmp(&a.begin)); + } else { + // create a new bucket + let mut end = self.buckets[0].begin + self.bucket_duration; + + while now >= end { + begin += self.bucket_duration; + end += self.bucket_duration; + } + + self.buckets.insert(0, Bucket { begin, summary }); + } + } + + fn snapshot(&self, now: Instant) -> Summary { + let cutoff = now.checked_sub(self.expire_after); + let mut acc = Summary::with_defaults(); + + let summaries = self + .buckets + .iter() + .filter(|b| cutoff.map(|c| b.begin > c).unwrap_or(true)) + .map(|b| &b.summary); + + for item in summaries { + acc.merge(item) + .expect("All summaries are created with default settings"); + } + + acc + } +} +*/ diff --git a/src/main.rs b/src/main.rs index 1d80a0d..9aebdb9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,6 +15,7 @@ use tracing_subscriber::{filter::Targets, fmt::format::FmtSpan, layer::Subscribe mod admin; mod apub; mod args; +mod collector; mod config; mod data; mod db; @@ -98,6 +99,8 @@ async fn main() -> Result<(), anyhow::Error> { let config = Config::build()?; init_subscriber(Config::software_name(), config.opentelemetry_url())?; + let collector = collector::MemoryCollector::new(); + collector.install()?; let args = Args::new(); @@ -164,7 +167,8 @@ async fn main() -> Result<(), anyhow::Error> { .app_data(web::Data::new(actors.clone())) .app_data(web::Data::new(config.clone())) .app_data(web::Data::new(job_server.clone())) - .app_data(web::Data::new(media.clone())); + .app_data(web::Data::new(media.clone())) + .app_data(web::Data::new(collector.clone())); let app = if let Some(data) = config.admin_config() { app.app_data(data) @@ -203,7 +207,8 @@ async fn main() -> Result<(), anyhow::Error> { .route("/unblock", web::post().to(admin::routes::unblock)) .route("/allowed", web::get().to(admin::routes::allowed)) .route("/blocked", web::get().to(admin::routes::blocked)) - .route("/connected", web::get().to(admin::routes::connected)), + .route("/connected", web::get().to(admin::routes::connected)) + .route("/stats", web::get().to(admin::routes::stats)), ), ) }) From f892a50f2c5299a85f990d83dae1a9e707f1a5bf Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 19 Nov 2022 17:45:01 -0600 Subject: [PATCH 05/60] Add metrics printer --- Cargo.lock | 8 ++++---- src/admin/client.rs | 5 +++++ src/args.rs | 9 ++++++++- src/collector.rs | 8 ++++---- src/config.rs | 3 +++ src/main.rs | 5 +++++ 6 files changed, 29 insertions(+), 9 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 694caf4..e05d130 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -499,9 +499,9 @@ dependencies = [ [[package]] name = "background-jobs-actix" -version = "0.14.0" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea433afb3fbbb6dc2614bad9f6671517f6cffcd523981e568cdb595dc7fa6399" +checksum = "3aba1bc1cdff87161a6e2e00bf9dc1712412ee926a065d96cc0a03dc851b5fd3" dependencies = [ "actix-rt", "anyhow", @@ -520,9 +520,9 @@ dependencies = [ [[package]] name = "background-jobs-core" -version = "0.14.0" +version = "0.14.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b3071bf7a5e46638085076957dc189b2b0b147cd279eb09510b7af54e95085ef" +checksum = "1274e49ae8eff1fc6b4943660e59ce2f2e13e65a23a707924a50a40c7b94fc4d" dependencies = [ "actix-rt", "anyhow", diff --git a/src/admin/client.rs b/src/admin/client.rs index f63151f..3602487 100644 --- a/src/admin/client.rs +++ b/src/admin/client.rs @@ -1,5 +1,6 @@ use crate::{ admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains}, + collector::Snapshot, config::{AdminUrlKind, Config}, error::{Error, ErrorKind}, }; @@ -50,6 +51,10 @@ pub(crate) async fn connected(client: &Client, config: &Config) -> Result Result { + get_results(client, config, AdminUrlKind::Stats).await +} + async fn get_results( client: &Client, config: &Config, diff --git a/src/args.rs b/src/args.rs index 6b6c054..18a4059 100644 --- a/src/args.rs +++ b/src/args.rs @@ -14,11 +14,14 @@ pub(crate) struct Args { #[arg(short, long, help = "List allowed and blocked domains")] list: bool, + + #[arg(short, long, help = "Get statistics from the server")] + stats: bool, } impl Args { pub(crate) fn any(&self) -> bool { - !self.blocks.is_empty() || !self.allowed.is_empty() || self.list + !self.blocks.is_empty() || !self.allowed.is_empty() || self.list || self.stats } pub(crate) fn new() -> Self { @@ -40,4 +43,8 @@ impl Args { pub(crate) fn list(&self) -> bool { self.list } + + pub(crate) fn stats(&self) -> bool { + self.stats + } } diff --git a/src/collector.rs b/src/collector.rs index 401307b..dc5d179 100644 --- a/src/collector.rs +++ b/src/collector.rs @@ -29,25 +29,25 @@ struct Inner { registry: Registry>, } -#[derive(serde::Serialize)] +#[derive(Debug, serde::Deserialize, serde::Serialize)] struct Counter { labels: Vec<(String, String)>, value: u64, } -#[derive(serde::Serialize)] +#[derive(Debug, serde::Deserialize, serde::Serialize)] struct Gauge { labels: Vec<(String, String)>, value: f64, } -#[derive(serde::Serialize)] +#[derive(Debug, serde::Deserialize, serde::Serialize)] struct Histogram { labels: Vec<(String, String)>, value: Vec<(f64, Option)>, } -#[derive(serde::Serialize)] +#[derive(Debug, serde::Deserialize, serde::Serialize)] pub(crate) struct Snapshot { counters: HashMap>, gauges: HashMap>, diff --git a/src/config.rs b/src/config.rs index 284f807..1bb4f19 100644 --- a/src/config.rs +++ b/src/config.rs @@ -77,6 +77,7 @@ pub enum AdminUrlKind { Allowed, Blocked, Connected, + Stats, } impl std::fmt::Debug for Config { @@ -338,6 +339,8 @@ impl Config { .try_resolve(IriRelativeStr::new("api/v1/admin/blocked")?.as_ref())?, AdminUrlKind::Connected => FixedBaseResolver::new(self.base_uri.as_ref()) .try_resolve(IriRelativeStr::new("api/v1/admin/connected")?.as_ref())?, + AdminUrlKind::Stats => FixedBaseResolver::new(self.base_uri.as_ref()) + .try_resolve(IriRelativeStr::new("api/v1/admin/stats")?.as_ref())?, }; Ok(iri) diff --git a/src/main.rs b/src/main.rs index 9aebdb9..884e572 100644 --- a/src/main.rs +++ b/src/main.rs @@ -142,6 +142,11 @@ async fn main() -> Result<(), anyhow::Error> { println!("{report}"); } + if args.stats() { + let stats = admin::client::stats(&client, &config).await?; + println!("{:#?}", stats); + } + return Ok(()); } From 99c3ec0b75bb593a35acdfe733f7d099c389c771 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 19 Nov 2022 18:26:47 -0600 Subject: [PATCH 06/60] Improve presentation of stats --- src/collector.rs | 184 +++++++++++++++++++++++++++++++++++++++++++---- src/main.rs | 2 +- 2 files changed, 172 insertions(+), 14 deletions(-) diff --git a/src/collector.rs b/src/collector.rs index dc5d179..e2ec44f 100644 --- a/src/collector.rs +++ b/src/collector.rs @@ -7,7 +7,7 @@ use metrics_util::{ }; use quanta::Clock; use std::{ - collections::HashMap, + collections::{BTreeMap, HashMap}, sync::{atomic::Ordering, Arc}, time::Duration, }; @@ -31,22 +31,74 @@ struct Inner { #[derive(Debug, serde::Deserialize, serde::Serialize)] struct Counter { - labels: Vec<(String, String)>, + labels: BTreeMap, value: u64, } +impl std::fmt::Display for Counter { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let labels = self + .labels + .iter() + .map(|(k, v)| format!("{}: {}", k, v)) + .collect::>() + .join(", "); + + write!(f, "{} - {}", labels, self.value) + } +} + #[derive(Debug, serde::Deserialize, serde::Serialize)] struct Gauge { - labels: Vec<(String, String)>, + labels: BTreeMap, value: f64, } +impl std::fmt::Display for Gauge { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let labels = self + .labels + .iter() + .map(|(k, v)| format!("{}: {}", k, v)) + .collect::>() + .join(", "); + + write!(f, "{} - {}", labels, self.value) + } +} + #[derive(Debug, serde::Deserialize, serde::Serialize)] struct Histogram { - labels: Vec<(String, String)>, + labels: BTreeMap, value: Vec<(f64, Option)>, } +impl std::fmt::Display for Histogram { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let labels = self + .labels + .iter() + .map(|(k, v)| format!("{}: {}", k, v)) + .collect::>() + .join(", "); + + let value = self + .value + .iter() + .map(|(k, v)| { + if let Some(v) = v { + format!("{}: {:.6}", k, v) + } else { + format!("{}: None,", k) + } + }) + .collect::>() + .join(", "); + + write!(f, "{} - {}", labels, value) + } +} + #[derive(Debug, serde::Deserialize, serde::Serialize)] pub(crate) struct Snapshot { counters: HashMap>, @@ -54,6 +106,112 @@ pub(crate) struct Snapshot { histograms: HashMap>, } +const PAIRS: [((&str, &str), &str); 2] = [ + ( + ( + "background-jobs.worker.started", + "background-jobs.worker.finished", + ), + "background-jobs.worker.running", + ), + ( + ( + "background-jobs.job.started", + "background-jobs.job.finished", + ), + "background-jobs.job.running", + ), +]; + +#[derive(Default)] +struct MergeCounter { + start: Option, + finish: Option, +} + +impl MergeCounter { + fn merge(self) -> Option { + match (self.start, self.finish) { + (Some(start), Some(end)) => Some(Counter { + labels: start.labels, + value: start.value.saturating_sub(end.value), + }), + (Some(only), None) | (None, Some(only)) => Some(Counter { + labels: only.labels, + value: 0, + }), + (None, None) => None, + } + } +} + +impl Snapshot { + pub(crate) fn present(self) { + if !self.counters.is_empty() { + println!("Counters"); + let mut merging = HashMap::new(); + for (key, counters) in self.counters { + if let Some(((start, _), name)) = PAIRS + .iter() + .find(|((start, finish), _)| *start == key || *finish == key) + { + let entry = merging.entry(name).or_insert_with(HashMap::new); + + for counter in counters { + let mut merge_counter = entry + .entry(counter.labels.clone()) + .or_insert_with(MergeCounter::default); + if key == *start { + merge_counter.start = Some(counter); + } else { + merge_counter.finish = Some(counter); + } + } + + continue; + } + + println!("\t{}", key); + for counter in counters { + println!("\t\t{}", counter); + } + } + + for (key, counters) in merging { + println!("\t{}", key); + + for (_, counter) in counters { + if let Some(counter) = counter.merge() { + println!("\t\t{}", counter); + } + } + } + } + + if !self.gauges.is_empty() { + println!("Gauges"); + for (key, gauges) in self.gauges { + println!("\t{}", key); + + for gauge in gauges { + println!("\t\t{}", gauge); + } + } + } + + if !self.histograms.is_empty() { + println!("Histograms"); + for (key, histograms) in self.histograms { + println!("\t{}", key); + + for histogram in histograms { + println!("\t\t{}", histogram); + } + } + } + } +} + fn key_to_parts(key: &Key) -> (String, Vec<(String, String)>) { let labels = key .labels() @@ -76,10 +234,10 @@ impl Inner { let (name, labels) = key_to_parts(&key); let value = counter.get_inner().load(Ordering::Acquire); - counters - .entry(name) - .or_insert_with(Vec::new) - .push(Counter { labels, value }); + counters.entry(name).or_insert_with(Vec::new).push(Counter { + labels: labels.into_iter().collect(), + value, + }); } counters @@ -96,10 +254,10 @@ impl Inner { let (name, labels) = key_to_parts(&key); let value = f64::from_bits(gauge.get_inner().load(Ordering::Acquire)); - gauges - .entry(name) - .or_insert_with(Vec::new) - .push(Gauge { labels, value }) + gauges.entry(name).or_insert_with(Vec::new).push(Gauge { + labels: labels.into_iter().collect(), + value, + }) } gauges @@ -153,7 +311,7 @@ impl Inner { .value() .iter() .map(|(labels, summary)| Histogram { - labels: labels.clone(), + labels: labels.iter().cloned().collect(), value: [0.001, 0.01, 0.05, 0.1, 0.5, 0.9, 0.99, 1.0] .into_iter() .map(|q| (q, summary.quantile(q))) diff --git a/src/main.rs b/src/main.rs index 884e572..adeff75 100644 --- a/src/main.rs +++ b/src/main.rs @@ -144,7 +144,7 @@ async fn main() -> Result<(), anyhow::Error> { if args.stats() { let stats = admin::client::stats(&client, &config).await?; - println!("{:#?}", stats); + stats.present(); } return Ok(()); From 9a9d09c0c479e1c4f31f03e5974db1a2ac7a4d43 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 19 Nov 2022 18:27:16 -0600 Subject: [PATCH 07/60] Bump version --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e05d130..2489f13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -279,7 +279,7 @@ checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6" [[package]] name = "ap-relay" -version = "0.3.50" +version = "0.3.51" dependencies = [ "activitystreams", "activitystreams-ext", diff --git a/Cargo.toml b/Cargo.toml index 05753fc..e3a903a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "ap-relay" description = "A simple activitypub relay" -version = "0.3.50" +version = "0.3.51" authors = ["asonix "] license = "AGPL-3.0" readme = "README.md" From 4e1a782bea0f635d07ecafd8f6aa25c6e454fa62 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 19 Nov 2022 18:57:34 -0600 Subject: [PATCH 08/60] Fix merge --- src/collector.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/collector.rs b/src/collector.rs index e2ec44f..57cfd5c 100644 --- a/src/collector.rs +++ b/src/collector.rs @@ -136,7 +136,8 @@ impl MergeCounter { labels: start.labels, value: start.value.saturating_sub(end.value), }), - (Some(only), None) | (None, Some(only)) => Some(Counter { + (Some(only), None) => Some(only), + (None, Some(only)) => Some(Counter { labels: only.labels, value: 0, }), From cecc35ae85490e024b2865e10739e64d31de0e2d Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 19 Nov 2022 20:35:45 -0600 Subject: [PATCH 09/60] Add timings metrics middleware --- src/main.rs | 3 +- src/middleware.rs | 2 ++ src/middleware/timings.rs | 72 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 76 insertions(+), 1 deletion(-) create mode 100644 src/middleware/timings.rs diff --git a/src/main.rs b/src/main.rs index adeff75..0b309e2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -33,7 +33,7 @@ use self::{ data::{ActorCache, MediaCache, State}, db::Db, jobs::create_workers, - middleware::{DebugPayload, RelayResolver}, + middleware::{DebugPayload, RelayResolver, Timings}, routes::{actor, inbox, index, nodeinfo, nodeinfo_meta, statics}, }; @@ -182,6 +182,7 @@ async fn main() -> Result<(), anyhow::Error> { }; app.wrap(TracingLogger::default()) + .wrap(Timings) .service(web::resource("/").route(web::get().to(index))) .service(web::resource("/media/{path}").route(web::get().to(routes::media))) .service( diff --git a/src/middleware.rs b/src/middleware.rs index e11344d..93d6a68 100644 --- a/src/middleware.rs +++ b/src/middleware.rs @@ -1,7 +1,9 @@ mod payload; +mod timings; mod verifier; mod webfinger; pub(crate) use payload::DebugPayload; +pub(crate) use timings::Timings; pub(crate) use verifier::MyVerify; pub(crate) use webfinger::RelayResolver; diff --git a/src/middleware/timings.rs b/src/middleware/timings.rs new file mode 100644 index 0000000..4a0f2d5 --- /dev/null +++ b/src/middleware/timings.rs @@ -0,0 +1,72 @@ +use actix_web::dev::{Service, ServiceRequest, Transform}; +use futures_util::future::LocalBoxFuture; +use std::{ + future::{ready, Ready}, + time::Instant, +}; + +pub(crate) struct Timings; +pub(crate) struct TimingsMiddleware(S); + +struct LogOnDrop { + begin: Instant, + path: String, + method: String, +} + +impl Drop for LogOnDrop { + fn drop(&mut self) { + let duration = self.begin.elapsed(); + metrics::histogram!("relay.request.complete", duration, "path" => self.path.clone(), "method" => self.method.clone()); + } +} + +impl Transform for Timings +where + S: Service, + S::Future: 'static, +{ + type Response = S::Response; + type Error = S::Error; + type InitError = (); + type Transform = TimingsMiddleware; + type Future = Ready>; + + fn new_transform(&self, service: S) -> Self::Future { + ready(Ok(TimingsMiddleware(service))) + } +} + +impl Service for TimingsMiddleware +where + S: Service, + S::Future: 'static, +{ + type Response = S::Response; + type Error = S::Error; + type Future = LocalBoxFuture<'static, Result>; + + fn poll_ready( + &self, + ctx: &mut core::task::Context<'_>, + ) -> std::task::Poll> { + self.0.poll_ready(ctx) + } + + fn call(&self, req: ServiceRequest) -> Self::Future { + let logger = LogOnDrop { + begin: Instant::now(), + path: req.path().to_string(), + method: req.method().to_string(), + }; + let fut = self.0.call(req); + + Box::pin(async move { + let res = fut.await; + + drop(logger); + + res + }) + } +} From 8fa24aa2439f1d5c4671b2e06dbc5476522db040 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 19 Nov 2022 20:36:24 -0600 Subject: [PATCH 10/60] Bump version --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2489f13..d9a6a5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -279,7 +279,7 @@ checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6" [[package]] name = "ap-relay" -version = "0.3.51" +version = "0.3.52" dependencies = [ "activitystreams", "activitystreams-ext", diff --git a/Cargo.toml b/Cargo.toml index e3a903a..40d5613 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "ap-relay" description = "A simple activitypub relay" -version = "0.3.51" +version = "0.3.52" authors = ["asonix "] license = "AGPL-3.0" readme = "README.md" From 95f98ec052b0aa7e92a6d61c1c1aff51b14e21c7 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 19 Nov 2022 20:40:17 -0600 Subject: [PATCH 11/60] Update readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 1506ca2..d250058 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ $ sudo docker run --rm -it \ -e ADDR=0.0.0.0 \ -e SLED_PATH=/mnt/sled/db-0.34 \ -p 8080:8080 \ - asonix/relay:0.3.23 + asonix/relay:0.3.52 ``` This will launch the relay with the database stored in "./sled/db-0.34" and listening on port 8080 #### Cargo From 787c8312bc19c842c7294a6c35ba47bbf67835d7 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 19 Nov 2022 21:32:45 -0600 Subject: [PATCH 12/60] Make better use of cores for jobs --- src/jobs.rs | 18 +++++++++++++----- src/jobs/apub/announce.rs | 1 + src/jobs/apub/follow.rs | 1 + src/jobs/apub/forward.rs | 1 + src/jobs/apub/reject.rs | 1 + src/jobs/apub/undo.rs | 1 + src/jobs/contact.rs | 1 + src/jobs/deliver.rs | 1 + src/jobs/deliver_many.rs | 1 + src/jobs/instance.rs | 1 + src/jobs/nodeinfo.rs | 1 + src/jobs/process_listeners.rs | 1 + 12 files changed, 24 insertions(+), 5 deletions(-) diff --git a/src/jobs.rs b/src/jobs.rs index 6dfdbe9..5327ed0 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -7,8 +7,8 @@ mod nodeinfo; mod process_listeners; pub(crate) use self::{ - contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany, - instance::QueryInstance, nodeinfo::QueryNodeinfo, + contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany, instance::QueryInstance, + nodeinfo::QueryNodeinfo, }; use crate::{ @@ -22,7 +22,7 @@ use background_jobs::{ memory_storage::{ActixTimer, Storage}, Job, Manager, QueueHandle, WorkerConfig, }; -use std::time::Duration; +use std::{convert::TryFrom, num::NonZeroUsize, time::Duration}; fn debug_object(activity: &serde_json::Value) -> &serde_json::Value { let mut object = &activity["object"]["type"]; @@ -44,6 +44,12 @@ pub(crate) fn create_workers( media: MediaCache, config: Config, ) -> (Manager, JobServer) { + let parallelism = std::thread::available_parallelism() + .map(|p| p.get()) + .unwrap_or(1) as u64; + + let parallelism = (parallelism / 2).max(1); + let shared = WorkerConfig::new_managed(Storage::new(ActixTimer), move |queue_handle| { JobState::new( state.clone(), @@ -64,8 +70,10 @@ pub(crate) fn create_workers( .register::() .register::() .register::() - .set_worker_count("default", 16) - .start(); + .set_worker_count("maintenance", parallelism) + .set_worker_count("apub", parallelism) + .set_worker_count("deliver", parallelism * 3) + .start_with_threads(NonZeroUsize::try_from(parallelism as usize).expect("nonzero")); shared.every(Duration::from_secs(60 * 5), Listeners); diff --git a/src/jobs/apub/announce.rs b/src/jobs/apub/announce.rs index 26048c1..93dec67 100644 --- a/src/jobs/apub/announce.rs +++ b/src/jobs/apub/announce.rs @@ -67,6 +67,7 @@ impl ActixJob for Announce { type Future = Pin>>>; const NAME: &'static str = "relay::jobs::apub::Announce"; + const QUEUE: &'static str = "apub"; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { self.perform(state).await.map_err(Into::into) }) diff --git a/src/jobs/apub/follow.rs b/src/jobs/apub/follow.rs index d78b544..10cae22 100644 --- a/src/jobs/apub/follow.rs +++ b/src/jobs/apub/follow.rs @@ -116,6 +116,7 @@ impl ActixJob for Follow { type Future = Pin>>>; const NAME: &'static str = "relay::jobs::apub::Follow"; + const QUEUE: &'static str = "apub"; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { self.perform(state).await.map_err(Into::into) }) diff --git a/src/jobs/apub/forward.rs b/src/jobs/apub/forward.rs index 1f72a32..f5e191b 100644 --- a/src/jobs/apub/forward.rs +++ b/src/jobs/apub/forward.rs @@ -52,6 +52,7 @@ impl ActixJob for Forward { type Future = Pin>>>; const NAME: &'static str = "relay::jobs::apub::Forward"; + const QUEUE: &'static str = "apub"; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { self.perform(state).await.map_err(Into::into) }) diff --git a/src/jobs/apub/reject.rs b/src/jobs/apub/reject.rs index f6ee0e7..2384426 100644 --- a/src/jobs/apub/reject.rs +++ b/src/jobs/apub/reject.rs @@ -38,6 +38,7 @@ impl ActixJob for Reject { type Future = Pin>>>; const NAME: &'static str = "relay::jobs::apub::Reject"; + const QUEUE: &'static str = "apub"; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { self.perform(state).await.map_err(Into::into) }) diff --git a/src/jobs/apub/undo.rs b/src/jobs/apub/undo.rs index 0359bf2..b55d4ae 100644 --- a/src/jobs/apub/undo.rs +++ b/src/jobs/apub/undo.rs @@ -53,6 +53,7 @@ impl ActixJob for Undo { type Future = Pin>>>; const NAME: &'static str = "relay::jobs::apub::Undo"; + const QUEUE: &'static str = "apub"; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { self.perform(state).await.map_err(Into::into) }) diff --git a/src/jobs/contact.rs b/src/jobs/contact.rs index 288941a..a98ac8f 100644 --- a/src/jobs/contact.rs +++ b/src/jobs/contact.rs @@ -86,6 +86,7 @@ impl ActixJob for QueryContact { type Future = Pin>>>; const NAME: &'static str = "relay::jobs::QueryContact"; + const QUEUE: &'static str = "maintenance"; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { self.perform(state).await.map_err(Into::into) }) diff --git a/src/jobs/deliver.rs b/src/jobs/deliver.rs index 9c01d8d..223608e 100644 --- a/src/jobs/deliver.rs +++ b/src/jobs/deliver.rs @@ -55,6 +55,7 @@ impl ActixJob for Deliver { type Future = Pin>>>; const NAME: &'static str = "relay::jobs::Deliver"; + const QUEUE: &'static str = "deliver"; const BACKOFF: Backoff = Backoff::Exponential(8); fn run(self, state: Self::State) -> Self::Future { diff --git a/src/jobs/deliver_many.rs b/src/jobs/deliver_many.rs index 58cdc1f..fc9107c 100644 --- a/src/jobs/deliver_many.rs +++ b/src/jobs/deliver_many.rs @@ -50,6 +50,7 @@ impl ActixJob for DeliverMany { type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>; const NAME: &'static str = "relay::jobs::DeliverMany"; + const QUEUE: &'static str = "deliver"; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { self.perform(state).await.map_err(Into::into) }) diff --git a/src/jobs/instance.rs b/src/jobs/instance.rs index d3e2719..4339845 100644 --- a/src/jobs/instance.rs +++ b/src/jobs/instance.rs @@ -110,6 +110,7 @@ impl ActixJob for QueryInstance { type Future = Pin>>>; const NAME: &'static str = "relay::jobs::QueryInstance"; + const QUEUE: &'static str = "maintenance"; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { self.perform(state).await.map_err(Into::into) }) diff --git a/src/jobs/nodeinfo.rs b/src/jobs/nodeinfo.rs index dbe2717..fe86ad8 100644 --- a/src/jobs/nodeinfo.rs +++ b/src/jobs/nodeinfo.rs @@ -100,6 +100,7 @@ impl ActixJob for QueryNodeinfo { type Future = Pin>>>; const NAME: &'static str = "relay::jobs::QueryNodeinfo"; + const QUEUE: &'static str = "maintenance"; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { self.perform(state).await.map_err(Into::into) }) diff --git a/src/jobs/process_listeners.rs b/src/jobs/process_listeners.rs index bba4e00..1cad2e4 100644 --- a/src/jobs/process_listeners.rs +++ b/src/jobs/process_listeners.rs @@ -28,6 +28,7 @@ impl ActixJob for Listeners { type Future = Pin>>>; const NAME: &'static str = "relay::jobs::Listeners"; + const QUEUE: &'static str = "maintenance"; fn run(self, state: Self::State) -> Self::Future { Box::pin(async move { self.perform(state).await.map_err(Into::into) }) From 8d0d39b1fca81b1748f090ab8c9deeee79027e3b Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 19 Nov 2022 21:33:09 -0600 Subject: [PATCH 13/60] Bump version --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d9a6a5e..42a9e9a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -279,7 +279,7 @@ checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6" [[package]] name = "ap-relay" -version = "0.3.52" +version = "0.3.53" dependencies = [ "activitystreams", "activitystreams-ext", diff --git a/Cargo.toml b/Cargo.toml index 40d5613..389a7c1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "ap-relay" description = "A simple activitypub relay" -version = "0.3.52" +version = "0.3.53" authors = ["asonix "] license = "AGPL-3.0" readme = "README.md" From 9272ba0d4c5d7f9c565b1da9f997ba2a7050ea74 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 19 Nov 2022 21:51:04 -0600 Subject: [PATCH 14/60] More logging when ending main --- src/main.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/main.rs b/src/main.rs index 0b309e2..bedd4b9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -94,6 +94,12 @@ fn init_subscriber( #[actix_rt::main] async fn main() -> Result<(), anyhow::Error> { + actix_rt::spawn(do_main()).await??; + tracing::warn!("Application exit"); + Ok(()) +} + +async fn do_main() -> Result<(), anyhow::Error> { dotenv::dotenv().ok(); let config = Config::build()?; @@ -222,8 +228,12 @@ async fn main() -> Result<(), anyhow::Error> { .run() .await?; + tracing::warn!("Server closed"); + drop(manager); + tracing::warn!("Main complete"); + Ok(()) } From 4267f52a7eeffcc005ff3e9d581e71072ba6a6a1 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 19 Nov 2022 21:52:06 -0600 Subject: [PATCH 15/60] Bump deps --- Cargo.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 42a9e9a..f3ed4e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -499,9 +499,9 @@ dependencies = [ [[package]] name = "background-jobs-actix" -version = "0.14.1" +version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3aba1bc1cdff87161a6e2e00bf9dc1712412ee926a065d96cc0a03dc851b5fd3" +checksum = "99f8bfe0a984c8d0bc7e67b376cc05e0b9015fdd3ee878900046120ef781c47e" dependencies = [ "actix-rt", "anyhow", From 9ede941ff7a1394f8755ce787d69e847d0236f92 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 19 Nov 2022 22:38:58 -0600 Subject: [PATCH 16/60] Increase concurrency --- src/jobs.rs | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/jobs.rs b/src/jobs.rs index 5327ed0..b82d820 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -48,8 +48,6 @@ pub(crate) fn create_workers( .map(|p| p.get()) .unwrap_or(1) as u64; - let parallelism = (parallelism / 2).max(1); - let shared = WorkerConfig::new_managed(Storage::new(ActixTimer), move |queue_handle| { JobState::new( state.clone(), @@ -70,9 +68,9 @@ pub(crate) fn create_workers( .register::() .register::() .register::() - .set_worker_count("maintenance", parallelism) - .set_worker_count("apub", parallelism) - .set_worker_count("deliver", parallelism * 3) + .set_worker_count("maintenance", parallelism * 2) + .set_worker_count("apub", parallelism * 2) + .set_worker_count("deliver", parallelism * 8) .start_with_threads(NonZeroUsize::try_from(parallelism as usize).expect("nonzero")); shared.every(Duration::from_secs(60 * 5), Listeners); From a154fbb5041f1a31f7f5a2ecdbc6f5fe25c00419 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 19 Nov 2022 22:39:27 -0600 Subject: [PATCH 17/60] Bump version --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f3ed4e9..42c17c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -279,7 +279,7 @@ checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6" [[package]] name = "ap-relay" -version = "0.3.53" +version = "0.3.54" dependencies = [ "activitystreams", "activitystreams-ext", diff --git a/Cargo.toml b/Cargo.toml index 389a7c1..871de87 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "ap-relay" description = "A simple activitypub relay" -version = "0.3.53" +version = "0.3.54" authors = ["asonix "] license = "AGPL-3.0" readme = "README.md" From 3500f85f446fd103e687c6d123dbd9be3604f1d3 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 19 Nov 2022 23:35:00 -0600 Subject: [PATCH 18/60] Move blocking setup out of actix systems --- src/main.rs | 141 ++++++++++++++++++++++++++++++++-------------------- 1 file changed, 87 insertions(+), 54 deletions(-) diff --git a/src/main.rs b/src/main.rs index bedd4b9..2598ca2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,6 +3,7 @@ use activitystreams::iri_string::types::IriString; use actix_web::{web, App, HttpServer}; +use collector::MemoryCollector; #[cfg(feature = "console")] use console_subscriber::ConsoleLayer; use opentelemetry::{sdk::Resource, KeyValue}; @@ -92,84 +93,116 @@ fn init_subscriber( Ok(()) } -#[actix_rt::main] -async fn main() -> Result<(), anyhow::Error> { - actix_rt::spawn(do_main()).await??; - tracing::warn!("Application exit"); - Ok(()) -} - -async fn do_main() -> Result<(), anyhow::Error> { +fn main() -> Result<(), anyhow::Error> { dotenv::dotenv().ok(); let config = Config::build()?; init_subscriber(Config::software_name(), config.opentelemetry_url())?; - let collector = collector::MemoryCollector::new(); + let collector = MemoryCollector::new(); collector.install()?; let args = Args::new(); if args.any() { - let client = requests::build_client(&config.user_agent()); - - if !args.blocks().is_empty() || !args.allowed().is_empty() { - if args.undo() { - admin::client::unblock(&client, &config, args.blocks().to_vec()).await?; - admin::client::disallow(&client, &config, args.allowed().to_vec()).await?; - } else { - admin::client::block(&client, &config, args.blocks().to_vec()).await?; - admin::client::allow(&client, &config, args.allowed().to_vec()).await?; - } - println!("Updated lists"); - } - - if args.list() { - let (blocked, allowed, connected) = tokio::try_join!( - admin::client::blocked(&client, &config), - admin::client::allowed(&client, &config), - admin::client::connected(&client, &config) - )?; - - let mut report = String::from("Report:\n"); - if !allowed.allowed_domains.is_empty() { - report += "\nAllowed\n\t"; - report += &allowed.allowed_domains.join("\n\t"); - } - if !blocked.blocked_domains.is_empty() { - report += "\n\nBlocked\n\t"; - report += &blocked.blocked_domains.join("\n\t"); - } - if !connected.connected_actors.is_empty() { - report += "\n\nConnected\n\t"; - report += &connected.connected_actors.join("\n\t"); - } - report += "\n"; - println!("{report}"); - } - - if args.stats() { - let stats = admin::client::stats(&client, &config).await?; - stats.present(); - } - - return Ok(()); + return client_main(config, args); } let db = Db::build(&config)?; - let media = MediaCache::new(db.clone()); - let state = State::build(db.clone()).await?; let actors = ActorCache::new(db.clone()); + let media = MediaCache::new(db.clone()); + server_main(db, actors, media, collector, config)?; + + tracing::warn!("Application exit"); + + Ok(()) +} + +#[actix_rt::main] +async fn client_main(config: Config, args: Args) -> Result<(), anyhow::Error> { + actix_rt::spawn(do_client_main(config, args)).await? +} + +async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error> { + let client = requests::build_client(&config.user_agent()); + + if !args.blocks().is_empty() || !args.allowed().is_empty() { + if args.undo() { + admin::client::unblock(&client, &config, args.blocks().to_vec()).await?; + admin::client::disallow(&client, &config, args.allowed().to_vec()).await?; + } else { + admin::client::block(&client, &config, args.blocks().to_vec()).await?; + admin::client::allow(&client, &config, args.allowed().to_vec()).await?; + } + println!("Updated lists"); + } + + if args.list() { + let (blocked, allowed, connected) = tokio::try_join!( + admin::client::blocked(&client, &config), + admin::client::allowed(&client, &config), + admin::client::connected(&client, &config) + )?; + + let mut report = String::from("Report:\n"); + if !allowed.allowed_domains.is_empty() { + report += "\nAllowed\n\t"; + report += &allowed.allowed_domains.join("\n\t"); + } + if !blocked.blocked_domains.is_empty() { + report += "\n\nBlocked\n\t"; + report += &blocked.blocked_domains.join("\n\t"); + } + if !connected.connected_actors.is_empty() { + report += "\n\nConnected\n\t"; + report += &connected.connected_actors.join("\n\t"); + } + report += "\n"; + println!("{report}"); + } + + if args.stats() { + let stats = admin::client::stats(&client, &config).await?; + stats.present(); + } + + return Ok(()); +} + +#[actix_rt::main] +async fn server_main( + db: Db, + actors: ActorCache, + media: MediaCache, + collector: MemoryCollector, + config: Config, +) -> Result<(), anyhow::Error> { + actix_rt::spawn(do_server_main(db, actors, media, collector, config)).await? +} + +async fn do_server_main( + db: Db, + actors: ActorCache, + media: MediaCache, + collector: MemoryCollector, + config: Config, +) -> Result<(), anyhow::Error> { + tracing::warn!("Creating state"); + let state = State::build(db.clone()).await?; + + tracing::warn!("Creating workers"); let (manager, job_server) = create_workers(state.clone(), actors.clone(), media.clone(), config.clone()); if let Some((token, admin_handle)) = config.telegram_info() { + tracing::warn!("Creating telegram handler"); telegram::start(admin_handle.to_owned(), db.clone(), token); } let bind_address = config.bind_address(); + tracing::warn!("Binding to {}:{}", bind_address.0, bind_address.1); HttpServer::new(move || { let app = App::new() .app_data(web::Data::new(db.clone())) From 9f6e0bc7226b3c19e066a589574498354caa86b2 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 19 Nov 2022 23:35:20 -0600 Subject: [PATCH 19/60] Bump version --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 42c17c8..bfaea90 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -279,7 +279,7 @@ checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6" [[package]] name = "ap-relay" -version = "0.3.54" +version = "0.3.55" dependencies = [ "activitystreams", "activitystreams-ext", diff --git a/Cargo.toml b/Cargo.toml index 871de87..5523069 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "ap-relay" description = "A simple activitypub relay" -version = "0.3.54" +version = "0.3.55" authors = ["asonix "] license = "AGPL-3.0" readme = "README.md" From 7ec56d2af29dbd0249db6e6265b4547b8d533c52 Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 19 Nov 2022 23:44:35 -0600 Subject: [PATCH 20/60] clippy --- src/main.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main.rs b/src/main.rs index 2598ca2..a40d4f5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -168,7 +168,7 @@ async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error> stats.present(); } - return Ok(()); + Ok(()) } #[actix_rt::main] From d44db2eab509aeafc80acfd1a6be280ea21a8cef Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 19 Nov 2022 23:44:52 -0600 Subject: [PATCH 21/60] Bump version --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bfaea90..feb891e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -279,7 +279,7 @@ checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6" [[package]] name = "ap-relay" -version = "0.3.55" +version = "0.3.56" dependencies = [ "activitystreams", "activitystreams-ext", diff --git a/Cargo.toml b/Cargo.toml index 5523069..0956357 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "ap-relay" description = "A simple activitypub relay" -version = "0.3.55" +version = "0.3.56" authors = ["asonix "] license = "AGPL-3.0" readme = "README.md" From df3063e75fb96959d52a70516c9effa7352c2413 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 20 Nov 2022 12:06:10 -0600 Subject: [PATCH 22/60] Improve concurrency for larger systems --- src/jobs.rs | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/src/jobs.rs b/src/jobs.rs index b82d820..073dca8 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -44,9 +44,8 @@ pub(crate) fn create_workers( media: MediaCache, config: Config, ) -> (Manager, JobServer) { - let parallelism = std::thread::available_parallelism() - .map(|p| p.get()) - .unwrap_or(1) as u64; + let parallelism = + std::thread::available_parallelism().unwrap_or(NonZeroUsize::try_from(1).expect("nonzero")); let shared = WorkerConfig::new_managed(Storage::new(ActixTimer), move |queue_handle| { JobState::new( @@ -68,10 +67,10 @@ pub(crate) fn create_workers( .register::() .register::() .register::() - .set_worker_count("maintenance", parallelism * 2) - .set_worker_count("apub", parallelism * 2) - .set_worker_count("deliver", parallelism * 8) - .start_with_threads(NonZeroUsize::try_from(parallelism as usize).expect("nonzero")); + .set_worker_count("maintenance", 2) + .set_worker_count("apub", 2) + .set_worker_count("deliver", 8) + .start_with_threads(parallelism); shared.every(Duration::from_secs(60 * 5), Listeners); From 162dd1cb0e658e21ebc68182e07a9ec20ad78583 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 20 Nov 2022 12:07:27 -0600 Subject: [PATCH 23/60] Add more launch logging --- src/main.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main.rs b/src/main.rs index a40d4f5..ed24108 100644 --- a/src/main.rs +++ b/src/main.rs @@ -108,8 +108,10 @@ fn main() -> Result<(), anyhow::Error> { return client_main(config, args); } + tracing::warn!("Opening DB"); let db = Db::build(&config)?; + tracing::warn!("Building caches"); let actors = ActorCache::new(db.clone()); let media = MediaCache::new(db.clone()); From df70a28ca36c08bfc139a29ad38e77e7f2c1510d Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 20 Nov 2022 12:07:44 -0600 Subject: [PATCH 24/60] Bump version --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index feb891e..da801b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -279,7 +279,7 @@ checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6" [[package]] name = "ap-relay" -version = "0.3.56" +version = "0.3.57" dependencies = [ "activitystreams", "activitystreams-ext", diff --git a/Cargo.toml b/Cargo.toml index 0956357..1a85bad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "ap-relay" description = "A simple activitypub relay" -version = "0.3.56" +version = "0.3.57" authors = ["asonix "] license = "AGPL-3.0" readme = "README.md" From 13cd3083583c48252a53b260bb89b693170824b4 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 20 Nov 2022 12:09:17 -0600 Subject: [PATCH 25/60] clippy --- src/jobs.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/jobs.rs b/src/jobs.rs index 073dca8..296c641 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -45,7 +45,7 @@ pub(crate) fn create_workers( config: Config, ) -> (Manager, JobServer) { let parallelism = - std::thread::available_parallelism().unwrap_or(NonZeroUsize::try_from(1).expect("nonzero")); + std::thread::available_parallelism().unwrap_or_else(|_| NonZeroUsize::try_from(1).expect("nonzero")); let shared = WorkerConfig::new_managed(Storage::new(ActixTimer), move |queue_handle| { JobState::new( From efc918a8268d0955bd78d3cfd0d3f57dcea40964 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 20 Nov 2022 12:09:43 -0600 Subject: [PATCH 26/60] Update deps --- Cargo.lock | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index da801b6..04bd6b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -786,9 +786,9 @@ dependencies = [ [[package]] name = "crossbeam-epoch" -version = "0.9.11" +version = "0.9.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f916dfc5d356b0ed9dae65f1db9fc9770aa2851d2662b988ccf4fe3516e86348" +checksum = "96bf8df95e795db1a4aca2957ad884a2df35413b24bbeb3114422f3cc21498e8" dependencies = [ "autocfg", "cfg-if", @@ -799,9 +799,9 @@ dependencies = [ [[package]] name = "crossbeam-utils" -version = "0.8.12" +version = "0.8.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "edbafec5fa1f196ca66527c1b12c2ec4745ca14b50f1ad8f9f6f720b55d11fac" +checksum = "422f23e724af1240ec469ea1e834d87a4b59ce2efe2c6a96256b0c47e2fd86aa" dependencies = [ "cfg-if", ] @@ -1577,9 +1577,9 @@ checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" [[package]] name = "memoffset" -version = "0.6.5" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce" +checksum = "5de893c32cde5f383baa4c04c5d6dbdd735cfd4a794b0debdb2bb1b421da5ff4" dependencies = [ "autocfg", ] From a3eb785b9e7064944084aa23c316bd5a38fd265f Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 20 Nov 2022 16:25:27 -0600 Subject: [PATCH 27/60] Update defaults to be more prod friendly --- README.md | 6 +++--- src/config.rs | 4 ++-- src/main.rs | 2 +- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index d250058..1f08efe 100644 --- a/README.md +++ b/README.md @@ -112,15 +112,15 @@ Whether to print incoming activities to the console when requests hit the /inbox ##### `RESTRICTED_MODE` This setting enables an 'allowlist' setup where only servers that have been explicitly enabled through the `relay -a` command can join the relay. This is `false` by default. If `RESTRICTED_MODE` is not enabled, then manually allowing domains with `relay -a` has no effect. ##### `VALIDATE_SIGNATURES` -This setting enforces checking HTTP signatures on incoming activities. It defaults to `false` but should be set to `true` in production scenarios +This setting enforces checking HTTP signatures on incoming activities. It defaults to `true` ##### `HTTPS` -Whether the current server is running on an HTTPS port or not. This is used for generating URLs to the current running relay. By default it is set to `false`, but should be `true` in production scenarios. +Whether the current server is running on an HTTPS port or not. This is used for generating URLs to the current running relay. By default it is set to `true` ##### `PUBLISH_BLOCKS` Whether or not to publish a list of blocked domains in the `nodeinfo` metadata for the server. It defaults to `false`. ##### `SLED_PATH` Where to store the on-disk database of connected servers. This defaults to `./sled/db-0.34`. ##### `RUST_LOG` -The log level to print. Available levels are `ERROR`, `WARN`, `INFO`, `DEBUG`, and `TRACE`. You can also specify module paths to enable some logs but not others, such as `RUST_LOG=warn,tracing_actix_web=info,relay=info` +The log level to print. Available levels are `ERROR`, `WARN`, `INFO`, `DEBUG`, and `TRACE`. You can also specify module paths to enable some logs but not others, such as `RUST_LOG=warn,tracing_actix_web=info,relay=info`. This defaults to `warn` ##### `SOURCE_REPO` The URL to the source code for the relay. This defaults to `https://git.asonix.dog/asonix/relay`, but should be changed if you're running a fork hosted elsewhere. ##### `API_TOKEN` diff --git a/src/config.rs b/src/config.rs index 1bb4f19..49c24ba 100644 --- a/src/config.rs +++ b/src/config.rs @@ -112,8 +112,8 @@ impl Config { .set_default("port", 8080u64)? .set_default("debug", true)? .set_default("restricted_mode", false)? - .set_default("validate_signatures", false)? - .set_default("https", false)? + .set_default("validate_signatures", true)? + .set_default("https", true)? .set_default("publish_blocks", false)? .set_default("sled_path", "./sled/db-0-34")? .set_default("source_repo", "https://git.asonix.dog/asonix/relay")? diff --git a/src/main.rs b/src/main.rs index ed24108..cb500bb 100644 --- a/src/main.rs +++ b/src/main.rs @@ -45,7 +45,7 @@ fn init_subscriber( LogTracer::init()?; let targets: Targets = std::env::var("RUST_LOG") - .unwrap_or_else(|_| "info".into()) + .unwrap_or_else(|_| "warn".into()) .parse()?; let format_layer = tracing_subscriber::fmt::layer() From 5d33dba103dc2ae78b269d284c30ac9eac40fe39 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 20 Nov 2022 21:42:38 -0600 Subject: [PATCH 28/60] Add support for binding TLS --- Cargo.lock | 4 ++++ Cargo.toml | 6 +++++- README.md | 6 ++++++ src/config.rs | 50 +++++++++++++++++++++++++++++++++++++++++++++++++- src/jobs.rs | 4 ++-- src/main.rs | 28 ++++++++++++++++++++++------ 6 files changed, 88 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 04bd6b1..1944884 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -63,6 +63,7 @@ dependencies = [ "actix-codec", "actix-rt", "actix-service", + "actix-tls", "actix-utils", "ahash", "base64", @@ -192,6 +193,7 @@ dependencies = [ "actix-rt", "actix-server", "actix-service", + "actix-tls", "actix-utils", "ahash", "bytes", @@ -311,6 +313,8 @@ dependencies = [ "rsa", "rsa-magic-public-key", "ructe", + "rustls", + "rustls-pemfile", "serde", "serde_json", "sha2", diff --git a/Cargo.toml b/Cargo.toml index 1a85bad..98673e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,9 @@ default = [] [dependencies] anyhow = "1.0" actix-rt = "2.7.0" -actix-web = { version = "4.0.1", default-features = false } +actix-web = { version = "4.0.1", default-features = false, features = [ + "rustls", +] } actix-webfinger = "0.4.0" activitystreams = "0.7.0-alpha.19" activitystreams-ext = "0.1.0-alpha.2" @@ -48,6 +50,8 @@ quanta = "0.10.1" rand = "0.8" rsa = "0.7" rsa-magic-public-key = "0.6.0" +rustls = "0.20.7" +rustls-pemfile = "1.0.1" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" sha2 = { version = "0.10", features = ["oid"] } diff --git a/README.md b/README.md index 1f08efe..ed0957b 100644 --- a/README.md +++ b/README.md @@ -98,6 +98,8 @@ API_TOKEN=somepasswordishtoken OPENTELEMETRY_URL=localhost:4317 TELEGRAM_TOKEN=secret TELEGRAM_ADMIN_HANDLE=your_handle +TLS_KEY=/path/to/key +TLS_CERT=/path/to/cert ``` #### Descriptions @@ -131,6 +133,10 @@ A URL for exporting opentelemetry spans. This is mostly useful for debugging. Th A Telegram Bot Token for running the relay administration bot. There is no default. ##### `TELEGRAM_ADMIN_HANDLE` The handle of the telegram user allowed to administer the relay. There is no default. +##### `TLS_KEY` +Optional - This is specified if you are running the relay directly on the internet and have a TLS key to provide HTTPS for your relay +##### `TLS_CERT` +Optional - This is specified if you are running the relay directly on the internet and have a TLS certificate chain to provide HTTPS for your relay ### Subscribing Mastodon admins can subscribe to this relay by adding the `/inbox` route to their relay settings. diff --git a/src/config.rs b/src/config.rs index 49c24ba..308c0bf 100644 --- a/src/config.rs +++ b/src/config.rs @@ -14,8 +14,9 @@ use activitystreams::{ }; use config::Environment; use http_signature_normalization_actix::prelude::{VerifyDigest, VerifySignature}; +use rustls::{Certificate, PrivateKey}; use sha2::{Digest, Sha256}; -use std::{net::IpAddr, path::PathBuf}; +use std::{io::BufReader, net::IpAddr, path::PathBuf}; use uuid::Uuid; #[derive(Clone, Debug, serde::Deserialize)] @@ -34,6 +35,8 @@ pub(crate) struct ParsedConfig { telegram_token: Option, telegram_admin_handle: Option, api_token: Option, + tls_key: Option, + tls_cert: Option, } #[derive(Clone)] @@ -52,6 +55,13 @@ pub struct Config { telegram_token: Option, telegram_admin_handle: Option, api_token: Option, + tls: Option, +} + +#[derive(Clone)] +struct TlsConfig { + key: PathBuf, + cert: PathBuf, } #[derive(Debug)] @@ -100,6 +110,8 @@ impl std::fmt::Debug for Config { .field("telegram_token", &"[redacted]") .field("telegram_admin_handle", &self.telegram_admin_handle) .field("api_token", &"[redacted]") + .field("tls_key", &"[redacted]") + .field("tls_cert", &"[redacted]") .finish() } } @@ -121,6 +133,8 @@ impl Config { .set_default("telegram_token", None as Option<&str>)? .set_default("telegram_admin_handle", None as Option<&str>)? .set_default("api_token", None as Option<&str>)? + .set_default("tls_key", None as Option<&str>)? + .set_default("tls_cert", None as Option<&str>)? .add_source(Environment::default()) .build()?; @@ -129,6 +143,10 @@ impl Config { let scheme = if config.https { "https" } else { "http" }; let base_uri = iri!(format!("{}://{}", scheme, config.hostname)).into_absolute(); + let tls = config + .tls_key + .and_then(|key| config.tls_cert.map(|cert| TlsConfig { key, cert })); + Ok(Config { hostname: config.hostname, addr: config.addr, @@ -144,9 +162,39 @@ impl Config { telegram_token: config.telegram_token, telegram_admin_handle: config.telegram_admin_handle, api_token: config.api_token, + tls, }) } + pub(crate) fn open_keys(&self) -> Result, PrivateKey)>, Error> { + let tls = if let Some(tls) = &self.tls { + tls + } else { + return Ok(None); + }; + + let mut certs_reader = BufReader::new(std::fs::File::open(&tls.cert)?); + let certs = rustls_pemfile::certs(&mut certs_reader)?; + + let mut key_reader = BufReader::new(std::fs::File::open(&tls.key)?); + let keys = rustls_pemfile::read_all(&mut key_reader)?; + + let certs = certs.into_iter().map(Certificate).collect(); + + let key = if let Some(key) = keys.into_iter().find_map(|item| match item { + rustls_pemfile::Item::RSAKey(der) => Some(PrivateKey(der)), + rustls_pemfile::Item::PKCS8Key(der) => Some(PrivateKey(der)), + rustls_pemfile::Item::ECKey(der) => Some(PrivateKey(der)), + _ => None, + }) { + key + } else { + return Ok(None); + }; + + Ok(Some((certs, key))) + } + pub(crate) fn sled_path(&self) -> &PathBuf { &self.sled_path } diff --git a/src/jobs.rs b/src/jobs.rs index 296c641..014bd72 100644 --- a/src/jobs.rs +++ b/src/jobs.rs @@ -44,8 +44,8 @@ pub(crate) fn create_workers( media: MediaCache, config: Config, ) -> (Manager, JobServer) { - let parallelism = - std::thread::available_parallelism().unwrap_or_else(|_| NonZeroUsize::try_from(1).expect("nonzero")); + let parallelism = std::thread::available_parallelism() + .unwrap_or_else(|_| NonZeroUsize::try_from(1).expect("nonzero")); let shared = WorkerConfig::new_managed(Storage::new(ActixTimer), move |queue_handle| { JobState::new( diff --git a/src/main.rs b/src/main.rs index cb500bb..5045b0c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -8,6 +8,7 @@ use collector::MemoryCollector; use console_subscriber::ConsoleLayer; use opentelemetry::{sdk::Resource, KeyValue}; use opentelemetry_otlp::WithExportConfig; +use rustls::ServerConfig; use tracing_actix_web::TracingLogger; use tracing_error::ErrorLayer; use tracing_log::LogTracer; @@ -203,9 +204,10 @@ async fn do_server_main( telegram::start(admin_handle.to_owned(), db.clone(), token); } + let keys = config.open_keys()?; + let bind_address = config.bind_address(); - tracing::warn!("Binding to {}:{}", bind_address.0, bind_address.1); - HttpServer::new(move || { + let server = HttpServer::new(move || { let app = App::new() .app_data(web::Data::new(db.clone())) .app_data(web::Data::new(state.clone())) @@ -258,10 +260,24 @@ async fn do_server_main( .route("/stats", web::get().to(admin::routes::stats)), ), ) - }) - .bind(bind_address)? - .run() - .await?; + }); + + if let Some((certs, key)) = keys { + tracing::warn!("Binding to {}:{} with TLS", bind_address.0, bind_address.1); + let server_config = ServerConfig::builder() + .with_safe_default_cipher_suites() + .with_safe_default_kx_groups() + .with_safe_default_protocol_versions()? + .with_no_client_auth() + .with_single_cert(certs, key)?; + server + .bind_rustls(bind_address, server_config)? + .run() + .await?; + } else { + tracing::warn!("Binding to {}:{}", bind_address.0, bind_address.1); + server.bind(bind_address)?.run().await?; + } tracing::warn!("Server closed"); From 981a6779bf640e5d626bae26a4a44311bd261e48 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 20 Nov 2022 21:42:59 -0600 Subject: [PATCH 29/60] Bump version --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1944884..b17e7b9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -281,7 +281,7 @@ checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6" [[package]] name = "ap-relay" -version = "0.3.57" +version = "0.3.58" dependencies = [ "activitystreams", "activitystreams-ext", diff --git a/Cargo.toml b/Cargo.toml index 98673e0..8c400c7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "ap-relay" description = "A simple activitypub relay" -version = "0.3.57" +version = "0.3.58" authors = ["asonix "] license = "AGPL-3.0" readme = "README.md" From 73cc4862d99fab1e004d8d94373e14f6c625b7c3 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 20 Nov 2022 21:43:09 -0600 Subject: [PATCH 30/60] Bump deps --- Cargo.lock | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b17e7b9..e693ecc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -626,9 +626,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.76" +version = "1.0.77" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76a284da2e6fe2092f2353e51713435363112dfd60030e22add80be333fb928f" +checksum = "e9f73505338f7d905b19d18738976aae232eb46b8efc15554ffc56deb5d9ebe4" [[package]] name = "cfg-if" @@ -1920,9 +1920,9 @@ dependencies = [ [[package]] name = "os_str_bytes" -version = "6.4.0" +version = "6.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b5bf27447411e9ee3ff51186bf7a08e16c341efdde93f4d823e8844429bed7e" +checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee" [[package]] name = "overload" From 205e794b9e914283a65ac8590cccc4bf711e1188 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 20 Nov 2022 22:46:20 -0600 Subject: [PATCH 31/60] Add more logging around TLS config issues --- src/config.rs | 36 +++++++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/src/config.rs b/src/config.rs index 308c0bf..6f8db5a 100644 --- a/src/config.rs +++ b/src/config.rs @@ -143,9 +143,18 @@ impl Config { let scheme = if config.https { "https" } else { "http" }; let base_uri = iri!(format!("{}://{}", scheme, config.hostname)).into_absolute(); - let tls = config - .tls_key - .and_then(|key| config.tls_cert.map(|cert| TlsConfig { key, cert })); + let tls = match (config.tls_key, config.tls_cert) { + (Some(key), Some(cert)) => Some(TlsConfig { key, cert }), + (Some(_), None) => { + tracing::warn!("TLS_KEY is set but TLS_CERT isn't , not building TLS config"); + None + } + (None, Some(_)) => { + tracing::warn!("TLS_CERT is set but TLS_KEY isn't , not building TLS config"); + None + } + (None, None) => None, + }; Ok(Config { hostname: config.hostname, @@ -170,6 +179,7 @@ impl Config { let tls = if let Some(tls) = &self.tls { tls } else { + tracing::warn!("No TLS config present"); return Ok(None); }; @@ -177,18 +187,22 @@ impl Config { let certs = rustls_pemfile::certs(&mut certs_reader)?; let mut key_reader = BufReader::new(std::fs::File::open(&tls.key)?); - let keys = rustls_pemfile::read_all(&mut key_reader)?; + let key = rustls_pemfile::read_one(&mut key_reader)?; let certs = certs.into_iter().map(Certificate).collect(); - let key = if let Some(key) = keys.into_iter().find_map(|item| match item { - rustls_pemfile::Item::RSAKey(der) => Some(PrivateKey(der)), - rustls_pemfile::Item::PKCS8Key(der) => Some(PrivateKey(der)), - rustls_pemfile::Item::ECKey(der) => Some(PrivateKey(der)), - _ => None, - }) { - key + let key = if let Some(key) = key { + match key { + rustls_pemfile::Item::RSAKey(der) => PrivateKey(der), + rustls_pemfile::Item::PKCS8Key(der) => PrivateKey(der), + rustls_pemfile::Item::ECKey(der) => PrivateKey(der), + _ => { + tracing::warn!("Unknown key format: {:?}", key); + return Ok(None); + } + } } else { + tracing::warn!("Failed to read private key"); return Ok(None); }; From d8f3f1d0e95352a55cf83dbaac9f500a33ca5d91 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 20 Nov 2022 22:47:20 -0600 Subject: [PATCH 32/60] Add one more log in TLS config --- src/config.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/config.rs b/src/config.rs index 6f8db5a..a03ea3d 100644 --- a/src/config.rs +++ b/src/config.rs @@ -186,6 +186,11 @@ impl Config { let mut certs_reader = BufReader::new(std::fs::File::open(&tls.cert)?); let certs = rustls_pemfile::certs(&mut certs_reader)?; + if certs.is_empty() { + tracing::warn!("No certs read from certificate file"); + return Ok(None); + } + let mut key_reader = BufReader::new(std::fs::File::open(&tls.key)?); let key = rustls_pemfile::read_one(&mut key_reader)?; From a0195d94aa3bc5e6c0cdf95475d25b7b674dd606 Mon Sep 17 00:00:00 2001 From: asonix Date: Sun, 20 Nov 2022 22:47:49 -0600 Subject: [PATCH 33/60] Bump version --- Cargo.lock | 2 +- Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e693ecc..dc5ce3e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -281,7 +281,7 @@ checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6" [[package]] name = "ap-relay" -version = "0.3.58" +version = "0.3.59" dependencies = [ "activitystreams", "activitystreams-ext", diff --git a/Cargo.toml b/Cargo.toml index 8c400c7..2c8c1af 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "ap-relay" description = "A simple activitypub relay" -version = "0.3.58" +version = "0.3.59" authors = ["asonix "] license = "AGPL-3.0" readme = "README.md" From 9133dd7688c4c21efd1fa824a4fe37cb8fd6a937 Mon Sep 17 00:00:00 2001 From: asonix Date: Mon, 21 Nov 2022 11:16:21 -0600 Subject: [PATCH 34/60] Add optional footer blurb --- .env | 1 + README.md | 3 +++ src/config.rs | 15 +++++++++++++++ templates/index.rs.html | 3 +++ 4 files changed, 22 insertions(+) diff --git a/.env b/.env index e8adc6f..b28a8c6 100644 --- a/.env +++ b/.env @@ -2,4 +2,5 @@ HOSTNAME=localhost:8079 PORT=8079 RESTRICTED_MODE=true API_TOKEN=somesecretpassword +FOOTER_BLURB="Contact @asonix for inquiries" # OPENTELEMETRY_URL=http://localhost:4317 diff --git a/README.md b/README.md index ed0957b..273f3bc 100644 --- a/README.md +++ b/README.md @@ -100,6 +100,7 @@ TELEGRAM_TOKEN=secret TELEGRAM_ADMIN_HANDLE=your_handle TLS_KEY=/path/to/key TLS_CERT=/path/to/cert +FOOTER_BLURB="Contact @asonix ``` #### Descriptions @@ -137,6 +138,8 @@ The handle of the telegram user allowed to administer the relay. There is no def Optional - This is specified if you are running the relay directly on the internet and have a TLS key to provide HTTPS for your relay ##### `TLS_CERT` Optional - This is specified if you are running the relay directly on the internet and have a TLS certificate chain to provide HTTPS for your relay +##### `FOOTER_BLURB` +Optional - Add custom notes in the footer of the page ### Subscribing Mastodon admins can subscribe to this relay by adding the `/inbox` route to their relay settings. diff --git a/src/config.rs b/src/config.rs index a03ea3d..b1da0cc 100644 --- a/src/config.rs +++ b/src/config.rs @@ -37,6 +37,7 @@ pub(crate) struct ParsedConfig { api_token: Option, tls_key: Option, tls_cert: Option, + footer_blurb: Option, } #[derive(Clone)] @@ -56,6 +57,7 @@ pub struct Config { telegram_admin_handle: Option, api_token: Option, tls: Option, + footer_blurb: Option, } #[derive(Clone)] @@ -112,6 +114,7 @@ impl std::fmt::Debug for Config { .field("api_token", &"[redacted]") .field("tls_key", &"[redacted]") .field("tls_cert", &"[redacted]") + .field("footer_blurb", &self.footer_blurb) .finish() } } @@ -135,6 +138,7 @@ impl Config { .set_default("api_token", None as Option<&str>)? .set_default("tls_key", None as Option<&str>)? .set_default("tls_cert", None as Option<&str>)? + .set_default("footer_blurb", None as Option<&str>)? .add_source(Environment::default()) .build()?; @@ -172,6 +176,7 @@ impl Config { telegram_admin_handle: config.telegram_admin_handle, api_token: config.api_token, tls, + footer_blurb: config.footer_blurb, }) } @@ -214,6 +219,16 @@ impl Config { Ok(Some((certs, key))) } + pub(crate) fn footer_blurb(&self) -> Option> { + if let Some(blurb) = &self.footer_blurb { + if !blurb.is_empty() { + return Some(crate::templates::Html(blurb)); + } + } + + None + } + pub(crate) fn sled_path(&self) -> &PathBuf { &self.sled_path } diff --git a/templates/index.rs.html b/templates/index.rs.html index 20c0390..fe5161b 100644 --- a/templates/index.rs.html +++ b/templates/index.rs.html @@ -84,6 +84,9 @@ templates::{info, instance, statics::index_css},