diff --git a/.env b/.env index c007da1..e8adc6f 100644 --- a/.env +++ b/.env @@ -1,4 +1,5 @@ HOSTNAME=localhost:8079 PORT=8079 RESTRICTED_MODE=true +API_TOKEN=somesecretpassword # OPENTELEMETRY_URL=http://localhost:4317 diff --git a/Cargo.lock b/Cargo.lock index 9906982..e38d3f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -279,7 +279,7 @@ checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6" [[package]] name = "ap-relay" -version = "0.3.34" +version = "0.3.50" dependencies = [ "activitystreams", "activitystreams-ext", @@ -291,6 +291,7 @@ dependencies = [ "awc", "background-jobs", "base64", + "bcrypt", "clap", "config", "console-subscriber", @@ -494,16 +495,15 @@ dependencies = [ [[package]] name = "background-jobs-actix" -version = "0.13.0" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8660626a2d8781b50cbe0e3b63d8e2a7e08a90e80fa2bca8e8cc19deff72ebf4" +checksum = "47263ad9c5679419347dae655c2fa2cba078b0eaa51ac758d4f0e9690c06910b" dependencies = [ "actix-rt", "anyhow", "async-mutex", "async-trait", "background-jobs-core", - "num_cpus", "serde", "serde_json", "thiserror", @@ -544,6 +544,18 @@ version = "1.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b645a089122eccb6111b4f81cbc1a49f5900ac4666bb93ac027feaecf15607bf" +[[package]] +name = "bcrypt" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7e7c93a3fb23b2fdde989b2c9ec4dd153063ec81f408507f84c090cd91c6641" +dependencies = [ + "base64", + "blowfish", + "getrandom", + "zeroize", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -559,6 +571,16 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blowfish" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e412e2cd0f2b2d93e02543ceae7917b3c70331573df19ee046bcbc35e45e87d7" +dependencies = [ + "byteorder", + "cipher", +] + [[package]] name = "bumpalo" version = "3.11.1" @@ -594,9 +616,9 @@ dependencies = [ [[package]] name = "cc" -version = "1.0.75" +version = "1.0.76" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41ca34107f97baef6cfb231b32f36115781856b8f8208e8c580e0bcaea374842" +checksum = "76a284da2e6fe2092f2353e51713435363112dfd60030e22add80be333fb928f" [[package]] name = "cfg-if" @@ -606,19 +628,29 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" [[package]] name = "chrono" -version = "0.4.22" +version = "0.4.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfd4d1b31faaa3a89d7934dbded3111da0d2ef28e3ebccdb4f0179f5929d1ef1" +checksum = "16b0a3d9ed01224b22057780a37bb8c5dbfe1be8ba48678e7bf57ec4b385411f" dependencies = [ "num-integer", "num-traits", ] [[package]] -name = "clap" -version = "4.0.22" +name = "cipher" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "91b9970d7505127a162fdaa9b96428d28a479ba78c9ec7550a63a5d9863db682" +checksum = "d1873270f8f7942c191139cb8a40fd228da6c3fd2fc376d7e92d47aa14aeb59e" +dependencies = [ + "crypto-common", + "inout", +] + +[[package]] +name = "clap" +version = "4.0.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2148adefda54e14492fb9bddcc600b4344c5d1a3123bd666dcb939c6f0e0e57e" dependencies = [ "atty", "bitflags", @@ -708,9 +740,9 @@ dependencies = [ [[package]] name = "const-oid" -version = "0.9.0" +version = "0.9.1" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "722e23542a15cea1f65d4a1419c4cfd7a26706c70871a13a04238ca3f40f1661" +checksum = "cec318a675afcb6a1ea1d4340e2d377e56e47c266f28043ceccbf4412ddfdd3b" [[package]] name = "convert_case" @@ -852,9 +884,9 @@ dependencies = [ [[package]] name = "digest" -version = "0.10.5" +version = "0.10.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adfbc57365a37acbd2ebf2b64d7e69bb766e2fea813521ed536f5d0520dcf86c" +checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f" dependencies = [ "block-buffer", "const-oid", @@ -1264,9 +1296,9 @@ dependencies = [ [[package]] name = "hyper-rustls" -version = "0.23.0" +version = "0.23.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d87c48c02e0dc5e3b849a2041db3029fd066650f8f717c07bf8ed78ccb895cac" +checksum = "59df7c4e19c950e6e0e868dcc0a300b09a9b88e9ec55bd879ca819087a77355d" dependencies = [ "http", "hyper", @@ -1305,14 +1337,23 @@ dependencies = [ [[package]] name = "indexmap" -version = "1.9.1" +version = "1.9.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" +checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399" dependencies = [ "autocfg", "hashbrown", ] +[[package]] +name = "inout" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" +dependencies = [ + "generic-array", +] + [[package]] name = "instant" version = "0.1.12" @@ -1404,9 +1445,9 @@ checksum = "fc7fcc620a3bff7cdd7a365be3376c97191aeaccc2a603e600951e452615bf89" [[package]] name = "libm" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "292a948cd991e376cf75541fe5b97a1081d713c618b4f1b9500f8844e49eb565" +checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb" [[package]] name = "linked-hash-map" @@ -1627,9 +1668,9 @@ dependencies = [ [[package]] name = "num-bigint-dig" -version = "0.8.1" +version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "566d173b2f9406afbc5510a90925d5a2cd80cae4605631f1212303df265de011" +checksum = "2399c9463abc5f909349d8aa9ba080e0b88b3ce2885389b60b993f39b1a56905" dependencies = [ "byteorder", "lazy_static", @@ -1793,9 +1834,9 @@ dependencies = [ [[package]] name = "os_str_bytes" -version = "6.3.1" +version = "6.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3baf96e39c5359d2eb0dd6ccb42c62b91d9678aa68160d261b9e0ccbf9e9dea9" +checksum = "7b5bf27447411e9ee3ff51186bf7a08e16c341efdde93f4d823e8844429bed7e" [[package]] name = "overload" @@ -2228,9 +2269,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.11.12" +version = "0.11.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "431949c384f4e2ae07605ccaa56d1d9d2ecdb5cadd4f9577ccfab29f2e5149fc" +checksum = "68cc60575865c7831548863cc02356512e3f1dc2f3f82cb837d7fc4cc8f3c97c" dependencies = [ "base64", "bytes", @@ -2295,9 +2336,9 @@ dependencies = [ [[package]] name = "rsa" -version = "0.7.1" +version = "0.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0ecc3307be66bfb3574577895555bacfb9a37a8d5cd959444b72ff02495c618" +checksum = "094052d5470cbcef561cb848a7209968c9f12dfa6d668f4bca048ac5de51099c" dependencies = [ "byteorder", "digest", 
@@ -2449,9 +2490,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.87" +version = "1.0.88" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ce777b7b150d76b9cf60d28b55f5847135a003f7d7350c6be7a773508ce7d45" +checksum = "8e8b3801309262e8184d9687fb697586833e939767aea0dda89f5a8e650e8bd7" dependencies = [ "itoa", "ryu", @@ -2664,9 +2705,9 @@ checksum = "20f34339676cdcab560c9a82300c4c2581f68b9369aedf0fae86f2ff9565ff3e" [[package]] name = "teloxide" -version = "0.11.1" +version = "0.11.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94734a391eb4f3b6172b285fc10593192f9bdb4c8a377075cff063d967f0e43b" +checksum = "19017dde82bddcbbdf8e40484f23985f4097b1baef4f7c0e006195ad1e6d4e3c" dependencies = [ "aquamarine", "bytes", @@ -2838,9 +2879,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "1.21.2" +version = "1.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9e03c497dc955702ba729190dc4aac6f2a0ce97f913e5b1b5912fc5039d9099" +checksum = "d76ce4a75fb488c605c54bf610f221cea8b0dafb53333c1a67e8ee199dcd2ae3" dependencies = [ "autocfg", "bytes", @@ -3216,9 +3257,9 @@ checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" [[package]] name = "uuid" -version = "1.2.1" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "feb41e78f93363bb2df8b0e86a2ca30eed7806ea16ea0c790d757cf93f79be83" +checksum = "422ee0de9031b5b948b97a8fc04e3aa35230001a722ddd27943e0be31564ce4c" dependencies = [ "getrandom", "serde", diff --git a/Cargo.toml b/Cargo.toml index bda38f8..dcddf46 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "ap-relay" description = "A simple activitypub relay" -version = "0.3.34" +version = "0.3.50" authors = ["asonix "] license = "AGPL-3.0" readme = "README.md" @@ -29,6 +29,7 @@ activitystreams = "0.7.0-alpha.19" activitystreams-ext = "0.1.0-alpha.2" ammonia = "3.1.0" awc = { version = "3.0.0", default-features = false, features = ["rustls"] } +bcrypt = "0.13" base64 = "0.13" clap = { version = "4.0.0", features = ["derive"] } config = "0.13.0" diff --git a/README.md b/README.md index fe79fd2..1506ca2 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,9 @@ To simply run the server, the command is as follows $ ./relay ``` +#### Administration +> **NOTE:** The server _must be running_ in order to update the lists with the following commands + To learn about any other tasks, the `--help` flag can be passed ```bash An activitypub relay @@ -90,6 +93,8 @@ HTTPS=true PRETTY_LOG=false PUBLISH_BLOCKS=true SLED_PATH=./sled/db-0.34 +RUST_LOG=warn +API_TOKEN=somepasswordishtoken OPENTELEMETRY_URL=localhost:4317 TELEGRAM_TOKEN=secret TELEGRAM_ADMIN_HANDLE=your_handle @@ -114,8 +119,12 @@ Whether the current server is running on an HTTPS port or not. This is used for Whether or not to publish a list of blocked domains in the `nodeinfo` metadata for the server. It defaults to `false`. ##### `SLED_PATH` Where to store the on-disk database of connected servers. This defaults to `./sled/db-0.34`. +##### `RUST_LOG` +The log level to print. Available levels are `ERROR`, `WARN`, `INFO`, `DEBUG`, and `TRACE`. You can also specify module paths to enable some logs but not others, such as `RUST_LOG=warn,tracing_actix_web=info,relay=info` ##### `SOURCE_REPO` The URL to the source code for the relay. 
 This defaults to `https://git.asonix.dog/asonix/relay`, but should be changed if you're running a fork hosted elsewhere.
+##### `API_TOKEN`
+The secret token used to access the admin APIs. This must be set for the command-line admin commands to function.
 ##### `OPENTELEMETRY_URL`
 A URL for exporting opentelemetry spans. This is mostly useful for debugging. There is no default, since most people probably don't run an opentelemetry collector.
 ##### `TELEGRAM_TOKEN`
diff --git a/src/admin.rs b/src/admin.rs
new file mode 100644
index 0000000..e7fc665
--- /dev/null
+++ b/src/admin.rs
@@ -0,0 +1,24 @@
+use activitystreams::iri_string::types::IriString;
+
+pub mod client;
+pub mod routes;
+
+#[derive(serde::Deserialize, serde::Serialize)]
+pub(crate) struct Domains {
+    domains: Vec<String>,
+}
+
+#[derive(serde::Deserialize, serde::Serialize)]
+pub(crate) struct AllowedDomains {
+    pub(crate) allowed_domains: Vec<String>,
+}
+
+#[derive(serde::Deserialize, serde::Serialize)]
+pub(crate) struct BlockedDomains {
+    pub(crate) blocked_domains: Vec<String>,
+}
+
+#[derive(serde::Deserialize, serde::Serialize)]
+pub(crate) struct ConnectedActors {
+    pub(crate) connected_actors: Vec<IriString>,
+}
diff --git a/src/admin/client.rs b/src/admin/client.rs
new file mode 100644
index 0000000..f63151f
--- /dev/null
+++ b/src/admin/client.rs
@@ -0,0 +1,103 @@
+use crate::{
+    admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains},
+    config::{AdminUrlKind, Config},
+    error::{Error, ErrorKind},
+};
+use awc::Client;
+use serde::de::DeserializeOwned;
+
+pub(crate) async fn allow(
+    client: &Client,
+    config: &Config,
+    domains: Vec<String>,
+) -> Result<(), Error> {
+    post_domains(client, config, domains, AdminUrlKind::Allow).await
+}
+
+pub(crate) async fn disallow(
+    client: &Client,
+    config: &Config,
+    domains: Vec<String>,
+) -> Result<(), Error> {
+    post_domains(client, config, domains, AdminUrlKind::Disallow).await
+}
+
+pub(crate) async fn block(
+    client: &Client,
+    config: &Config,
+    domains: Vec<String>,
+) -> Result<(), Error> {
+    post_domains(client, config, domains, AdminUrlKind::Block).await
+}
+
+pub(crate) async fn unblock(
+    client: &Client,
+    config: &Config,
+    domains: Vec<String>,
+) -> Result<(), Error> {
+    post_domains(client, config, domains, AdminUrlKind::Unblock).await
+}
+
+pub(crate) async fn allowed(client: &Client, config: &Config) -> Result<AllowedDomains, Error> {
+    get_results(client, config, AdminUrlKind::Allowed).await
+}
+
+pub(crate) async fn blocked(client: &Client, config: &Config) -> Result<BlockedDomains, Error> {
+    get_results(client, config, AdminUrlKind::Blocked).await
+}
+
+pub(crate) async fn connected(client: &Client, config: &Config) -> Result<ConnectedActors, Error> {
+    get_results(client, config, AdminUrlKind::Connected).await
+}
+
+async fn get_results<T: DeserializeOwned>(
+    client: &Client,
+    config: &Config,
+    url_kind: AdminUrlKind,
+) -> Result<T, Error> {
+    let x_api_token = config.x_api_token().ok_or(ErrorKind::MissingApiToken)?;
+
+    let iri = config.generate_admin_url(url_kind);
+
+    let mut res = client
+        .get(iri.as_str())
+        .insert_header(x_api_token)
+        .send()
+        .await
+        .map_err(|e| ErrorKind::SendRequest(iri.to_string(), e.to_string()))?;
+
+    if !res.status().is_success() {
+        return Err(ErrorKind::Status(iri.to_string(), res.status()).into());
+    }
+
+    let t = res
+        .json()
+        .await
+        .map_err(|e| ErrorKind::ReceiveResponse(iri.to_string(), e.to_string()))?;
+
+    Ok(t)
+}
+
+async fn post_domains(
+    client: &Client,
+    config: &Config,
+    domains: Vec<String>,
+    url_kind: AdminUrlKind,
+) -> Result<(), Error> {
+    let x_api_token = config.x_api_token().ok_or(ErrorKind::MissingApiToken)?;
+
+    let iri = config.generate_admin_url(url_kind);
+
+    let res = client
+        .post(iri.as_str())
+        .insert_header(x_api_token)
+        .send_json(&Domains { domains })
+        .await
+        .map_err(|e| ErrorKind::SendRequest(iri.to_string(), e.to_string()))?;
+
+    if !res.status().is_success() {
+        tracing::warn!("Failed to allow domains");
+    }
+
+    Ok(())
+}
diff --git a/src/admin/routes.rs b/src/admin/routes.rs
new file mode 100644
index 0000000..c33efca
--- /dev/null
+++ b/src/admin/routes.rs
@@ -0,0 +1,60 @@
+use crate::{
+    admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains},
+    error::Error,
+    extractors::Admin,
+};
+use actix_web::{web::Json, HttpResponse};
+
+pub(crate) async fn allow(
+    admin: Admin,
+    Json(Domains { domains }): Json<Domains>,
+) -> Result<HttpResponse, Error> {
+    admin.db_ref().add_allows(domains).await?;
+
+    Ok(HttpResponse::NoContent().finish())
+}
+
+pub(crate) async fn disallow(
+    admin: Admin,
+    Json(Domains { domains }): Json<Domains>,
+) -> Result<HttpResponse, Error> {
+    admin.db_ref().remove_allows(domains).await?;
+
+    Ok(HttpResponse::NoContent().finish())
+}
+
+pub(crate) async fn block(
+    admin: Admin,
+    Json(Domains { domains }): Json<Domains>,
+) -> Result<HttpResponse, Error> {
+    admin.db_ref().add_blocks(domains).await?;
+
+    Ok(HttpResponse::NoContent().finish())
+}
+
+pub(crate) async fn unblock(
+    admin: Admin,
+    Json(Domains { domains }): Json<Domains>,
+) -> Result<HttpResponse, Error> {
+    admin.db_ref().remove_blocks(domains).await?;
+
+    Ok(HttpResponse::NoContent().finish())
+}
+
+pub(crate) async fn allowed(admin: Admin) -> Result<Json<AllowedDomains>, Error> {
+    let allowed_domains = admin.db_ref().allows().await?;
+
+    Ok(Json(AllowedDomains { allowed_domains }))
+}
+
+pub(crate) async fn blocked(admin: Admin) -> Result<Json<BlockedDomains>, Error> {
+    let blocked_domains = admin.db_ref().blocks().await?;
+
+    Ok(Json(BlockedDomains { blocked_domains }))
+}
+
+pub(crate) async fn connected(admin: Admin) -> Result<Json<ConnectedActors>, Error> {
+    let connected_actors = admin.db_ref().connected_ids().await?;
+
+    Ok(Json(ConnectedActors { connected_actors }))
+}
diff --git a/src/args.rs b/src/args.rs
index 61e093f..6b6c054 100644
--- a/src/args.rs
+++ b/src/args.rs
@@ -17,6 +17,10 @@ pub(crate) struct Args {
 }
 impl Args {
+    pub(crate) fn any(&self) -> bool {
+        !self.blocks.is_empty() || !self.allowed.is_empty() || self.list
+    }
+
     pub(crate) fn new() -> Self {
         Self::parse()
     }
diff --git a/src/config.rs b/src/config.rs
index e42b755..284f807 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -1,6 +1,7 @@
 use crate::{
     data::{ActorCache, State},
     error::Error,
+    extractors::{AdminConfig, XApiToken},
     middleware::MyVerify,
     requests::Requests,
 };
@@ -32,6 +33,7 @@ pub(crate) struct ParsedConfig {
     opentelemetry_url: Option<IriString>,
     telegram_token: Option<String>,
     telegram_admin_handle: Option<String>,
+    api_token: Option<String>,
 }
 #[derive(Clone)]
 pub struct Config {
@@ -49,6 +51,7 @@ pub struct Config {
     opentelemetry_url: Option<IriString>,
     telegram_token: Option<String>,
     telegram_admin_handle: Option<String>,
+    api_token: Option<String>,
 }
 #[derive(Debug)]
@@ -65,6 +68,17 @@ pub enum UrlKind {
     Outbox,
 }
+#[derive(Debug)]
+pub enum AdminUrlKind {
+    Allow,
+    Disallow,
+    Block,
+    Unblock,
+    Allowed,
+    Blocked,
+    Connected,
+}
+
 impl std::fmt::Debug for Config {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         f.debug_struct("Config")
@@ -84,6 +98,7 @@ impl std::fmt::Debug for Config {
             )
             .field("telegram_token", &"[redacted]")
             .field("telegram_admin_handle", &self.telegram_admin_handle)
+            .field("api_token", &"[redacted]")
             .finish()
     }
 }
@@ -93,7 +108,7 @@ impl Config {
         let config = config::Config::builder()
             .set_default("hostname", "localhost:8080")?
             .set_default("addr", "127.0.0.1")?
-            .set_default::<_, u64>("port", 8080)?
+            .set_default("port", 8080u64)?
             .set_default("debug", true)?
             .set_default("restricted_mode", false)?
             .set_default("validate_signatures", false)?
             .set_default("https", false)?
             .set_default("pretty_log", true)?
             .set_default("publish_blocks", false)?
             .set_default("sled_path", "./sled/db-0.34")?
@@ -104,6 +119,7 @@ impl Config {
             .set_default("opentelemetry_url", None as Option<&str>)?
             .set_default("telegram_token", None as Option<&str>)?
             .set_default("telegram_admin_handle", None as Option<&str>)?
+            .set_default("api_token", None as Option<&str>)?
             .add_source(Environment::default())
             .build()?;
@@ -126,6 +142,7 @@ impl Config {
             opentelemetry_url: config.opentelemetry_url,
             telegram_token: config.telegram_token,
             telegram_admin_handle: config.telegram_admin_handle,
+            api_token: config.api_token,
         })
     }
@@ -158,6 +175,24 @@ impl Config {
         }
     }
+    pub(crate) fn x_api_token(&self) -> Option<XApiToken> {
+        self.api_token.clone().map(XApiToken::new)
+    }
+
+    pub(crate) fn admin_config(&self) -> Option<actix_web::web::Data<AdminConfig>> {
+        if let Some(api_token) = &self.api_token {
+            match AdminConfig::build(api_token) {
+                Ok(conf) => Some(actix_web::web::Data::new(conf)),
+                Err(e) => {
+                    tracing::error!("Error creating admin config: {}", e);
+                    None
+                }
+            }
+        } else {
+            None
+        }
+    }
+
     pub(crate) fn bind_address(&self) -> (IpAddr, u16) {
         (self.addr, self.port)
     }
@@ -281,4 +316,30 @@ impl Config {
         Ok(iri)
     }
+
+    pub(crate) fn generate_admin_url(&self, kind: AdminUrlKind) -> IriString {
+        self.do_generate_admin_url(kind)
+            .expect("Generated valid IRI")
+    }
+
+    fn do_generate_admin_url(&self, kind: AdminUrlKind) -> Result<IriString, Error> {
+        let iri = match kind {
+            AdminUrlKind::Allow => FixedBaseResolver::new(self.base_uri.as_ref())
+                .try_resolve(IriRelativeStr::new("api/v1/admin/allow")?.as_ref())?,
+            AdminUrlKind::Disallow => FixedBaseResolver::new(self.base_uri.as_ref())
+                .try_resolve(IriRelativeStr::new("api/v1/admin/disallow")?.as_ref())?,
+            AdminUrlKind::Block => FixedBaseResolver::new(self.base_uri.as_ref())
+                .try_resolve(IriRelativeStr::new("api/v1/admin/block")?.as_ref())?,
+            AdminUrlKind::Unblock => FixedBaseResolver::new(self.base_uri.as_ref())
+                .try_resolve(IriRelativeStr::new("api/v1/admin/unblock")?.as_ref())?,
+            AdminUrlKind::Allowed => FixedBaseResolver::new(self.base_uri.as_ref())
+                .try_resolve(IriRelativeStr::new("api/v1/admin/allowed")?.as_ref())?,
+            AdminUrlKind::Blocked => FixedBaseResolver::new(self.base_uri.as_ref())
+                .try_resolve(IriRelativeStr::new("api/v1/admin/blocked")?.as_ref())?,
+            AdminUrlKind::Connected => FixedBaseResolver::new(self.base_uri.as_ref())
+                .try_resolve(IriRelativeStr::new("api/v1/admin/connected")?.as_ref())?,
+        };
+
+        Ok(iri)
+    }
 }
diff --git a/src/data/actor.rs b/src/data/actor.rs
index 50f0195..c0b3ddd 100644
--- a/src/data/actor.rs
+++ b/src/data/actor.rs
@@ -37,7 +37,7 @@ impl ActorCache {
         ActorCache { db }
     }
-    #[tracing::instrument(name = "Get Actor", skip_all, fields(id = id.to_string().as_str(), requests))]
+    #[tracing::instrument(level = "debug", name = "Get Actor", skip_all, fields(id = id.to_string().as_str(), requests))]
     pub(crate) async fn get(
         &self,
         id: &IriString,
@@ -54,7 +54,7 @@ impl ActorCache {
             .map(MaybeCached::Fetched)
     }
-    #[tracing::instrument(name = "Add Connection", skip(self))]
+    #[tracing::instrument(level = "debug", name = "Add Connection", skip(self))]
     pub(crate) async fn add_connection(&self, actor: Actor) -> Result<(), Error> {
         let add_connection = self.db.add_connection(actor.id.clone());
         let save_actor = self.db.save_actor(actor);
@@ -64,12 +64,12 @@ impl ActorCache {
         Ok(())
     }
-    #[tracing::instrument(name = "Remove Connection", skip(self))]
+    #[tracing::instrument(level = "debug", name =
"Remove Connection", skip(self))] pub(crate) async fn remove_connection(&self, actor: &Actor) -> Result<(), Error> { self.db.remove_connection(actor.id.clone()).await } - #[tracing::instrument(name = "Fetch remote actor", skip_all, fields(id = id.to_string().as_str(), requests))] + #[tracing::instrument(level = "debug", name = "Fetch remote actor", skip_all, fields(id = id.to_string().as_str(), requests))] pub(crate) async fn get_no_cache( &self, id: &IriString, diff --git a/src/data/media.rs b/src/data/media.rs index 18bc6e4..20136d6 100644 --- a/src/data/media.rs +++ b/src/data/media.rs @@ -1,14 +1,7 @@ -use crate::{ - db::{Db, MediaMeta}, - error::Error, -}; +use crate::{db::Db, error::Error}; use activitystreams::iri_string::types::IriString; -use actix_web::web::Bytes; -use std::time::{Duration, SystemTime}; use uuid::Uuid; -static MEDIA_DURATION: Duration = Duration::from_secs(60 * 60 * 24 * 2); - #[derive(Clone, Debug)] pub struct MediaCache { db: Db, @@ -19,42 +12,16 @@ impl MediaCache { MediaCache { db } } - #[tracing::instrument(name = "Get media uuid", skip_all, fields(url = url.to_string().as_str()))] + #[tracing::instrument(level = "debug", name = "Get media uuid", skip_all, fields(url = url.to_string().as_str()))] pub(crate) async fn get_uuid(&self, url: IriString) -> Result, Error> { self.db.media_id(url).await } - #[tracing::instrument(name = "Get media url", skip(self))] + #[tracing::instrument(level = "debug", name = "Get media url", skip(self))] pub(crate) async fn get_url(&self, uuid: Uuid) -> Result, Error> { self.db.media_url(uuid).await } - #[tracing::instrument(name = "Is media outdated", skip(self))] - pub(crate) async fn is_outdated(&self, uuid: Uuid) -> Result { - if let Some(meta) = self.db.media_meta(uuid).await? { - if meta.saved_at + MEDIA_DURATION > SystemTime::now() { - return Ok(false); - } - } - - Ok(true) - } - - #[tracing::instrument(name = "Get media bytes", skip(self))] - pub(crate) async fn get_bytes(&self, uuid: Uuid) -> Result, Error> { - if let Some(meta) = self.db.media_meta(uuid).await? 
{ - if meta.saved_at + MEDIA_DURATION > SystemTime::now() { - return self - .db - .media_bytes(uuid) - .await - .map(|opt| opt.map(|bytes| (meta.media_type, bytes))); - } - } - - Ok(None) - } - #[tracing::instrument(name = "Store media url", skip_all, fields(url = url.to_string().as_str()))] pub(crate) async fn store_url(&self, url: IriString) -> Result { let uuid = Uuid::new_v4(); @@ -63,23 +30,4 @@ impl MediaCache { Ok(uuid) } - - #[tracing::instrument(name = "store media bytes", skip(self, bytes))] - pub(crate) async fn store_bytes( - &self, - uuid: Uuid, - media_type: String, - bytes: Bytes, - ) -> Result<(), Error> { - self.db - .save_bytes( - uuid, - MediaMeta { - media_type, - saved_at: SystemTime::now(), - }, - bytes, - ) - .await - } } diff --git a/src/data/node.rs b/src/data/node.rs index 730da30..815cc7c 100644 --- a/src/data/node.rs +++ b/src/data/node.rs @@ -34,7 +34,7 @@ impl NodeCache { NodeCache { db } } - #[tracing::instrument(name = "Get nodes", skip(self))] + #[tracing::instrument(level = "debug", name = "Get nodes", skip(self))] pub(crate) async fn nodes(&self) -> Result, Error> { let infos = self.db.connected_info(); let instances = self.db.connected_instance(); @@ -59,7 +59,7 @@ impl NodeCache { Ok(vec) } - #[tracing::instrument(name = "Is NodeInfo Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))] + #[tracing::instrument(level = "debug", name = "Is NodeInfo Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))] pub(crate) async fn is_nodeinfo_outdated(&self, actor_id: IriString) -> bool { self.db .info(actor_id) @@ -68,7 +68,7 @@ impl NodeCache { .unwrap_or(true) } - #[tracing::instrument(name = "Is Contact Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))] + #[tracing::instrument(level = "debug", name = "Is Contact Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))] pub(crate) async fn is_contact_outdated(&self, actor_id: IriString) -> bool { self.db .contact(actor_id) @@ -77,7 +77,7 @@ impl NodeCache { .unwrap_or(true) } - #[tracing::instrument(name = "Is Instance Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))] + #[tracing::instrument(level = "debug", name = "Is Instance Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))] pub(crate) async fn is_instance_outdated(&self, actor_id: IriString) -> bool { self.db .instance(actor_id) @@ -86,7 +86,7 @@ impl NodeCache { .unwrap_or(true) } - #[tracing::instrument(name = "Save node info", skip_all, fields(actor_id = actor_id.to_string().as_str(), software, version, reg))] + #[tracing::instrument(level = "debug", name = "Save node info", skip_all, fields(actor_id = actor_id.to_string().as_str(), software, version, reg))] pub(crate) async fn set_info( &self, actor_id: IriString, @@ -108,6 +108,7 @@ impl NodeCache { } #[tracing::instrument( + level = "debug", name = "Save instance info", skip_all, fields( @@ -144,6 +145,7 @@ impl NodeCache { } #[tracing::instrument( + level = "debug", name = "Save contact info", skip_all, fields( diff --git a/src/data/state.rs b/src/data/state.rs index f583281..93a3f9a 100644 --- a/src/data/state.rs +++ b/src/data/state.rs @@ -48,6 +48,7 @@ impl State { } #[tracing::instrument( + level = "debug", name = "Get inboxes for other domains", skip_all, fields( @@ -85,10 +86,10 @@ impl State { self.object_cache.write().await.put(object_id, actor_id); } - #[tracing::instrument(name = "Building state", skip_all)] + #[tracing::instrument(level = "debug", name = "Building state", 
skip_all)] pub(crate) async fn build(db: Db) -> Result { let private_key = if let Ok(Some(key)) = db.private_key().await { - tracing::info!("Using existing key"); + tracing::debug!("Using existing key"); key } else { tracing::info!("Generating new keys"); diff --git a/src/db.rs b/src/db.rs index fcfadba..685405f 100644 --- a/src/db.rs +++ b/src/db.rs @@ -3,7 +3,6 @@ use crate::{ error::{Error, ErrorKind}, }; use activitystreams::iri_string::types::IriString; -use actix_web::web::Bytes; use rsa::{ pkcs8::{DecodePrivateKey, EncodePrivateKey}, RsaPrivateKey, @@ -26,8 +25,6 @@ struct Inner { settings: Tree, media_url_media_id: Tree, media_id_media_url: Tree, - media_id_media_bytes: Tree, - media_id_media_meta: Tree, actor_id_info: Tree, actor_id_instance: Tree, actor_id_contact: Tree, @@ -63,12 +60,6 @@ impl std::fmt::Debug for Actor { } } -#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] -pub(crate) struct MediaMeta { - pub(crate) media_type: String, - pub(crate) saved_at: SystemTime, -} - #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] pub struct Info { pub(crate) software: String, @@ -253,8 +244,6 @@ impl Db { settings: db.open_tree("settings")?, media_url_media_id: db.open_tree("media-url-media-id")?, media_id_media_url: db.open_tree("media-id-media-url")?, - media_id_media_bytes: db.open_tree("media-id-media-bytes")?, - media_id_media_meta: db.open_tree("media-id-media-meta")?, actor_id_info: db.open_tree("actor-id-info")?, actor_id_instance: db.open_tree("actor-id-instance")?, actor_id_contact: db.open_tree("actor-id-contact")?, @@ -281,10 +270,6 @@ impl Db { self.unblock(|inner| Ok(inner.connected().collect())).await } - pub(crate) async fn allowed_domains(&self) -> Result, Error> { - self.unblock(|inner| Ok(inner.allowed().collect())).await - } - pub(crate) async fn save_info(&self, actor_id: IriString, info: Info) -> Result<(), Error> { self.unblock(move |inner| { let vec = serde_json::to_vec(&info)?; @@ -396,25 +381,6 @@ impl Db { .await } - pub(crate) async fn save_bytes( - &self, - id: Uuid, - meta: MediaMeta, - bytes: Bytes, - ) -> Result<(), Error> { - self.unblock(move |inner| { - let vec = serde_json::to_vec(&meta)?; - - inner - .media_id_media_bytes - .insert(id.as_bytes(), bytes.as_ref())?; - inner.media_id_media_meta.insert(id.as_bytes(), vec)?; - - Ok(()) - }) - .await - } - pub(crate) async fn media_id(&self, url: IriString) -> Result, Error> { self.unblock(move |inner| { if let Some(ivec) = inner.media_url_media_id.get(url.as_str().as_bytes())? { @@ -437,29 +403,6 @@ impl Db { .await } - pub(crate) async fn media_bytes(&self, id: Uuid) -> Result, Error> { - self.unblock(move |inner| { - if let Some(ivec) = inner.media_id_media_bytes.get(id.as_bytes())? { - Ok(Some(Bytes::copy_from_slice(&ivec))) - } else { - Ok(None) - } - }) - .await - } - - pub(crate) async fn media_meta(&self, id: Uuid) -> Result, Error> { - self.unblock(move |inner| { - if let Some(ivec) = inner.media_id_media_meta.get(id.as_bytes())? 
{ - let meta = serde_json::from_slice(&ivec)?; - Ok(Some(meta)) - } else { - Ok(None) - } - }) - .await - } - pub(crate) async fn blocks(&self) -> Result, Error> { self.unblock(|inner| Ok(inner.blocks().collect())).await } diff --git a/src/error.rs b/src/error.rs index 473ccf1..93c3a29 100644 --- a/src/error.rs +++ b/src/error.rs @@ -7,7 +7,6 @@ use actix_web::{ }; use http_signature_normalization_actix::PrepareSignError; use std::{convert::Infallible, fmt::Debug, io}; -use tracing::error; use tracing_error::SpanTrace; pub(crate) struct Error { @@ -15,6 +14,20 @@ pub(crate) struct Error { kind: ErrorKind, } +impl Error { + pub(crate) fn is_breaker(&self) -> bool { + matches!(self.kind, ErrorKind::Breaker) + } + + pub(crate) fn is_not_found(&self) -> bool { + matches!(self.kind, ErrorKind::Status(_, StatusCode::NOT_FOUND)) + } + + pub(crate) fn is_bad_request(&self) -> bool { + matches!(self.kind, ErrorKind::Status(_, StatusCode::BAD_REQUEST)) + } +} + impl std::fmt::Debug for Error { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { writeln!(f, "{:?}", self.kind) @@ -117,9 +130,6 @@ pub(crate) enum ErrorKind { #[error("{0}")] HostMismatch(#[from] CheckError), - #[error("Invalid or missing content type")] - ContentType, - #[error("Couldn't flush buffer")] FlushBuffer, @@ -164,6 +174,9 @@ pub(crate) enum ErrorKind { #[error("Failed to extract fields from {0}")] Extract(&'static str), + + #[error("No API Token supplied")] + MissingApiToken, } impl ResponseError for Error { diff --git a/src/extractors.rs b/src/extractors.rs new file mode 100644 index 0000000..d6cf1b1 --- /dev/null +++ b/src/extractors.rs @@ -0,0 +1,222 @@ +use actix_web::{ + dev::Payload, + error::{BlockingError, ParseError}, + http::{ + header::{from_one_raw_str, Header, HeaderName, HeaderValue, TryIntoHeaderValue}, + StatusCode, + }, + web::Data, + FromRequest, HttpMessage, HttpRequest, HttpResponse, ResponseError, +}; +use bcrypt::{BcryptError, DEFAULT_COST}; +use futures_util::future::LocalBoxFuture; +use http_signature_normalization_actix::prelude::InvalidHeaderValue; +use std::{convert::Infallible, str::FromStr}; +use tracing_error::SpanTrace; + +use crate::db::Db; + +#[derive(Clone)] +pub(crate) struct AdminConfig { + hashed_api_token: String, +} + +impl AdminConfig { + pub(crate) fn build(api_token: &str) -> Result { + Ok(AdminConfig { + hashed_api_token: bcrypt::hash(api_token, DEFAULT_COST).map_err(Error::bcrypt_hash)?, + }) + } + + fn verify(&self, token: XApiToken) -> Result { + bcrypt::verify(&token.0, &self.hashed_api_token).map_err(Error::bcrypt_verify) + } +} + +pub(crate) struct Admin { + db: Data, +} + +impl Admin { + fn prepare_verify( + req: &HttpRequest, + ) -> Result<(Data, Data, XApiToken), Error> { + let hashed_api_token = req + .app_data::>() + .ok_or_else(Error::missing_config)? + .clone(); + + let x_api_token = XApiToken::parse(req).map_err(Error::parse_header)?; + + let db = req + .app_data::>() + .ok_or_else(Error::missing_db)? + .clone(); + + Ok((db, hashed_api_token, x_api_token)) + } + + #[tracing::instrument(level = "debug", skip_all)] + async fn verify( + hashed_api_token: Data, + x_api_token: XApiToken, + ) -> Result<(), Error> { + if actix_web::web::block(move || hashed_api_token.verify(x_api_token)) + .await + .map_err(Error::canceled)?? 
+ { + return Ok(()); + } + + Err(Error::invalid()) + } + + pub(crate) fn db_ref(&self) -> &Db { + &self.db + } +} + +#[derive(Debug, thiserror::Error)] +#[error("Failed authentication")] +pub(crate) struct Error { + context: SpanTrace, + #[source] + kind: ErrorKind, +} + +impl Error { + fn invalid() -> Self { + Error { + context: SpanTrace::capture(), + kind: ErrorKind::Invalid, + } + } + + fn missing_config() -> Self { + Error { + context: SpanTrace::capture(), + kind: ErrorKind::MissingConfig, + } + } + + fn missing_db() -> Self { + Error { + context: SpanTrace::capture(), + kind: ErrorKind::MissingDb, + } + } + + fn bcrypt_verify(e: BcryptError) -> Self { + Error { + context: SpanTrace::capture(), + kind: ErrorKind::BCryptVerify(e), + } + } + + fn bcrypt_hash(e: BcryptError) -> Self { + Error { + context: SpanTrace::capture(), + kind: ErrorKind::BCryptHash(e), + } + } + + fn parse_header(e: ParseError) -> Self { + Error { + context: SpanTrace::capture(), + kind: ErrorKind::ParseHeader(e), + } + } + + fn canceled(_: BlockingError) -> Self { + Error { + context: SpanTrace::capture(), + kind: ErrorKind::Canceled, + } + } +} + +#[derive(Debug, thiserror::Error)] +enum ErrorKind { + #[error("Invalid API Token")] + Invalid, + + #[error("Missing Config")] + MissingConfig, + + #[error("Missing Db")] + MissingDb, + + #[error("Panic in verify")] + Canceled, + + #[error("Verifying")] + BCryptVerify(#[source] BcryptError), + + #[error("Hashing")] + BCryptHash(#[source] BcryptError), + + #[error("Parse Header")] + ParseHeader(#[source] ParseError), +} + +impl ResponseError for Error { + fn status_code(&self) -> StatusCode { + match self.kind { + ErrorKind::Invalid | ErrorKind::ParseHeader(_) => StatusCode::BAD_REQUEST, + _ => StatusCode::INTERNAL_SERVER_ERROR, + } + } + + fn error_response(&self) -> HttpResponse { + HttpResponse::build(self.status_code()) + .json(serde_json::json!({ "msg": self.kind.to_string() })) + } +} + +impl FromRequest for Admin { + type Error = Error; + type Future = LocalBoxFuture<'static, Result>; + + fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future { + let res = Self::prepare_verify(req); + Box::pin(async move { + let (db, c, t) = res?; + Self::verify(c, t).await?; + Ok(Admin { db }) + }) + } +} + +pub(crate) struct XApiToken(String); + +impl XApiToken { + pub(crate) fn new(token: String) -> Self { + Self(token) + } +} + +impl Header for XApiToken { + fn name() -> HeaderName { + HeaderName::from_static("x-api-token") + } + + fn parse(msg: &M) -> Result { + from_one_raw_str(msg.headers().get(Self::name())) + } +} + +impl TryIntoHeaderValue for XApiToken { + type Error = InvalidHeaderValue; + + fn try_into_value(self) -> Result { + HeaderValue::from_str(&self.0) + } +} + +impl FromStr for XApiToken { + type Err = Infallible; + + fn from_str(s: &str) -> Result { + Ok(XApiToken(s.to_string())) + } +} diff --git a/src/jobs/apub/announce.rs b/src/jobs/apub/announce.rs index d03f48e..26048c1 100644 --- a/src/jobs/apub/announce.rs +++ b/src/jobs/apub/announce.rs @@ -21,7 +21,7 @@ impl std::fmt::Debug for Announce { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Announce") .field("object_id", &self.object_id.to_string()) - .field("actor", &self.actor) + .field("actor_id", &self.actor.id) .finish() } } diff --git a/src/jobs/apub/follow.rs b/src/jobs/apub/follow.rs index e76f0ca..d78b544 100644 --- a/src/jobs/apub/follow.rs +++ b/src/jobs/apub/follow.rs @@ -13,12 +13,21 @@ use activitystreams::{ use background_jobs::ActixJob; 
use std::{future::Future, pin::Pin}; -#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +#[derive(Clone, serde::Deserialize, serde::Serialize)] pub(crate) struct Follow { input: AcceptedActivities, actor: Actor, } +impl std::fmt::Debug for Follow { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Follow") + .field("input", &self.input.id_unchecked()) + .field("actor", &self.actor.id) + .finish() + } +} + impl Follow { pub fn new(input: AcceptedActivities, actor: Actor) -> Self { Follow { input, actor } diff --git a/src/jobs/apub/forward.rs b/src/jobs/apub/forward.rs index e790826..1f72a32 100644 --- a/src/jobs/apub/forward.rs +++ b/src/jobs/apub/forward.rs @@ -8,12 +8,21 @@ use activitystreams::prelude::*; use background_jobs::ActixJob; use std::{future::Future, pin::Pin}; -#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +#[derive(Clone, serde::Deserialize, serde::Serialize)] pub(crate) struct Forward { input: AcceptedActivities, actor: Actor, } +impl std::fmt::Debug for Forward { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Forward") + .field("input", &self.input.id_unchecked()) + .field("actor", &self.actor.id) + .finish() + } +} + impl Forward { pub fn new(input: AcceptedActivities, actor: Actor) -> Self { Forward { input, actor } diff --git a/src/jobs/apub/reject.rs b/src/jobs/apub/reject.rs index 43a8553..f6ee0e7 100644 --- a/src/jobs/apub/reject.rs +++ b/src/jobs/apub/reject.rs @@ -7,9 +7,15 @@ use crate::{ use background_jobs::ActixJob; use std::{future::Future, pin::Pin}; -#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +#[derive(Clone, serde::Deserialize, serde::Serialize)] pub(crate) struct Reject(pub(crate) Actor); +impl std::fmt::Debug for Reject { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Reject").field("actor", &self.0.id).finish() + } +} + impl Reject { #[tracing::instrument(name = "Reject", skip(state))] async fn perform(self, state: JobState) -> Result<(), Error> { diff --git a/src/jobs/apub/undo.rs b/src/jobs/apub/undo.rs index e6fd1d1..0359bf2 100644 --- a/src/jobs/apub/undo.rs +++ b/src/jobs/apub/undo.rs @@ -5,15 +5,25 @@ use crate::{ error::Error, jobs::{apub::generate_undo_follow, Deliver, JobState}, }; +use activitystreams::prelude::BaseExt; use background_jobs::ActixJob; use std::{future::Future, pin::Pin}; -#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] +#[derive(Clone, serde::Deserialize, serde::Serialize)] pub(crate) struct Undo { input: AcceptedActivities, actor: Actor, } +impl std::fmt::Debug for Undo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Undo") + .field("input", &self.input.id_unchecked()) + .field("actor", &self.actor.id) + .finish() + } +} + impl Undo { pub(crate) fn new(input: AcceptedActivities, actor: Actor) -> Self { Undo { input, actor } diff --git a/src/jobs/cache_media.rs b/src/jobs/cache_media.rs deleted file mode 100644 index 718a059..0000000 --- a/src/jobs/cache_media.rs +++ /dev/null @@ -1,44 +0,0 @@ -use crate::{error::Error, jobs::JobState}; -use background_jobs::ActixJob; -use std::{future::Future, pin::Pin}; -use uuid::Uuid; - -#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] -pub(crate) struct CacheMedia { - uuid: Uuid, -} - -impl CacheMedia { - pub(crate) fn new(uuid: Uuid) -> Self { - CacheMedia { uuid } - } - - #[tracing::instrument(name = "Cache media", skip(state))] - async fn perform(self, 
state: JobState) -> Result<(), Error> { - if !state.media.is_outdated(self.uuid).await? { - return Ok(()); - } - - if let Some(url) = state.media.get_url(self.uuid).await? { - let (content_type, bytes) = state.requests.fetch_bytes(url.as_str()).await?; - - state - .media - .store_bytes(self.uuid, content_type, bytes) - .await?; - } - - Ok(()) - } -} - -impl ActixJob for CacheMedia { - type State = JobState; - type Future = Pin>>>; - - const NAME: &'static str = "relay::jobs::CacheMedia"; - - fn run(self, state: Self::State) -> Self::Future { - Box::pin(async move { self.perform(state).await.map_err(Into::into) }) - } -} diff --git a/src/jobs/contact.rs b/src/jobs/contact.rs index 05ccb18..288941a 100644 --- a/src/jobs/contact.rs +++ b/src/jobs/contact.rs @@ -40,10 +40,18 @@ impl QueryContact { return Ok(()); } - let contact = state + let contact = match state .requests .fetch::(self.contact_id.as_str()) - .await?; + .await + { + Ok(contact) => contact, + Err(e) if e.is_breaker() => { + tracing::debug!("Not retrying due to failed breaker"); + return Ok(()); + } + Err(e) => return Err(e), + }; let (username, display_name, url, avatar) = to_contact(contact).ok_or(ErrorKind::Extract("contact"))?; diff --git a/src/jobs/deliver.rs b/src/jobs/deliver.rs index eb86ab8..9c01d8d 100644 --- a/src/jobs/deliver.rs +++ b/src/jobs/deliver.rs @@ -1,4 +1,7 @@ -use crate::{error::Error, jobs::JobState}; +use crate::{ + error::Error, + jobs::{debug_object, JobState}, +}; use activitystreams::iri_string::types::IriString; use background_jobs::{ActixJob, Backoff}; use std::{future::Future, pin::Pin}; @@ -13,7 +16,8 @@ impl std::fmt::Debug for Deliver { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Deliver") .field("to", &self.to.to_string()) - .field("data", &self.data) + .field("activity", &self.data["type"]) + .field("object", debug_object(&self.data)) .finish() } } @@ -31,7 +35,17 @@ impl Deliver { #[tracing::instrument(name = "Deliver", skip(state))] async fn permform(self, state: JobState) -> Result<(), Error> { - state.requests.deliver(self.to, &self.data).await?; + if let Err(e) = state.requests.deliver(self.to, &self.data).await { + if e.is_breaker() { + tracing::debug!("Not trying due to failed breaker"); + return Ok(()); + } + if e.is_bad_request() { + tracing::debug!("Server didn't understand the activity"); + return Ok(()); + } + return Err(e); + } Ok(()) } } diff --git a/src/jobs/deliver_many.rs b/src/jobs/deliver_many.rs index e37a12d..58cdc1f 100644 --- a/src/jobs/deliver_many.rs +++ b/src/jobs/deliver_many.rs @@ -1,6 +1,6 @@ use crate::{ error::Error, - jobs::{Deliver, JobState}, + jobs::{debug_object, Deliver, JobState}, }; use activitystreams::iri_string::types::IriString; use background_jobs::ActixJob; @@ -14,17 +14,9 @@ pub(crate) struct DeliverMany { impl std::fmt::Debug for DeliverMany { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let to = format!( - "[{}]", - self.to - .iter() - .map(|u| u.to_string()) - .collect::>() - .join(", ") - ); f.debug_struct("DeliverMany") - .field("to", &to) - .field("data", &self.data) + .field("activity", &self.data["type"]) + .field("object", debug_object(&self.data)) .finish() } } diff --git a/src/jobs/instance.rs b/src/jobs/instance.rs index 46d55ef..d3e2719 100644 --- a/src/jobs/instance.rs +++ b/src/jobs/instance.rs @@ -1,7 +1,7 @@ use crate::{ config::UrlKind, error::{Error, ErrorKind}, - jobs::{cache_media::CacheMedia, JobState}, + jobs::{Boolish, JobState}, }; use 
activitystreams::{iri, iri_string::types::IriString}; use background_jobs::ActixJob; @@ -47,10 +47,22 @@ impl QueryInstance { let scheme = self.actor_id.scheme_str(); let instance_uri = iri!(format!("{}://{}/api/v1/instance", scheme, authority)); - let instance = state + let instance = match state .requests .fetch_json::(instance_uri.as_str()) - .await?; + .await + { + Ok(instance) => instance, + Err(e) if e.is_breaker() => { + tracing::debug!("Not retrying due to failed breaker"); + return Ok(()); + } + Err(e) if e.is_not_found() => { + tracing::debug!("Server doesn't implement instance endpoint"); + return Ok(()); + } + Err(e) => return Err(e), + }; let description = instance.short_description.unwrap_or(instance.description); @@ -63,8 +75,6 @@ impl QueryInstance { let avatar = state.config.generate_url(UrlKind::Media(uuid)); - state.job_server.queue(CacheMedia::new(uuid)).await?; - state .node_cache .set_contact( @@ -86,7 +96,7 @@ impl QueryInstance { instance.title, description, instance.version, - instance.registrations, + *instance.registrations, instance.approval_required, ) .await?; @@ -116,7 +126,7 @@ struct Instance { short_description: Option, description: String, version: String, - registrations: bool, + registrations: Boolish, #[serde(default = "default_approval")] approval_required: bool, @@ -138,10 +148,17 @@ mod tests { use super::Instance; const ASONIX_INSTANCE: &'static str = r#"{"uri":"masto.asonix.dog","title":"asonix.dog","short_description":"The asonix of furry mastodon. For me and a few friends. DM me somewhere if u want an account lol","description":"A mastodon server that's only for me and nobody else sorry","email":"asonix@asonix.dog","version":"4.0.0rc2-asonix-changes","urls":{"streaming_api":"wss://masto.asonix.dog"},"stats":{"user_count":7,"status_count":12328,"domain_count":5146},"thumbnail":"https://masto.asonix.dog/system/site_uploads/files/000/000/002/@1x/32f51462a2b2bf2d.png","languages":["dog"],"registrations":false,"approval_required":false,"invites_enabled":false,"configuration":{"accounts":{"max_featured_tags":10},"statuses":{"max_characters":500,"max_media_attachments":4,"characters_reserved_per_url":23},"media_attachments":{"supported_mime_types":["image/jpeg","image/png","image/gif","image/heic","image/heif","image/webp","image/avif","video/webm","video/mp4","video/quicktime","video/ogg","audio/wave","audio/wav","audio/x-wav","audio/x-pn-wave","audio/vnd.wave","audio/ogg","audio/vorbis","audio/mpeg","audio/mp3","audio/webm","audio/flac","audio/aac","audio/m4a","audio/x-m4a","audio/mp4","audio/3gpp","video/x-ms-asf"],"image_size_limit":10485760,"image_matrix_limit":16777216,"video_size_limit":41943040,"video_frame_rate_limit":60,"video_matrix_limit":2304000},"polls":{"max_options":4,"max_characters_per_option":50,"min_expiration":300,"max_expiration":2629746}},"contact_account":{"id":"1","username":"asonix","acct":"asonix","display_name":"Liom on Mane :antiverified:","locked":true,"bot":false,"discoverable":true,"group":false,"created_at":"2021-02-09T00:00:00.000Z","note":"\u003cp\u003e26, local liom, friend, rust (lang) stan, bi \u003c/p\u003e\u003cp\u003eicon by \u003cspan class=\"h-card\"\u003e\u003ca href=\"https://furaffinity.net/user/lalupine\" target=\"blank\" rel=\"noopener noreferrer\" class=\"u-url mention\"\u003e@\u003cspan\u003elalupine@furaffinity.net\u003c/span\u003e\u003c/a\u003e\u003c/span\u003e\u003cbr /\u003eheader by \u003cspan class=\"h-card\"\u003e\u003ca href=\"https://furaffinity.net/user/tronixx\" target=\"blank\" 
rel=\"noopener noreferrer\" class=\"u-url mention\"\u003e@\u003cspan\u003etronixx@furaffinity.net\u003c/span\u003e\u003c/a\u003e\u003c/span\u003e\u003c/p\u003e\u003cp\u003eTestimonials:\u003c/p\u003e\u003cp\u003eStand: LIONS\u003cbr /\u003eStand User: AODE\u003cbr /\u003e- Keris (not on here)\u003c/p\u003e","url":"https://masto.asonix.dog/@asonix","avatar":"https://masto.asonix.dog/system/accounts/avatars/000/000/001/original/00852df0e6fee7e0.png","avatar_static":"https://masto.asonix.dog/system/accounts/avatars/000/000/001/original/00852df0e6fee7e0.png","header":"https://masto.asonix.dog/system/accounts/headers/000/000/001/original/8122ce3e5a745385.png","header_static":"https://masto.asonix.dog/system/accounts/headers/000/000/001/original/8122ce3e5a745385.png","followers_count":237,"following_count":474,"statuses_count":8798,"last_status_at":"2022-11-08","noindex":true,"emojis":[{"shortcode":"antiverified","url":"https://masto.asonix.dog/system/custom_emojis/images/000/030/053/original/bb0bc2e395b9a127.png","static_url":"https://masto.asonix.dog/system/custom_emojis/images/000/030/053/static/bb0bc2e395b9a127.png","visible_in_picker":true}],"fields":[{"name":"pronouns","value":"he/they","verified_at":null},{"name":"software","value":"bad","verified_at":null},{"name":"gitea","value":"\u003ca href=\"https://git.asonix.dog\" target=\"_blank\" rel=\"nofollow noopener noreferrer me\"\u003e\u003cspan class=\"invisible\"\u003ehttps://\u003c/span\u003e\u003cspan class=\"\"\u003egit.asonix.dog\u003c/span\u003e\u003cspan class=\"invisible\"\u003e\u003c/span\u003e\u003c/a\u003e","verified_at":null},{"name":"join my","value":"relay","verified_at":null}]},"rules":[]}"#; + const HYNET_INSTANCE: &'static str = r#"{"approval_required":false,"avatar_upload_limit":2000000,"background_image":"https://soc.hyena.network/images/city.jpg","background_upload_limit":4000000,"banner_upload_limit":4000000,"description":"Akkoma: The cooler fediverse 
server","description_limit":5000,"email":"me@hyena.network","languages":["en"],"max_toot_chars":"5000","pleroma":{"metadata":{"account_activation_required":true,"features":["pleroma_api","mastodon_api","mastodon_api_streaming","polls","v2_suggestions","pleroma_explicit_addressing","shareable_emoji_packs","multifetch","pleroma:api/v1/notifications:include_types_filter","chat","shout","relay","safe_dm_mentions","pleroma_emoji_reactions","pleroma_chat_messages","exposable_reactions","profile_directory","custom_emoji_reactions"],"federation":{"enabled":true,"exclusions":false,"mrf_hashtag":{"federated_timeline_removal":[],"reject":[],"sensitive":["nsfw"]},"mrf_policies":["SimplePolicy","EnsureRePrepended","HashtagPolicy"],"mrf_simple":{"accept":[],"avatar_removal":[],"banner_removal":[],"federated_timeline_removal":["botsin.space"],"followers_only":[],"media_nsfw":["mstdn.jp","wxw.moe","knzk.me","vipgirlfriend.xxx","humblr.social","switter.at","kinkyelephant.com","sinblr.com","kinky.business","rubber.social"],"media_removal":[],"reject":["*.10minutepleroma.com","101010.pl","13bells.com","2.distsn.org","2hu.club","2ndamendment.social","434.earth","4chan.icu","4qq.org","7td.org","80percent.social","a.nti.social","aaathats3as.com","accela.online","amala.schwartzwelt.xyz","angrytoday.com","anime.website","antitwitter.moe","antivaxxer.icu","archivefedifor.fun","artalley.social","bae.st","bajax.us","baraag.net","bbs.kawa-kun.com","beefyboys.club","beefyboys.win","bikeshed.party","bitcoinhackers.org","bleepp.com","blovice.bahnhof.cz","brighteon.social","buildthatwallandmakeamericagreatagain.trumpislovetrumpis.life","bungle.online","cawfee.club","censorship.icu","chungus.cc","club.darknight-coffee.org","clubcyberia.co","cock.fish","cock.li","comfyboy.club","contrapointsfan.club","coon.town","counter.social","cum.salon","d-fens.systems","definitely-not-archivefedifor.fun","degenerates.fail","desuposter.club","detroitriotcity.com","developer.gab.com","dogwhipping.day","eientei.org","enigmatic.observer","eveningzoo.club","exited.eu","federation.krowverse.services","fedi.cc","fedi.krowverse.services","fedi.pawlicker.com","fedi.vern.cc","freak.university","freeatlantis.com","freecumextremist.com","freesoftwareextremist.com","freespeech.firedragonstudios.com","freespeech.host","freespeechextremist.com","freevoice.space","freezepeach.xyz","froth.zone","fuckgov.org","gab.ai","gab.polaris-1.work","gab.protohype.net","gabfed.com","gameliberty.club","gearlandia.haus","gitmo.life","glindr.org","glittersluts.xyz","glowers.club","godspeed.moe","gorf.pub","goyim.app","gs.kawa-kun.com","hagra.net","hallsofamenti.io","hayu.sh","hentai.baby","honkwerx.tech","hunk.city","husk.site","iddqd.social","ika.moe","isexychat.space","jaeger.website","justicewarrior.social","kag.social","katiehopkinspolitical.icu","kiwifarms.cc","kiwifarms.is","kiwifarms.net","kohrville.net","koyu.space","kys.moe","lain.com","lain.sh","leafposter.club","lets.saynoto.lgbt","liberdon.com","libertarianism.club","ligma.pro","lolis.world","masochi.st","masthead.social","mastodon.digitalsuccess.dev","mastodon.fidonet.io","mastodon.grin.hu","mastodon.ml","midnightride.rs","milker.cafe","mobile.tmediatech.io","moon.holiday","mstdn.foxfam.club","mstdn.io","mstdn.starnix.network","mulmeyun.church","nazi.social","neckbeard.xyz","neenster.org","neko.ci","netzsphaere.xyz","newjack.city","nicecrew.digital","nnia.space","noagendasocial.com","norrebro.space","oursocialism.today","ovo.sc","pawoo.net","paypig.org","pedo.school","phreedom.tk","pieville.net","pkteeri
um.xyz","pl.murky.club","pl.spiderden.net","pl.tkammer.de","pl.zombiecats.run","pleroma.nobodyhasthe.biz","pleroma.runfox.tk","pleroma.site","plr.inferencium.net","pmth.us","poa.st","pod.vladtepesblog.com","political.icu","pooper.social","posting.lolicon.rocks","preteengirls.biz","prout.social","qoto.org","rage.lol","rakket.app","raplst.town","rdrama.cc","ryona.agency","s.sneak.berlin","seal.cafe","sealion.club","search.fedi.app","sementerrori.st","shitposter.club","shortstackran.ch","silkhe.art","sleepy.cafe","soc.mahodou.moe","soc.redeyes.site","social.076.ne.jp","social.anoxinon.de","social.chadland.net","social.freetalklive.com","social.getgle.org","social.handholding.io","social.headsca.la","social.imirhil.fr","social.lovingexpressions.net","social.manalejandro.com","social.midwaytrades.com","social.pseudo-whiskey.bar","social.targaryen.house","social.teci.world","societal.co","society.oftrolls.com","socks.pinnoto.org","socnet.supes.com","solagg.com","spinster.xyz","springbo.cc","stereophonic.space","sunshinegardens.org","theautisticinvestors.quest","thechad.zone","theduran.icu","theosis.church","toot.love","toots.alirezahayati.com","traboone.com","truthsocial.co.in","truthsocial.com","tuusin.misono-ya.info","tweety.icu","unbound.social","unsafe.space","varishangout.net","video.nobodyhasthe.biz","voicenews.icu","voluntaryism.club","waifu.social","weeaboo.space","whinge.town","wolfgirl.bar","workers.dev","wurm.host","xiii.ch","xn--p1abe3d.xn--80asehdb","yggdrasil.social","youjo.love"],"reject_deletes":[],"report_removal":[]},"mrf_simple_info":{"federated_timeline_removal":{"botsin.space":{"reason":"A lot of bot content"}},"media_nsfw":{"humblr.social":{"reason":"NSFW Instance, safe to assume most content is NSFW"},"kinky.business":{"reason":"NSFW Instance, safe to assume most content is NSFW"},"kinkyelephant.com":{"reason":"NSFW Instance, safe to assume most content is NSFW"},"knzk.me":{"reason":"Unmarked nsfw media"},"mstdn.jp":{"reason":"Not sure about the media policy"},"rubber.social":{"reason":"NSFW Instance, safe to assume most content is NSFW"},"sinblr.com":{"reason":"NSFW Instance, safe to assume most content is NSFW"},"switter.at":{"reason":"NSFW Instance, safe to assume most content is NSFW"},"vipgirlfriend.xxx":{"reason":"Unmarked nsfw media"},"wxw.moe":{"reason":"Unmarked nsfw media"}}},"quarantined_instances":[],"quarantined_instances_info":{"quarantined_instances":{}}},"fields_limits":{"max_fields":10,"max_remote_fields":20,"name_length":512,"value_length":2048},"post_formats":["text/plain","text/html","text/markdown","text/bbcode","text/x.misskeymarkdown"],"privileged_staff":false},"stats":{"mau":1},"vapid_public_key":"BMg4q-rT3rkMzc29F7OS5uM6t-Rx4HncMIB1NXrKwNlVRfX-W1kwgOuq5pDy-WhWmOZudaegftjBTCX3-pzdDFc"},"poll_limits":{"max_expiration":31536000,"max_option_chars":200,"max_options":20,"min_expiration":0},"registrations":"FALSE","shout_limit":5000,"stats":{"domain_count":1035,"status_count":7,"user_count":1},"thumbnail":"https://soc.hyena.network/instance/thumbnail.jpeg","title":"HyNET Social","upload_limit":16000000,"uri":"https://soc.hyena.network","urls":{"streaming_api":"wss://soc.hyena.network"},"version":"2.7.2 (compatible; Akkoma 3.0.0)"}"#; #[test] fn deser_masto_instance_with_contact() { let inst: Instance = serde_json::from_str(ASONIX_INSTANCE).unwrap(); let _ = inst.contact.unwrap(); } + + #[test] + fn deser_akkoma_instance_no_contact() { + let inst: Instance = serde_json::from_str(HYNET_INSTANCE).unwrap(); + assert!(inst.contact.is_none()); + } } diff --git 
a/src/jobs/mod.rs b/src/jobs/mod.rs index 5346046..6dfdbe9 100644 --- a/src/jobs/mod.rs +++ b/src/jobs/mod.rs @@ -1,5 +1,4 @@ pub mod apub; -mod cache_media; mod contact; mod deliver; mod deliver_many; @@ -8,7 +7,7 @@ mod nodeinfo; mod process_listeners; pub(crate) use self::{ - cache_media::CacheMedia, contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany, + contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany, instance::QueryInstance, nodeinfo::QueryNodeinfo, }; @@ -25,6 +24,20 @@ use background_jobs::{ }; use std::time::Duration; +fn debug_object(activity: &serde_json::Value) -> &serde_json::Value { + let mut object = &activity["object"]["type"]; + + if object.is_null() { + object = &activity["object"]["id"]; + } + + if object.is_null() { + object = &activity["object"]; + } + + object +} + pub(crate) fn create_workers( state: State, actors: ActorCache, @@ -45,7 +58,6 @@ pub(crate) fn create_workers( .register::() .register::() .register::() - .register::() .register::() .register::() .register::() @@ -124,3 +136,59 @@ impl JobServer { .map_err(Into::into) } } + +struct Boolish { + inner: bool, +} + +impl std::ops::Deref for Boolish { + type Target = bool; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl<'de> serde::Deserialize<'de> for Boolish { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + #[derive(serde::Deserialize)] + #[serde(untagged)] + enum BoolThing { + Bool(bool), + String(String), + } + + let thing: BoolThing = serde::Deserialize::deserialize(deserializer)?; + + match thing { + BoolThing::Bool(inner) => Ok(Boolish { inner }), + BoolThing::String(s) if s.to_lowercase() == "false" => Ok(Boolish { inner: false }), + BoolThing::String(_) => Ok(Boolish { inner: true }), + } + } +} + +#[cfg(test)] +mod tests { + use super::Boolish; + + #[test] + fn boolish_works() { + const CASES: &[(&str, bool)] = &[ + ("false", false), + ("\"false\"", false), + ("\"FALSE\"", false), + ("true", true), + ("\"true\"", true), + ("\"anything else\"", true), + ]; + + for (case, output) in CASES { + let b: Boolish = serde_json::from_str(case).unwrap(); + assert_eq!(*b, *output); + } + } +} diff --git a/src/jobs/nodeinfo.rs b/src/jobs/nodeinfo.rs index 43ea160..dbe2717 100644 --- a/src/jobs/nodeinfo.rs +++ b/src/jobs/nodeinfo.rs @@ -1,6 +1,6 @@ use crate::{ error::{Error, ErrorKind}, - jobs::{JobState, QueryContact}, + jobs::{Boolish, JobState, QueryContact}, }; use activitystreams::{iri, iri_string::types::IriString, primitives::OneOrMany}; use background_jobs::ActixJob; @@ -41,10 +41,18 @@ impl QueryNodeinfo { let scheme = self.actor_id.scheme_str(); let well_known_uri = iri!(format!("{}://{}/.well-known/nodeinfo", scheme, authority)); - let well_known = state + let well_known = match state .requests .fetch_json::(well_known_uri.as_str()) - .await?; + .await + { + Ok(well_known) => well_known, + Err(e) if e.is_breaker() => { + tracing::debug!("Not retrying due to failed breaker"); + return Ok(()); + } + Err(e) => return Err(e), + }; let href = if let Some(link) = well_known.links.into_iter().find(|l| l.rel.is_supported()) { link.href @@ -52,7 +60,14 @@ impl QueryNodeinfo { return Ok(()); }; - let nodeinfo = state.requests.fetch_json::(&href).await?; + let nodeinfo = match state.requests.fetch_json::(&href).await { + Ok(nodeinfo) => nodeinfo, + Err(e) if e.is_breaker() => { + tracing::debug!("Not retrying due to failed breaker"); + return Ok(()); + } + Err(e) => return Err(e), + }; state .node_cache @@ -60,7 +75,7 @@ 
impl QueryNodeinfo { self.actor_id.clone(), nodeinfo.software.name, nodeinfo.software.version, - nodeinfo.open_registrations, + *nodeinfo.open_registrations, ) .await?; @@ -98,7 +113,7 @@ struct Nodeinfo { version: SupportedVersion, software: Software, - open_registrations: bool, + open_registrations: Boolish, metadata: Option>, } @@ -214,6 +229,7 @@ mod tests { const ASONIX_DOG_4: &str = r#"{"links":[{"rel":"http://nodeinfo.diaspora.software/ns/schema/2.0","href":"https://masto.asonix.dog/nodeinfo/2.0"}]}"#; const RELAY_ASONIX_DOG: &str = r#"{"links":[{"rel":"http://nodeinfo.diaspora.software/ns/schema/2.0","href":"https://relay.asonix.dog/nodeinfo/2.0.json"}]}"#; const HYNET: &str = r#"{"links":[{"href":"https://soc.hyena.network/nodeinfo/2.0.json","rel":"http://nodeinfo.diaspora.software/ns/schema/2.0"},{"href":"https://soc.hyena.network/nodeinfo/2.1.json","rel":"http://nodeinfo.diaspora.software/ns/schema/2.1"}]}"#; + const NEW_HYNET: &str = r#"{"links":[{"href":"https://soc.hyena.network/nodeinfo/2.0.json","rel":"http://nodeinfo.diaspora.software/ns/schema/2.0"},{"href":"https://soc.hyena.network/nodeinfo/2.1.json","rel":"http://nodeinfo.diaspora.software/ns/schema/2.1"}]}"#; const BANANA_DOG_NODEINFO: &str = r#"{"version":"2.1","software":{"name":"corgidon","version":"3.1.3+corgi","repository":"https://github.com/msdos621/corgidon"},"protocols":["activitypub"],"usage":{"users":{"total":203,"activeMonth":115,"activeHalfyear":224},"localPosts":28856},"openRegistrations":true,"metadata":{"nodeName":"Banana.dog","nodeDescription":"\u003c/p\u003e\r\n\u003cp\u003e\r\nOfficially endorsed by \u003ca href=\"https://mastodon.social/@Gargron/100059130444127703\"\u003e@Gargron\u003c/a\u003e as a joke instance (along with \u003ca href=\"https://freedom.horse/about\"\u003efreedom.horse\u003c/a\u003e). Things that make banana.dog unique as an instance.\r\n\u003c/p\u003e\r\n\u003cul\u003e\r\n\u003cli\u003eFederates with TOR servers\u003c/li\u003e\r\n\u003cli\u003eStays up to date, often running newest mastodon code\u003c/li\u003e\r\n\u003cli\u003eUnique color scheme\u003c/li\u003e\r\n\u003cli\u003eA thorough set of rules\u003c/li\u003e\r\n\u003cli\u003eA BananaDogInc company. Visit our other sites sites including \u003ca href=\"https://betamax.video\"\u003ebetaMax.video\u003c/a\u003e, \u003ca href=\"https://psychicdebugging.com\"\u003epsychicdebugging\u003c/a\u003e and \u003ca href=\"https://somebody.once.told.me.the.world.is.gonnaroll.me/\"\u003egonnaroll\u003c/a\u003e\u003c/li\u003e\r\n\u003c/ul\u003e\r\n\u003cp\u003e\r\n\u003cem\u003eWho we are looking for:\u003c/em\u003e\r\nThis instance only allows senior toot engineers. If you have at least 10+ years of mastodon experience please apply here (https://banana.dog). We are looking for rockstar ninja rocket scientists and we offer unlimited PTO as well as a fully stocked snack bar (with soylent). We are a lean, agile, remote friendly mastodon startup that pays in the bottom 25% for senior tooters. All new members get equity via an innovative ICO call BananaCoin.\r\n\u003c/p\u003e\r\n\u003cp\u003e\r\n\u003cem\u003eThe interview process\u003c/em\u003e\r\nTo join we have a take home exam that involves you writing several hundred toots that we can use to screen you. We will then throw these away during your interview so that we can do a technical screening where we use a whiteboard to evaluate your ability to re-toot memes and shitpost in front of a panel. 
This panel will be composed of senior tooters who are all 30 year old cis white males (coincidence).\r\n\u003c/p\u003e\r\n\u003cp\u003e\r\n\u003cem\u003eHere are the reasons you may want to join:\u003c/em\u003e\r\nWe are an agile tooting startup (a tootup). That means for every senior tooter we have a designer, a UX person, a product manager, project manager and scrum master. We meet for 15min every day and plan twice a week in a 3 hour meeting but it’s cool because you get lunch and have to attend. Our tooters love it, I would know if they didn’t since we all have standing desks in an open office layouts d can hear everything!\r\n\u003c/p\u003e\r\n\u003cp\u003e\r\n\u003ca href=\"https://www.patreon.com/bePatron?u=178864\" data-patreon-widget-type=\"become-patron-button\"\u003eSupport our sites on Patreon\u003c/a\u003e\r\n\u003c/p\u003e\r\n\u003cp\u003e","nodeTerms":"","siteContactEmail":"corgi@banana.dog","domainCount":5841,"features":["mastodon_api","mastodon_api_streaming"],"invitesEnabled":true,"federation":{"rejectMedia":[],"rejectReports":[],"silence":[],"suspend":[]}},"services":{"outbound":[],"inbound":[]}}"#; const ASONIX_DOG_NODEINFO: &str = r#"{"version":"2.0","software":{"name":"mastodon","version":"3.1.3-asonix-changes"},"protocols":["activitypub"],"usage":{"users":{"total":19,"activeMonth":5,"activeHalfyear":5},"localPosts":43036},"openRegistrations":false}"#; @@ -221,6 +237,7 @@ mod tests { const RELAY_ASONIX_DOG_NODEINFO: &str = r#"{"version":"2.0","software":{"name":"aoderelay","version":"v0.1.0-master"},"protocols":["activitypub"],"services":{"inbound":[],"outbound":[]},"openRegistrations":false,"usage":{"users":{"total":1,"activeHalfyear":1,"activeMonth":1},"localPosts":0,"localComments":0},"metadata":{"peers":[],"blocks":[]}}"#; const HYNET_NODEINFO: &str = r#"{"metadata":{"accountActivationRequired":true,"features":["pleroma_api","mastodon_api","mastodon_api_streaming","polls","pleroma_explicit_addressing","shareable_emoji_packs","multifetch","pleroma:api/v1/notifications:include_types_filter","media_proxy","chat","relay","safe_dm_mentions","pleroma_emoji_reactions","pleroma_chat_messages"],"federation":{"enabled":true,"exclusions":false,"mrf_policies":["SimplePolicy","EnsureRePrepended"],"mrf_simple":{"accept":[],"avatar_removal":[],"banner_removal":[],"federated_timeline_removal":["botsin.space","humblr.social","switter.at","kinkyelephant.com","mstdn.foxfam.club","dajiaweibo.com"],"followers_only":[],"media_nsfw":["mstdn.jp","wxw.moe","knzk.me","anime.website","pl.nudie.social","neckbeard.xyz","baraag.net","pawoo.net","vipgirlfriend.xxx","humblr.social","switter.at","kinkyelephant.com","sinblr.com","kinky.business","rubber.social"],"media_removal":[],"reject":["gab.com","search.fedi.app","kiwifarms.cc","pawoo.net","2hu.club","gameliberty.club","loli.estate","shitasstits.life","social.homunyan.com","club.super-niche.club","vampire.estate","weeaboo.space","wxw.moe","youkai.town","kowai.youkai.town","preteengirls.biz","vipgirlfriend.xxx","social.myfreecams.com","pleroma.rareome.ga","ligma.pro","nnia.space","dickkickextremist.xyz","freespeechextremist.com","m.gretaoto.ca","7td.org","pl.smuglo.li","pleroma.hatthieves.es","jojo.singleuser.club","anime.website","rage.lol","shitposter.club"],"reject_deletes":[],"report_removal":[]},"quarantined_instances":["freespeechextremist.com","spinster.xyz"]},"fieldsLimits":{"maxFields":10,"maxRemoteFields":20,"nameLength":512,"valueLength":2048},"invitesEnabled":true,"mailerEnabled":true,"nodeDescription":"All the cackling for your 
hyaenid needs.","nodeName":"HyNET Social","pollLimits":{"max_expiration":31536000,"max_option_chars":200,"max_options":20,"min_expiration":0},"postFormats":["text/plain","text/html","text/markdown","text/bbcode"],"private":false,"restrictedNicknames":[".well-known","~","about","activities","api","auth","check_password","dev","friend-requests","inbox","internal","main","media","nodeinfo","notice","oauth","objects","ostatus_subscribe","pleroma","proxy","push","registration","relay","settings","status","tag","user-search","user_exists","users","web","verify_credentials","update_credentials","relationships","search","confirmation_resend","mfa"],"skipThreadContainment":true,"staffAccounts":["https://soc.hyena.network/users/HyNET","https://soc.hyena.network/users/mel"],"suggestions":{"enabled":false},"uploadLimits":{"avatar":2000000,"background":4000000,"banner":4000000,"general":10000000}},"openRegistrations":true,"protocols":["activitypub"],"services":{"inbound":[],"outbound":[]},"software":{"name":"pleroma","version":"2.2.50-724-gf917285b-develop+HyNET-prod"},"usage":{"localPosts":3444,"users":{"total":19}},"version":"2.0"}"#; + const NEW_HYNET_NODEINFO: &str = r#"{"metadata":{"accountActivationRequired":true,"features":["pleroma_api","mastodon_api","mastodon_api_streaming","polls","v2_suggestions","pleroma_explicit_addressing","shareable_emoji_packs","multifetch","pleroma:api/v1/notifications:include_types_filter","chat","shout","relay","safe_dm_mentions","pleroma_emoji_reactions","pleroma_chat_messages","exposable_reactions","profile_directory","custom_emoji_reactions"],"federation":{"enabled":true,"exclusions":false,"mrf_hashtag":{"federated_timeline_removal":[],"reject":[],"sensitive":["nsfw"]},"mrf_policies":["SimplePolicy","EnsureRePrepended","HashtagPolicy"],"mrf_simple":{"accept":[],"avatar_removal":[],"banner_removal":[],"federated_timeline_removal":["botsin.space"],"followers_only":[],"media_nsfw":["mstdn.jp","wxw.moe","knzk.me","vipgirlfriend.xxx","humblr.social","switter.at","kinkyelephant.com","sinblr.com","kinky.business","rubber.social"],"media_removal":[],"reject":["*.10minutepleroma.com","101010.pl","13bells.com","2.distsn.org","2hu.club","2ndamendment.social","434.earth","4chan.icu","4qq.org","7td.org","80percent.social","a.nti.social","aaathats3as.com","accela.online","amala.schwartzwelt.xyz","angrytoday.com","anime.website","antitwitter.moe","antivaxxer.icu","archivefedifor.fun","artalley.social","bae.st","bajax.us","baraag.net","bbs.kawa-kun.com","beefyboys.club","beefyboys.win","bikeshed.party","bitcoinhackers.org","bleepp.com","blovice.bahnhof.cz","brighteon.social","buildthatwallandmakeamericagreatagain.trumpislovetrumpis.life","bungle.online","cawfee.club","censorship.icu","chungus.cc","club.darknight-coffee.org","clubcyberia.co","cock.fish","cock.li","comfyboy.club","contrapointsfan.club","coon.town","counter.social","cum.salon","d-fens.systems","definitely-not-archivefedifor.fun","degenerates.fail","desuposter.club","detroitriotcity.com","developer.gab.com","dogwhipping.day","eientei.org","enigmatic.observer","eveningzoo.club","exited.eu","federation.krowverse.services","fedi.cc","fedi.krowverse.services","fedi.pawlicker.com","fedi.vern.cc","freak.university","freeatlantis.com","freecumextremist.com","freesoftwareextremist.com","freespeech.firedragonstudios.com","freespeech.host","freespeechextremist.com","freevoice.space","freezepeach.xyz","froth.zone","fuckgov.org","gab.ai","gab.polaris-1.work","gab.protohype.net","gabfed.com","gameliberty.club","gearlandia.haus","
gitmo.life","glindr.org","glittersluts.xyz","glowers.club","godspeed.moe","gorf.pub","goyim.app","gs.kawa-kun.com","hagra.net","hallsofamenti.io","hayu.sh","hentai.baby","honkwerx.tech","hunk.city","husk.site","iddqd.social","ika.moe","isexychat.space","jaeger.website","justicewarrior.social","kag.social","katiehopkinspolitical.icu","kiwifarms.cc","kiwifarms.is","kiwifarms.net","kohrville.net","koyu.space","kys.moe","lain.com","lain.sh","leafposter.club","lets.saynoto.lgbt","liberdon.com","libertarianism.club","ligma.pro","lolis.world","masochi.st","masthead.social","mastodon.digitalsuccess.dev","mastodon.fidonet.io","mastodon.grin.hu","mastodon.ml","midnightride.rs","milker.cafe","mobile.tmediatech.io","moon.holiday","mstdn.foxfam.club","mstdn.io","mstdn.starnix.network","mulmeyun.church","nazi.social","neckbeard.xyz","neenster.org","neko.ci","netzsphaere.xyz","newjack.city","nicecrew.digital","nnia.space","noagendasocial.com","norrebro.space","oursocialism.today","ovo.sc","pawoo.net","paypig.org","pedo.school","phreedom.tk","pieville.net","pkteerium.xyz","pl.murky.club","pl.spiderden.net","pl.tkammer.de","pl.zombiecats.run","pleroma.nobodyhasthe.biz","pleroma.runfox.tk","pleroma.site","plr.inferencium.net","pmth.us","poa.st","pod.vladtepesblog.com","political.icu","pooper.social","posting.lolicon.rocks","preteengirls.biz","prout.social","qoto.org","rage.lol","rakket.app","raplst.town","rdrama.cc","ryona.agency","s.sneak.berlin","seal.cafe","sealion.club","search.fedi.app","sementerrori.st","shitposter.club","shortstackran.ch","silkhe.art","sleepy.cafe","soc.mahodou.moe","soc.redeyes.site","social.076.ne.jp","social.anoxinon.de","social.chadland.net","social.freetalklive.com","social.getgle.org","social.handholding.io","social.headsca.la","social.imirhil.fr","social.lovingexpressions.net","social.manalejandro.com","social.midwaytrades.com","social.pseudo-whiskey.bar","social.targaryen.house","social.teci.world","societal.co","society.oftrolls.com","socks.pinnoto.org","socnet.supes.com","solagg.com","spinster.xyz","springbo.cc","stereophonic.space","sunshinegardens.org","theautisticinvestors.quest","thechad.zone","theduran.icu","theosis.church","toot.love","toots.alirezahayati.com","traboone.com","truthsocial.co.in","truthsocial.com","tuusin.misono-ya.info","tweety.icu","unbound.social","unsafe.space","varishangout.net","video.nobodyhasthe.biz","voicenews.icu","voluntaryism.club","waifu.social","weeaboo.space","whinge.town","wolfgirl.bar","workers.dev","wurm.host","xiii.ch","xn--p1abe3d.xn--80asehdb","yggdrasil.social","youjo.love"],"reject_deletes":[],"report_removal":[]},"mrf_simple_info":{"federated_timeline_removal":{"botsin.space":{"reason":"A lot of bot content"}},"media_nsfw":{"humblr.social":{"reason":"NSFW Instance, safe to assume most content is NSFW"},"kinky.business":{"reason":"NSFW Instance, safe to assume most content is NSFW"},"kinkyelephant.com":{"reason":"NSFW Instance, safe to assume most content is NSFW"},"knzk.me":{"reason":"Unmarked nsfw media"},"mstdn.jp":{"reason":"Not sure about the media policy"},"rubber.social":{"reason":"NSFW Instance, safe to assume most content is NSFW"},"sinblr.com":{"reason":"NSFW Instance, safe to assume most content is NSFW"},"switter.at":{"reason":"NSFW Instance, safe to assume most content is NSFW"},"vipgirlfriend.xxx":{"reason":"Unmarked nsfw media"},"wxw.moe":{"reason":"Unmarked nsfw 
media"}}},"quarantined_instances":[],"quarantined_instances_info":{"quarantined_instances":{}}},"fieldsLimits":{"maxFields":10,"maxRemoteFields":20,"nameLength":512,"valueLength":2048},"invitesEnabled":false,"mailerEnabled":true,"nodeDescription":"Akkoma: The cooler fediverse server","nodeName":"HyNET Social","pollLimits":{"max_expiration":31536000,"max_option_chars":200,"max_options":20,"min_expiration":0},"postFormats":["text/plain","text/html","text/markdown","text/bbcode","text/x.misskeymarkdown"],"private":false,"restrictedNicknames":[".well-known","~","about","activities","api","auth","check_password","dev","friend-requests","inbox","internal","main","media","nodeinfo","notice","oauth","objects","ostatus_subscribe","pleroma","proxy","push","registration","relay","settings","status","tag","user-search","user_exists","users","web","verify_credentials","update_credentials","relationships","search","confirmation_resend","mfa"],"skipThreadContainment":true,"staffAccounts":["https://soc.hyena.network/users/mel"],"suggestions":{"enabled":false},"uploadLimits":{"avatar":2000000,"background":4000000,"banner":4000000,"general":16000000}},"openRegistrations":"FALSE","protocols":["activitypub"],"services":{"inbound":[],"outbound":[]},"software":{"name":"akkoma","version":"3.0.0"},"usage":{"localPosts":7,"users":{"total":1}},"version":"2.0"}"#; #[test] fn hyena_network() { @@ -243,6 +260,27 @@ mod tests { ); } + #[test] + fn new_hyena_network() { + is_supported(NEW_HYNET); + let nodeinfo = de::(NEW_HYNET_NODEINFO); + let accounts = nodeinfo + .metadata + .unwrap() + .into_iter() + .next() + .unwrap() + .staff_accounts + .unwrap(); + assert_eq!(accounts.len(), 1); + assert_eq!( + accounts[0], + "https://soc.hyena.network/users/mel" + .parse::() + .unwrap() + ); + } + #[test] fn asonix_dog_4() { is_supported(ASONIX_DOG_4); diff --git a/src/main.rs b/src/main.rs index 3302bc4..1d80a0d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,12 +12,14 @@ use tracing_error::ErrorLayer; use tracing_log::LogTracer; use tracing_subscriber::{filter::Targets, fmt::format::FmtSpan, layer::SubscriberExt, Layer}; +mod admin; mod apub; mod args; mod config; mod data; mod db; mod error; +mod extractors; mod jobs; mod middleware; mod requests; @@ -97,30 +99,50 @@ async fn main() -> Result<(), anyhow::Error> { init_subscriber(Config::software_name(), config.opentelemetry_url())?; - let db = Db::build(&config)?; - let args = Args::new(); - if args.list() { - for domain in db.blocks().await? { - println!("block {}", domain); + if args.any() { + let client = requests::build_client(&config.user_agent()); + + if !args.blocks().is_empty() || !args.allowed().is_empty() { + if args.undo() { + admin::client::unblock(&client, &config, args.blocks().to_vec()).await?; + admin::client::disallow(&client, &config, args.allowed().to_vec()).await?; + } else { + admin::client::block(&client, &config, args.blocks().to_vec()).await?; + admin::client::allow(&client, &config, args.allowed().to_vec()).await?; + } + println!("Updated lists"); } - for domain in db.allows().await? 
{
-            println!("allow {}", domain);
+
+        if args.list() {
+            let (blocked, allowed, connected) = tokio::try_join!(
+                admin::client::blocked(&client, &config),
+                admin::client::allowed(&client, &config),
+                admin::client::connected(&client, &config)
+            )?;
+
+            let mut report = String::from("Report:\n");
+            if !allowed.allowed_domains.is_empty() {
+                report += "\nAllowed\n\t";
+                report += &allowed.allowed_domains.join("\n\t");
+            }
+            if !blocked.blocked_domains.is_empty() {
+                report += "\n\nBlocked\n\t";
+                report += &blocked.blocked_domains.join("\n\t");
+            }
+            if !connected.connected_actors.is_empty() {
+                report += "\n\nConnected\n\t";
+                report += &connected.connected_actors.join("\n\t");
+            }
+            report += "\n";
+            println!("{report}");
         }
+
         return Ok(());
     }

-    if !args.blocks().is_empty() || !args.allowed().is_empty() {
-        if args.undo() {
-            db.remove_blocks(args.blocks().to_vec()).await?;
-            db.remove_allows(args.allowed().to_vec()).await?;
-        } else {
-            db.add_blocks(args.blocks().to_vec()).await?;
-            db.add_allows(args.allowed().to_vec()).await?;
-        }
-        return Ok(());
-    }
+    let db = Db::build(&config)?;

     let media = MediaCache::new(db.clone());
     let state = State::build(db.clone()).await?;
@@ -135,15 +157,22 @@ async fn main() -> Result<(), anyhow::Error> {
     let bind_address = config.bind_address();

     HttpServer::new(move || {
-        App::new()
-            .wrap(TracingLogger::default())
+        let app = App::new()
             .app_data(web::Data::new(db.clone()))
             .app_data(web::Data::new(state.clone()))
             .app_data(web::Data::new(state.requests(&config)))
             .app_data(web::Data::new(actors.clone()))
             .app_data(web::Data::new(config.clone()))
             .app_data(web::Data::new(job_server.clone()))
-            .app_data(web::Data::new(media.clone()))
+            .app_data(web::Data::new(media.clone()));
+
+        let app = if let Some(data) = config.admin_config() {
+            app.app_data(data)
+        } else {
+            app
+        };
+
+        app.wrap(TracingLogger::default())
             .service(web::resource("/").route(web::get().to(index)))
             .service(web::resource("/media/{path}").route(web::get().to(routes::media)))
             .service(
@@ -165,6 +194,18 @@ async fn main() -> Result<(), anyhow::Error> {
             .service(web::resource("/nodeinfo").route(web::get().to(nodeinfo_meta))),
             )
             .service(web::resource("/static/{filename}").route(web::get().to(statics)))
+            .service(
+                web::scope("/api/v1").service(
+                    web::scope("/admin")
+                        .route("/allow", web::post().to(admin::routes::allow))
+                        .route("/disallow", web::post().to(admin::routes::disallow))
+                        .route("/block", web::post().to(admin::routes::block))
+                        .route("/unblock", web::post().to(admin::routes::unblock))
+                        .route("/allowed", web::get().to(admin::routes::allowed))
+                        .route("/blocked", web::get().to(admin::routes::blocked))
+                        .route("/connected", web::get().to(admin::routes::connected)),
+                ),
+            )
     })
     .bind(bind_address)?
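The list branch above fetches the blocked, allowed, and connected lists concurrently with tokio::try_join! and then assembles a plain-text report. A rough standalone sketch of that shape, assuming the tokio runtime with the macros feature; fetch and its data are illustrative stand-ins, not the relay's admin::client API:

    async fn fetch(kind: &str) -> Result<Vec<String>, String> {
        // Stand-in for an HTTP call to the admin API.
        Ok(vec![format!("{kind}.example")])
    }

    fn section(report: &mut String, title: &str, items: &[String]) {
        if !items.is_empty() {
            report.push_str(&format!("\n\n{title}\n\t"));
            report.push_str(&items.join("\n\t"));
        }
    }

    #[tokio::main]
    async fn main() -> Result<(), String> {
        // try_join! polls all three futures concurrently and returns the first
        // error it sees, dropping the futures that have not finished yet.
        let (blocked, allowed, connected) =
            tokio::try_join!(fetch("blocked"), fetch("allowed"), fetch("connected"))?;

        let mut report = String::from("Report:\n");
        section(&mut report, "Allowed", &allowed);
        section(&mut report, "Blocked", &blocked);
        section(&mut report, "Connected", &connected);
        println!("{report}");
        Ok(())
    }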
     .run()
diff --git a/src/middleware/verifier.rs b/src/middleware/verifier.rs
index b371fbd..8234787 100644
--- a/src/middleware/verifier.rs
+++ b/src/middleware/verifier.rs
@@ -16,7 +16,7 @@ use std::{future::Future, pin::Pin};
 pub(crate) struct MyVerify(pub Requests, pub ActorCache, pub State);

 impl MyVerify {
-    #[tracing::instrument("Verify signature", skip(self))]
+    #[tracing::instrument("Verify signature", skip(self, signature))]
     async fn verify(
         &self,
         algorithm: Option,
@@ -26,6 +26,9 @@ impl MyVerify {
     ) -> Result {
         let public_key_id = iri!(key_id);

+        // receiving an activity from a domain indicates it is probably online
+        self.0.reset_breaker(&public_key_id);
+
         let actor_id = if let Some(mut actor_id) = self
             .2
             .db
diff --git a/src/requests.rs b/src/requests.rs
index b55da16..332024a 100644
--- a/src/requests.rs
+++ b/src/requests.rs
@@ -1,10 +1,7 @@
 use crate::error::{Error, ErrorKind};
 use activitystreams::iri_string::types::IriString;
-use actix_web::{
-    http::{header::Date, StatusCode},
-    web::Bytes,
-};
-use awc::Client;
+use actix_web::http::header::Date;
+use awc::{error::SendRequestError, Client, ClientResponse};
 use dashmap::DashMap;
 use http_signature_normalization_actix::prelude::*;
 use rand::thread_rng;
@@ -20,7 +17,6 @@ use std::{
     },
     time::{Duration, SystemTime},
 };
-use tracing::{debug, info, warn};
 use tracing_awc::Tracing;

 const ONE_SECOND: u64 = 1;
@@ -57,6 +53,9 @@ impl Breakers {
         let should_write = {
             if let Some(mut breaker) = self.inner.get_mut(authority) {
                 breaker.fail();
+                if !breaker.should_try() {
+                    tracing::warn!("Failed breaker for {}", authority);
+                }
                 false
             } else {
                 true
@@ -105,17 +104,12 @@ struct Breaker {
 }

 impl Breaker {
-    const fn failure_threshold() -> usize {
-        10
-    }
-
-    fn failure_wait() -> Duration {
-        Duration::from_secs(ONE_DAY)
-    }
+    const FAILURE_WAIT: Duration = Duration::from_secs(ONE_DAY);
+    const FAILURE_THRESHOLD: usize = 10;

     fn should_try(&self) -> bool {
-        self.failures < Self::failure_threshold()
-            || self.last_attempt + Self::failure_wait() < SystemTime::now()
+        self.failures < Self::FAILURE_THRESHOLD
+            || self.last_attempt + Self::FAILURE_WAIT < SystemTime::now()
     }

     fn fail(&mut self) {
@@ -166,6 +160,14 @@ impl std::fmt::Debug for Requests {
     }
 }

+pub(crate) fn build_client(user_agent: &str) -> Client {
+    Client::builder()
+        .wrap(Tracing)
+        .add_default_header(("User-Agent", user_agent.to_string()))
+        .timeout(Duration::from_secs(15))
+        .finish()
+}
+
 impl Requests {
     pub(crate) fn new(
         key_id: String,
@@ -174,12 +176,7 @@ impl Requests {
         breakers: Breakers,
     ) -> Self {
         Requests {
-            client: Rc::new(RefCell::new(
-                Client::builder()
-                    .wrap(Tracing)
-                    .add_default_header(("User-Agent", user_agent.clone()))
-                    .finish(),
-            )),
+            client: Rc::new(RefCell::new(build_client(&user_agent))),
             consecutive_errors: Rc::new(AtomicUsize::new(0)),
             error_limit: 3,
             key_id,
@@ -190,14 +187,15 @@ impl Requests {
         }
     }

+    pub(crate) fn reset_breaker(&self, iri: &IriString) {
+        self.breakers.succeed(iri);
+    }
+
     fn count_err(&self) {
         let count = self.consecutive_errors.fetch_add(1, Ordering::Relaxed);
         if count + 1 >= self.error_limit {
-            warn!("{} consecutive errors, rebuilding http client", count);
-            *self.client.borrow_mut() = Client::builder()
-                .wrap(Tracing)
-                .add_default_header(("User-Agent", self.user_agent.clone()))
-                .finish();
+            tracing::warn!("{} consecutive errors, rebuilding http client", count + 1);
+            *self.client.borrow_mut() = build_client(&self.user_agent);
             self.reset_err();
         }
     }
@@ -206,7 +204,41 @@ impl Requests {
         self.consecutive_errors.swap(0, Ordering::Relaxed);
     }

-    #[tracing::instrument(name = "Fetch Json", skip(self))]
+    async fn check_response(
+        &self,
+        parsed_url: &IriString,
+        res: Result<ClientResponse, SendRequestError>,
+    ) -> Result<ClientResponse, Error> {
+        if res.is_err() {
+            self.count_err();
+            self.breakers.fail(&parsed_url);
+        }
+
+        let mut res =
+            res.map_err(|e| ErrorKind::SendRequest(parsed_url.to_string(), e.to_string()))?;
+
+        self.reset_err();
+
+        if !res.status().is_success() {
+            self.breakers.fail(&parsed_url);
+
+            if let Ok(bytes) = res.body().await {
+                if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
+                    if !s.is_empty() {
+                        tracing::warn!("Response from {}, {}", parsed_url, s);
+                    }
+                }
+            }
+
+            return Err(ErrorKind::Status(parsed_url.to_string(), res.status()).into());
+        }
+
+        self.breakers.succeed(&parsed_url);
+
+        Ok(res)
+    }
+
+    #[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))]
     pub(crate) async fn fetch_json<T>(&self, url: &str) -> Result<T, Error>
     where
         T: serde::de::DeserializeOwned,
@@ -214,7 +246,7 @@ impl Requests {
         self.do_fetch(url, "application/json").await
     }

-    #[tracing::instrument(name = "Fetch Activity+Json", skip(self))]
+    #[tracing::instrument(name = "Fetch Activity+Json", skip(self), fields(signing_string))]
     pub(crate) async fn fetch<T>(&self, url: &str) -> Result<T, Error>
     where
         T: serde::de::DeserializeOwned,
@@ -233,6 +265,7 @@ impl Requests {
         }

         let signer = self.signer();
+        let span = tracing::Span::current();

         let client: Client = self.client.borrow().clone();
         let res = client
@@ -242,36 +275,16 @@ impl Requests {
             .signature(
                 self.config.clone(),
                 self.key_id.clone(),
-                move |signing_string| signer.sign(signing_string),
+                move |signing_string| {
+                    span.record("signing_string", signing_string);
+                    span.in_scope(|| signer.sign(signing_string))
+                },
             )
             .await?
             .send()
             .await;

-        if res.is_err() {
-            self.count_err();
-            self.breakers.fail(&parsed_url);
-        }
-
-        let mut res = res.map_err(|e| ErrorKind::SendRequest(url.to_string(), e.to_string()))?;
-
-        self.reset_err();
-
-        if !res.status().is_success() {
-            if let Ok(bytes) = res.body().await {
-                if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
-                    if !s.is_empty() {
-                        debug!("Response from {}, {}", url, s);
-                    }
-                }
-            }
-
-            self.breakers.fail(&parsed_url);
-
-            return Err(ErrorKind::Status(url.to_string(), res.status()).into());
-        }
-
-        self.breakers.succeed(&parsed_url);
+        let mut res = self.check_response(&parsed_url, res).await?;

         let body = res
             .body()
@@ -281,80 +294,42 @@ impl Requests {
         Ok(serde_json::from_slice(body.as_ref())?)
     }

-    #[tracing::instrument(name = "Fetch Bytes", skip(self))]
-    pub(crate) async fn fetch_bytes(&self, url: &str) -> Result<(String, Bytes), Error> {
-        let parsed_url = url.parse::<IriString>()?;
-
-        if !self.breakers.should_try(&parsed_url) {
+    #[tracing::instrument(name = "Fetch response", skip(self), fields(signing_string))]
+    pub(crate) async fn fetch_response(&self, url: IriString) -> Result<ClientResponse, Error> {
+        if !self.breakers.should_try(&url) {
             return Err(ErrorKind::Breaker.into());
         }

-        info!("Fetching bytes for {}", url);
-
         let signer = self.signer();
+        let span = tracing::Span::current();

         let client: Client = self.client.borrow().clone();
         let res = client
-            .get(url)
+            .get(url.as_str())
             .insert_header(("Accept", "*/*"))
             .insert_header(Date(SystemTime::now().into()))
+            .no_decompress()
             .signature(
                 self.config.clone(),
                 self.key_id.clone(),
-                move |signing_string| signer.sign(signing_string),
+                move |signing_string| {
+                    span.record("signing_string", signing_string);
+                    span.in_scope(|| signer.sign(signing_string))
+                },
             )
             .await?
             .send()
             .await;

-        if res.is_err() {
-            self.breakers.fail(&parsed_url);
-            self.count_err();
-        }
+        let res = self.check_response(&url, res).await?;

-        let mut res = res.map_err(|e| ErrorKind::SendRequest(url.to_string(), e.to_string()))?;
-
-        self.reset_err();
-
-        let content_type = if let Some(content_type) = res.headers().get("content-type") {
-            if let Ok(s) = content_type.to_str() {
-                s.to_owned()
-            } else {
-                return Err(ErrorKind::ContentType.into());
-            }
-        } else {
-            return Err(ErrorKind::ContentType.into());
-        };
-
-        if !res.status().is_success() {
-            if let Ok(bytes) = res.body().await {
-                if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
-                    if !s.is_empty() {
-                        debug!("Response from {}, {}", url, s);
-                    }
-                }
-            }
-
-            self.breakers.fail(&parsed_url);
-
-            return Err(ErrorKind::Status(url.to_string(), res.status()).into());
-        }
-
-        self.breakers.succeed(&parsed_url);
-
-        let bytes = match res.body().limit(1024 * 1024 * 4).await {
-            Err(e) => {
-                return Err(ErrorKind::ReceiveResponse(url.to_string(), e.to_string()).into());
-            }
-            Ok(bytes) => bytes,
-        };
-
-        Ok((content_type, bytes))
+        Ok(res)
     }

     #[tracing::instrument(
         "Deliver to Inbox",
         skip_all,
-        fields(inbox = inbox.to_string().as_str(), item)
+        fields(inbox = inbox.to_string().as_str(), signing_string)
     )]
     pub(crate) async fn deliver<T>(&self, inbox: IriString, item: &T) -> Result<(), Error>
     where
@@ -365,6 +340,7 @@ impl Requests {
         }

         let signer = self.signer();
+        let span = tracing::Span::current();
         let item_string = serde_json::to_string(item)?;

         let client: Client = self.client.borrow().clone();
@@ -378,39 +354,17 @@ impl Requests {
                 self.key_id.clone(),
                 Sha256::new(),
                 item_string,
-                move |signing_string| signer.sign(signing_string),
+                move |signing_string| {
+                    span.record("signing_string", signing_string);
+                    span.in_scope(|| signer.sign(signing_string))
+                },
             )
             .await?
             .split();

         let res = req.send_body(body).await;

-        if res.is_err() {
-            self.count_err();
-            self.breakers.fail(&inbox);
-        }
-
-        let mut res = res.map_err(|e| ErrorKind::SendRequest(inbox.to_string(), e.to_string()))?;
-
-        self.reset_err();
-
-        if !res.status().is_success() {
-            // Bad Request means the server didn't understand our activity - that's fine
-            if res.status() != StatusCode::BAD_REQUEST {
-                if let Ok(bytes) = res.body().await {
-                    if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
-                        if !s.is_empty() {
-                            warn!("Response from {}, {}", inbox.as_str(), s);
-                        }
-                    }
-                }
-
-                self.breakers.fail(&inbox);
-                return Err(ErrorKind::Status(inbox.to_string(), res.status()).into());
-            }
-        }
-
-        self.breakers.succeed(&inbox);
+        self.check_response(&inbox, res).await?;

         Ok(())
     }
diff --git a/src/routes/inbox.rs b/src/routes/inbox.rs
index 32d874e..14286df 100644
--- a/src/routes/inbox.rs
+++ b/src/routes/inbox.rs
@@ -16,7 +16,7 @@ use activitystreams::{
 use actix_web::{web, HttpResponse};
 use http_signature_normalization_actix::prelude::{DigestVerified, SignatureVerified};

-#[tracing::instrument(name = "Inbox", skip(actors, client, jobs, config, state))]
+#[tracing::instrument(name = "Inbox", skip_all)]
 pub(crate) async fn route(
     state: web::Data,
     actors: web::Data,
diff --git a/src/routes/index.rs b/src/routes/index.rs
index ea2634e..4e141d7 100644
--- a/src/routes/index.rs
+++ b/src/routes/index.rs
@@ -1,19 +1,40 @@
 use crate::{
     config::Config,
-    data::State,
+    data::{Node, State},
     error::{Error, ErrorKind},
 };
 use actix_web::{web, HttpResponse};
 use rand::{seq::SliceRandom, thread_rng};
 use std::io::BufWriter;

+fn open_reg(node: &Node) -> bool {
+    node.instance
+        .as_ref()
+        .map(|i| i.reg)
+        .or_else(|| node.info.as_ref().map(|i| i.reg))
+        .unwrap_or(false)
+}
+
 #[tracing::instrument(name = "Index", skip(config, state))]
 pub(crate) async fn route(
     state: web::Data,
     config: web::Data,
 ) -> Result {
     let mut nodes = state.node_cache().nodes().await?;
-    nodes.shuffle(&mut thread_rng());
+
+    nodes.sort_by(|lhs, rhs| match (open_reg(lhs), open_reg(rhs)) {
+        (true, true) | (false, false) => std::cmp::Ordering::Equal,
+        (true, false) => std::cmp::Ordering::Less,
+        (false, true) => std::cmp::Ordering::Greater,
+    });
+
+    if let Some((i, _)) = nodes.iter().enumerate().find(|(_, node)| !open_reg(node)) {
+        nodes[..i].shuffle(&mut thread_rng());
+        nodes[i..].shuffle(&mut thread_rng());
+    } else {
+        nodes.shuffle(&mut thread_rng());
+    }
+
     let mut buf = BufWriter::new(Vec::new());

     crate::templates::index(&mut buf, &nodes, &config)?;
diff --git a/src/routes/media.rs b/src/routes/media.rs
index d0d86c1..8a9de62 100644
--- a/src/routes/media.rs
+++ b/src/routes/media.rs
@@ -1,8 +1,5 @@
 use crate::{data::MediaCache, error::Error, requests::Requests};
-use actix_web::{
-    http::header::{CacheControl, CacheDirective},
-    web, HttpResponse,
-};
+use actix_web::{body::BodyStream, web, HttpResponse};
 use uuid::Uuid;

 #[tracing::instrument(name = "Media", skip(media, requests))]
@@ -13,30 +10,17 @@ pub(crate) async fn route(
 ) -> Result {
     let uuid = uuid.into_inner();

-    if let Some((content_type, bytes)) = media.get_bytes(uuid).await? {
-        return Ok(cached(content_type, bytes));
-    }
-
     if let Some(url) = media.get_url(uuid).await?
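The index route above now orders the instance list so that servers with open registrations come first, then shuffles each group separately so the listing stays random without mixing the groups. A standalone sketch of that ordering, assuming the rand crate; the (name, open) tuples stand in for the relay's Node type and its open_reg check:

    use rand::seq::SliceRandom;
    use rand::thread_rng;

    fn order(mut nodes: Vec<(&'static str, bool)>) -> Vec<(&'static str, bool)> {
        // Stable sort: open-registration entries (true) first, closed ones after.
        nodes.sort_by(|lhs, rhs| rhs.1.cmp(&lhs.1));

        // Shuffle within each partition so neither group has a fixed order.
        if let Some(i) = nodes.iter().position(|&(_, open)| !open) {
            nodes[..i].shuffle(&mut thread_rng());
            nodes[i..].shuffle(&mut thread_rng());
        } else {
            nodes.shuffle(&mut thread_rng());
        }
        nodes
    }

    fn main() {
        let nodes = vec![("a.example", false), ("b.example", true), ("c.example", true)];
        let ordered = order(nodes);
        // The open-registration instances always occupy the front of the list.
        assert!(ordered[0].1 && ordered[1].1);
        println!("{ordered:?}");
    }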
 {
-        let (content_type, bytes) = requests.fetch_bytes(url.as_str()).await?;
+        let res = requests.fetch_response(url).await?;

-        media
-            .store_bytes(uuid, content_type.clone(), bytes.clone())
-            .await?;
+        let mut response = HttpResponse::build(res.status());

-        return Ok(cached(content_type, bytes));
+        for (name, value) in res.headers().iter().filter(|(h, _)| *h != "connection") {
+            response.insert_header((name.clone(), value.clone()));
+        }
+
+        return Ok(response.body(BodyStream::new(res)));
     }

     Ok(HttpResponse::NotFound().finish())
 }
-
-fn cached(content_type: String, bytes: web::Bytes) -> HttpResponse {
-    HttpResponse::Ok()
-        .insert_header(CacheControl(vec![
-            CacheDirective::Public,
-            CacheDirective::MaxAge(60 * 60 * 24),
-            CacheDirective::Extension("immutable".to_owned(), None),
-        ]))
-        .content_type(content_type)
-        .body(bytes)
-}
diff --git a/src/telegram.rs b/src/telegram.rs
index 39636fb..270e1af 100644
--- a/src/telegram.rs
+++ b/src/telegram.rs
@@ -105,7 +105,7 @@ async fn answer(bot: Bot, msg: Message, cmd: Command, db: Db) -> ResponseResult<()> {
             .await?;
         }
         Command::ListAllowed => {
-            if let Ok(allowed) = db.allowed_domains().await {
+            if let Ok(allowed) = db.allows().await {
                 bot.send_message(msg.chat.id, allowed.join("\n")).await?;
             }
         }
diff --git a/templates/index.rs.html b/templates/index.rs.html
index 2e2e2e1..46591d5 100644
--- a/templates/index.rs.html
+++ b/templates/index.rs.html
@@ -25,7 +25,8 @@ templates::{info, instance, statics::index_css},
-Instances fédérées
+@nodes.len() instances fédérées
+@nodes.len() Connected Servers
 @if nodes.is_empty() {
 Aucune instance fédérée en ce moment.
 } else {