Merge branch 'upstream' into max

This commit is contained in:
Maxime Augier 2022-11-19 14:09:39 +01:00
commit e89ee20d38
36 changed files with 992 additions and 428 deletions

1
.env
View File

@ -1,4 +1,5 @@
HOSTNAME=localhost:8079 HOSTNAME=localhost:8079
PORT=8079 PORT=8079
RESTRICTED_MODE=true RESTRICTED_MODE=true
API_TOKEN=somesecretpassword
# OPENTELEMETRY_URL=http://localhost:4317 # OPENTELEMETRY_URL=http://localhost:4317

115
Cargo.lock generated
View File

@ -279,7 +279,7 @@ checksum = "216261ddc8289130e551ddcd5ce8a064710c0d064a4d2895c67151c92b5443f6"
[[package]] [[package]]
name = "ap-relay" name = "ap-relay"
version = "0.3.34" version = "0.3.50"
dependencies = [ dependencies = [
"activitystreams", "activitystreams",
"activitystreams-ext", "activitystreams-ext",
@ -291,6 +291,7 @@ dependencies = [
"awc", "awc",
"background-jobs", "background-jobs",
"base64", "base64",
"bcrypt",
"clap", "clap",
"config", "config",
"console-subscriber", "console-subscriber",
@ -494,16 +495,15 @@ dependencies = [
[[package]] [[package]]
name = "background-jobs-actix" name = "background-jobs-actix"
version = "0.13.0" version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8660626a2d8781b50cbe0e3b63d8e2a7e08a90e80fa2bca8e8cc19deff72ebf4" checksum = "47263ad9c5679419347dae655c2fa2cba078b0eaa51ac758d4f0e9690c06910b"
dependencies = [ dependencies = [
"actix-rt", "actix-rt",
"anyhow", "anyhow",
"async-mutex", "async-mutex",
"async-trait", "async-trait",
"background-jobs-core", "background-jobs-core",
"num_cpus",
"serde", "serde",
"serde_json", "serde_json",
"thiserror", "thiserror",
@ -544,6 +544,18 @@ version = "1.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b645a089122eccb6111b4f81cbc1a49f5900ac4666bb93ac027feaecf15607bf" checksum = "b645a089122eccb6111b4f81cbc1a49f5900ac4666bb93ac027feaecf15607bf"
[[package]]
name = "bcrypt"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7e7c93a3fb23b2fdde989b2c9ec4dd153063ec81f408507f84c090cd91c6641"
dependencies = [
"base64",
"blowfish",
"getrandom",
"zeroize",
]
[[package]] [[package]]
name = "bitflags" name = "bitflags"
version = "1.3.2" version = "1.3.2"
@ -559,6 +571,16 @@ dependencies = [
"generic-array", "generic-array",
] ]
[[package]]
name = "blowfish"
version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e412e2cd0f2b2d93e02543ceae7917b3c70331573df19ee046bcbc35e45e87d7"
dependencies = [
"byteorder",
"cipher",
]
[[package]] [[package]]
name = "bumpalo" name = "bumpalo"
version = "3.11.1" version = "3.11.1"
@ -594,9 +616,9 @@ dependencies = [
[[package]] [[package]]
name = "cc" name = "cc"
version = "1.0.75" version = "1.0.76"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41ca34107f97baef6cfb231b32f36115781856b8f8208e8c580e0bcaea374842" checksum = "76a284da2e6fe2092f2353e51713435363112dfd60030e22add80be333fb928f"
[[package]] [[package]]
name = "cfg-if" name = "cfg-if"
@ -606,19 +628,29 @@ checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]] [[package]]
name = "chrono" name = "chrono"
version = "0.4.22" version = "0.4.23"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfd4d1b31faaa3a89d7934dbded3111da0d2ef28e3ebccdb4f0179f5929d1ef1" checksum = "16b0a3d9ed01224b22057780a37bb8c5dbfe1be8ba48678e7bf57ec4b385411f"
dependencies = [ dependencies = [
"num-integer", "num-integer",
"num-traits", "num-traits",
] ]
[[package]] [[package]]
name = "clap" name = "cipher"
version = "4.0.22" version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "91b9970d7505127a162fdaa9b96428d28a479ba78c9ec7550a63a5d9863db682" checksum = "d1873270f8f7942c191139cb8a40fd228da6c3fd2fc376d7e92d47aa14aeb59e"
dependencies = [
"crypto-common",
"inout",
]
[[package]]
name = "clap"
version = "4.0.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2148adefda54e14492fb9bddcc600b4344c5d1a3123bd666dcb939c6f0e0e57e"
dependencies = [ dependencies = [
"atty", "atty",
"bitflags", "bitflags",
@ -708,9 +740,9 @@ dependencies = [
[[package]] [[package]]
name = "const-oid" name = "const-oid"
version = "0.9.0" version = "0.9.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "722e23542a15cea1f65d4a1419c4cfd7a26706c70871a13a04238ca3f40f1661" checksum = "cec318a675afcb6a1ea1d4340e2d377e56e47c266f28043ceccbf4412ddfdd3b"
[[package]] [[package]]
name = "convert_case" name = "convert_case"
@ -852,9 +884,9 @@ dependencies = [
[[package]] [[package]]
name = "digest" name = "digest"
version = "0.10.5" version = "0.10.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adfbc57365a37acbd2ebf2b64d7e69bb766e2fea813521ed536f5d0520dcf86c" checksum = "8168378f4e5023e7218c89c891c0fd8ecdb5e5e4f18cb78f38cf245dd021e76f"
dependencies = [ dependencies = [
"block-buffer", "block-buffer",
"const-oid", "const-oid",
@ -1264,9 +1296,9 @@ dependencies = [
[[package]] [[package]]
name = "hyper-rustls" name = "hyper-rustls"
version = "0.23.0" version = "0.23.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d87c48c02e0dc5e3b849a2041db3029fd066650f8f717c07bf8ed78ccb895cac" checksum = "59df7c4e19c950e6e0e868dcc0a300b09a9b88e9ec55bd879ca819087a77355d"
dependencies = [ dependencies = [
"http", "http",
"hyper", "hyper",
@ -1305,14 +1337,23 @@ dependencies = [
[[package]] [[package]]
name = "indexmap" name = "indexmap"
version = "1.9.1" version = "1.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "10a35a97730320ffe8e2d410b5d3b69279b98d2c14bdb8b70ea89ecf7888d41e" checksum = "1885e79c1fc4b10f0e172c475f458b7f7b93061064d98c3293e98c5ba0c8b399"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"hashbrown", "hashbrown",
] ]
[[package]]
name = "inout"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5"
dependencies = [
"generic-array",
]
[[package]] [[package]]
name = "instant" name = "instant"
version = "0.1.12" version = "0.1.12"
@ -1404,9 +1445,9 @@ checksum = "fc7fcc620a3bff7cdd7a365be3376c97191aeaccc2a603e600951e452615bf89"
[[package]] [[package]]
name = "libm" name = "libm"
version = "0.2.5" version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "292a948cd991e376cf75541fe5b97a1081d713c618b4f1b9500f8844e49eb565" checksum = "348108ab3fba42ec82ff6e9564fc4ca0247bdccdc68dd8af9764bbc79c3c8ffb"
[[package]] [[package]]
name = "linked-hash-map" name = "linked-hash-map"
@ -1627,9 +1668,9 @@ dependencies = [
[[package]] [[package]]
name = "num-bigint-dig" name = "num-bigint-dig"
version = "0.8.1" version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "566d173b2f9406afbc5510a90925d5a2cd80cae4605631f1212303df265de011" checksum = "2399c9463abc5f909349d8aa9ba080e0b88b3ce2885389b60b993f39b1a56905"
dependencies = [ dependencies = [
"byteorder", "byteorder",
"lazy_static", "lazy_static",
@ -1793,9 +1834,9 @@ dependencies = [
[[package]] [[package]]
name = "os_str_bytes" name = "os_str_bytes"
version = "6.3.1" version = "6.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3baf96e39c5359d2eb0dd6ccb42c62b91d9678aa68160d261b9e0ccbf9e9dea9" checksum = "7b5bf27447411e9ee3ff51186bf7a08e16c341efdde93f4d823e8844429bed7e"
[[package]] [[package]]
name = "overload" name = "overload"
@ -2228,9 +2269,9 @@ dependencies = [
[[package]] [[package]]
name = "reqwest" name = "reqwest"
version = "0.11.12" version = "0.11.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "431949c384f4e2ae07605ccaa56d1d9d2ecdb5cadd4f9577ccfab29f2e5149fc" checksum = "68cc60575865c7831548863cc02356512e3f1dc2f3f82cb837d7fc4cc8f3c97c"
dependencies = [ dependencies = [
"base64", "base64",
"bytes", "bytes",
@ -2295,9 +2336,9 @@ dependencies = [
[[package]] [[package]]
name = "rsa" name = "rsa"
version = "0.7.1" version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0ecc3307be66bfb3574577895555bacfb9a37a8d5cd959444b72ff02495c618" checksum = "094052d5470cbcef561cb848a7209968c9f12dfa6d668f4bca048ac5de51099c"
dependencies = [ dependencies = [
"byteorder", "byteorder",
"digest", "digest",
@ -2449,9 +2490,9 @@ dependencies = [
[[package]] [[package]]
name = "serde_json" name = "serde_json"
version = "1.0.87" version = "1.0.88"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ce777b7b150d76b9cf60d28b55f5847135a003f7d7350c6be7a773508ce7d45" checksum = "8e8b3801309262e8184d9687fb697586833e939767aea0dda89f5a8e650e8bd7"
dependencies = [ dependencies = [
"itoa", "itoa",
"ryu", "ryu",
@ -2664,9 +2705,9 @@ checksum = "20f34339676cdcab560c9a82300c4c2581f68b9369aedf0fae86f2ff9565ff3e"
[[package]] [[package]]
name = "teloxide" name = "teloxide"
version = "0.11.1" version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94734a391eb4f3b6172b285fc10593192f9bdb4c8a377075cff063d967f0e43b" checksum = "19017dde82bddcbbdf8e40484f23985f4097b1baef4f7c0e006195ad1e6d4e3c"
dependencies = [ dependencies = [
"aquamarine", "aquamarine",
"bytes", "bytes",
@ -2838,9 +2879,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.21.2" version = "1.22.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9e03c497dc955702ba729190dc4aac6f2a0ce97f913e5b1b5912fc5039d9099" checksum = "d76ce4a75fb488c605c54bf610f221cea8b0dafb53333c1a67e8ee199dcd2ae3"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"bytes", "bytes",
@ -3216,9 +3257,9 @@ checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9"
[[package]] [[package]]
name = "uuid" name = "uuid"
version = "1.2.1" version = "1.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "feb41e78f93363bb2df8b0e86a2ca30eed7806ea16ea0c790d757cf93f79be83" checksum = "422ee0de9031b5b948b97a8fc04e3aa35230001a722ddd27943e0be31564ce4c"
dependencies = [ dependencies = [
"getrandom", "getrandom",
"serde", "serde",

View File

@ -1,7 +1,7 @@
[package] [package]
name = "ap-relay" name = "ap-relay"
description = "A simple activitypub relay" description = "A simple activitypub relay"
version = "0.3.34" version = "0.3.50"
authors = ["asonix <asonix@asonix.dog>"] authors = ["asonix <asonix@asonix.dog>"]
license = "AGPL-3.0" license = "AGPL-3.0"
readme = "README.md" readme = "README.md"
@ -29,6 +29,7 @@ activitystreams = "0.7.0-alpha.19"
activitystreams-ext = "0.1.0-alpha.2" activitystreams-ext = "0.1.0-alpha.2"
ammonia = "3.1.0" ammonia = "3.1.0"
awc = { version = "3.0.0", default-features = false, features = ["rustls"] } awc = { version = "3.0.0", default-features = false, features = ["rustls"] }
bcrypt = "0.13"
base64 = "0.13" base64 = "0.13"
clap = { version = "4.0.0", features = ["derive"] } clap = { version = "4.0.0", features = ["derive"] }
config = "0.13.0" config = "0.13.0"

View File

@ -36,6 +36,9 @@ To simply run the server, the command is as follows
$ ./relay $ ./relay
``` ```
#### Administration
> **NOTE:** The server _must be running_ in order to update the lists with the following commands
To learn about any other tasks, the `--help` flag can be passed To learn about any other tasks, the `--help` flag can be passed
```bash ```bash
An activitypub relay An activitypub relay
@ -90,6 +93,8 @@ HTTPS=true
PRETTY_LOG=false PRETTY_LOG=false
PUBLISH_BLOCKS=true PUBLISH_BLOCKS=true
SLED_PATH=./sled/db-0.34 SLED_PATH=./sled/db-0.34
RUST_LOG=warn
API_TOKEN=somepasswordishtoken
OPENTELEMETRY_URL=localhost:4317 OPENTELEMETRY_URL=localhost:4317
TELEGRAM_TOKEN=secret TELEGRAM_TOKEN=secret
TELEGRAM_ADMIN_HANDLE=your_handle TELEGRAM_ADMIN_HANDLE=your_handle
@ -114,8 +119,12 @@ Whether the current server is running on an HTTPS port or not. This is used for
Whether or not to publish a list of blocked domains in the `nodeinfo` metadata for the server. It defaults to `false`. Whether or not to publish a list of blocked domains in the `nodeinfo` metadata for the server. It defaults to `false`.
##### `SLED_PATH` ##### `SLED_PATH`
Where to store the on-disk database of connected servers. This defaults to `./sled/db-0.34`. Where to store the on-disk database of connected servers. This defaults to `./sled/db-0.34`.
##### `RUST_LOG`
The log level to print. Available levels are `ERROR`, `WARN`, `INFO`, `DEBUG`, and `TRACE`. You can also specify module paths to enable some logs but not others, such as `RUST_LOG=warn,tracing_actix_web=info,relay=info`
##### `SOURCE_REPO` ##### `SOURCE_REPO`
The URL to the source code for the relay. This defaults to `https://git.asonix.dog/asonix/relay`, but should be changed if you're running a fork hosted elsewhere. The URL to the source code for the relay. This defaults to `https://git.asonix.dog/asonix/relay`, but should be changed if you're running a fork hosted elsewhere.
##### `API_TOKEN`
The Secret token used to access the admin APIs. This must be set for the commandline to function
##### `OPENTELEMETRY_URL` ##### `OPENTELEMETRY_URL`
A URL for exporting opentelemetry spans. This is mostly useful for debugging. There is no default, since most people probably don't run an opentelemetry collector. A URL for exporting opentelemetry spans. This is mostly useful for debugging. There is no default, since most people probably don't run an opentelemetry collector.
##### `TELEGRAM_TOKEN` ##### `TELEGRAM_TOKEN`

24
src/admin.rs Normal file
View File

@ -0,0 +1,24 @@
use activitystreams::iri_string::types::IriString;
pub mod client;
pub mod routes;
#[derive(serde::Deserialize, serde::Serialize)]
pub(crate) struct Domains {
domains: Vec<String>,
}
#[derive(serde::Deserialize, serde::Serialize)]
pub(crate) struct AllowedDomains {
pub(crate) allowed_domains: Vec<String>,
}
#[derive(serde::Deserialize, serde::Serialize)]
pub(crate) struct BlockedDomains {
pub(crate) blocked_domains: Vec<String>,
}
#[derive(serde::Deserialize, serde::Serialize)]
pub(crate) struct ConnectedActors {
pub(crate) connected_actors: Vec<IriString>,
}

103
src/admin/client.rs Normal file
View File

@ -0,0 +1,103 @@
use crate::{
admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains},
config::{AdminUrlKind, Config},
error::{Error, ErrorKind},
};
use awc::Client;
use serde::de::DeserializeOwned;
pub(crate) async fn allow(
client: &Client,
config: &Config,
domains: Vec<String>,
) -> Result<(), Error> {
post_domains(client, config, domains, AdminUrlKind::Allow).await
}
pub(crate) async fn disallow(
client: &Client,
config: &Config,
domains: Vec<String>,
) -> Result<(), Error> {
post_domains(client, config, domains, AdminUrlKind::Disallow).await
}
pub(crate) async fn block(
client: &Client,
config: &Config,
domains: Vec<String>,
) -> Result<(), Error> {
post_domains(client, config, domains, AdminUrlKind::Block).await
}
pub(crate) async fn unblock(
client: &Client,
config: &Config,
domains: Vec<String>,
) -> Result<(), Error> {
post_domains(client, config, domains, AdminUrlKind::Unblock).await
}
pub(crate) async fn allowed(client: &Client, config: &Config) -> Result<AllowedDomains, Error> {
get_results(client, config, AdminUrlKind::Allowed).await
}
pub(crate) async fn blocked(client: &Client, config: &Config) -> Result<BlockedDomains, Error> {
get_results(client, config, AdminUrlKind::Blocked).await
}
pub(crate) async fn connected(client: &Client, config: &Config) -> Result<ConnectedActors, Error> {
get_results(client, config, AdminUrlKind::Connected).await
}
async fn get_results<T: DeserializeOwned>(
client: &Client,
config: &Config,
url_kind: AdminUrlKind,
) -> Result<T, Error> {
let x_api_token = config.x_api_token().ok_or(ErrorKind::MissingApiToken)?;
let iri = config.generate_admin_url(url_kind);
let mut res = client
.get(iri.as_str())
.insert_header(x_api_token)
.send()
.await
.map_err(|e| ErrorKind::SendRequest(iri.to_string(), e.to_string()))?;
if !res.status().is_success() {
return Err(ErrorKind::Status(iri.to_string(), res.status()).into());
}
let t = res
.json()
.await
.map_err(|e| ErrorKind::ReceiveResponse(iri.to_string(), e.to_string()))?;
Ok(t)
}
async fn post_domains(
client: &Client,
config: &Config,
domains: Vec<String>,
url_kind: AdminUrlKind,
) -> Result<(), Error> {
let x_api_token = config.x_api_token().ok_or(ErrorKind::MissingApiToken)?;
let iri = config.generate_admin_url(url_kind);
let res = client
.post(iri.as_str())
.insert_header(x_api_token)
.send_json(&Domains { domains })
.await
.map_err(|e| ErrorKind::SendRequest(iri.to_string(), e.to_string()))?;
if !res.status().is_success() {
tracing::warn!("Failed to allow domains");
}
Ok(())
}

60
src/admin/routes.rs Normal file
View File

@ -0,0 +1,60 @@
use crate::{
admin::{AllowedDomains, BlockedDomains, ConnectedActors, Domains},
error::Error,
extractors::Admin,
};
use actix_web::{web::Json, HttpResponse};
pub(crate) async fn allow(
admin: Admin,
Json(Domains { domains }): Json<Domains>,
) -> Result<HttpResponse, Error> {
admin.db_ref().add_allows(domains).await?;
Ok(HttpResponse::NoContent().finish())
}
pub(crate) async fn disallow(
admin: Admin,
Json(Domains { domains }): Json<Domains>,
) -> Result<HttpResponse, Error> {
admin.db_ref().remove_allows(domains).await?;
Ok(HttpResponse::NoContent().finish())
}
pub(crate) async fn block(
admin: Admin,
Json(Domains { domains }): Json<Domains>,
) -> Result<HttpResponse, Error> {
admin.db_ref().add_blocks(domains).await?;
Ok(HttpResponse::NoContent().finish())
}
pub(crate) async fn unblock(
admin: Admin,
Json(Domains { domains }): Json<Domains>,
) -> Result<HttpResponse, Error> {
admin.db_ref().remove_blocks(domains).await?;
Ok(HttpResponse::NoContent().finish())
}
pub(crate) async fn allowed(admin: Admin) -> Result<Json<AllowedDomains>, Error> {
let allowed_domains = admin.db_ref().allows().await?;
Ok(Json(AllowedDomains { allowed_domains }))
}
pub(crate) async fn blocked(admin: Admin) -> Result<Json<BlockedDomains>, Error> {
let blocked_domains = admin.db_ref().blocks().await?;
Ok(Json(BlockedDomains { blocked_domains }))
}
pub(crate) async fn connected(admin: Admin) -> Result<Json<ConnectedActors>, Error> {
let connected_actors = admin.db_ref().connected_ids().await?;
Ok(Json(ConnectedActors { connected_actors }))
}

View File

@ -17,6 +17,10 @@ pub(crate) struct Args {
} }
impl Args { impl Args {
pub(crate) fn any(&self) -> bool {
!self.blocks.is_empty() || !self.allowed.is_empty() || self.list
}
pub(crate) fn new() -> Self { pub(crate) fn new() -> Self {
Self::parse() Self::parse()
} }

View File

@ -1,6 +1,7 @@
use crate::{ use crate::{
data::{ActorCache, State}, data::{ActorCache, State},
error::Error, error::Error,
extractors::{AdminConfig, XApiToken},
middleware::MyVerify, middleware::MyVerify,
requests::Requests, requests::Requests,
}; };
@ -32,6 +33,7 @@ pub(crate) struct ParsedConfig {
opentelemetry_url: Option<IriString>, opentelemetry_url: Option<IriString>,
telegram_token: Option<String>, telegram_token: Option<String>,
telegram_admin_handle: Option<String>, telegram_admin_handle: Option<String>,
api_token: Option<String>,
} }
#[derive(Clone)] #[derive(Clone)]
@ -49,6 +51,7 @@ pub struct Config {
opentelemetry_url: Option<IriString>, opentelemetry_url: Option<IriString>,
telegram_token: Option<String>, telegram_token: Option<String>,
telegram_admin_handle: Option<String>, telegram_admin_handle: Option<String>,
api_token: Option<String>,
} }
#[derive(Debug)] #[derive(Debug)]
@ -65,6 +68,17 @@ pub enum UrlKind {
Outbox, Outbox,
} }
#[derive(Debug)]
pub enum AdminUrlKind {
Allow,
Disallow,
Block,
Unblock,
Allowed,
Blocked,
Connected,
}
impl std::fmt::Debug for Config { impl std::fmt::Debug for Config {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Config") f.debug_struct("Config")
@ -84,6 +98,7 @@ impl std::fmt::Debug for Config {
) )
.field("telegram_token", &"[redacted]") .field("telegram_token", &"[redacted]")
.field("telegram_admin_handle", &self.telegram_admin_handle) .field("telegram_admin_handle", &self.telegram_admin_handle)
.field("api_token", &"[redacted]")
.finish() .finish()
} }
} }
@ -93,7 +108,7 @@ impl Config {
let config = config::Config::builder() let config = config::Config::builder()
.set_default("hostname", "localhost:8080")? .set_default("hostname", "localhost:8080")?
.set_default("addr", "127.0.0.1")? .set_default("addr", "127.0.0.1")?
.set_default::<_, u64>("port", 8080)? .set_default("port", 8080u64)?
.set_default("debug", true)? .set_default("debug", true)?
.set_default("restricted_mode", false)? .set_default("restricted_mode", false)?
.set_default("validate_signatures", false)? .set_default("validate_signatures", false)?
@ -104,6 +119,7 @@ impl Config {
.set_default("opentelemetry_url", None as Option<&str>)? .set_default("opentelemetry_url", None as Option<&str>)?
.set_default("telegram_token", None as Option<&str>)? .set_default("telegram_token", None as Option<&str>)?
.set_default("telegram_admin_handle", None as Option<&str>)? .set_default("telegram_admin_handle", None as Option<&str>)?
.set_default("api_token", None as Option<&str>)?
.add_source(Environment::default()) .add_source(Environment::default())
.build()?; .build()?;
@ -126,6 +142,7 @@ impl Config {
opentelemetry_url: config.opentelemetry_url, opentelemetry_url: config.opentelemetry_url,
telegram_token: config.telegram_token, telegram_token: config.telegram_token,
telegram_admin_handle: config.telegram_admin_handle, telegram_admin_handle: config.telegram_admin_handle,
api_token: config.api_token,
}) })
} }
@ -158,6 +175,24 @@ impl Config {
} }
} }
pub(crate) fn x_api_token(&self) -> Option<XApiToken> {
self.api_token.clone().map(XApiToken::new)
}
pub(crate) fn admin_config(&self) -> Option<actix_web::web::Data<AdminConfig>> {
if let Some(api_token) = &self.api_token {
match AdminConfig::build(api_token) {
Ok(conf) => Some(actix_web::web::Data::new(conf)),
Err(e) => {
tracing::error!("Error creating admin config: {}", e);
None
}
}
} else {
None
}
}
pub(crate) fn bind_address(&self) -> (IpAddr, u16) { pub(crate) fn bind_address(&self) -> (IpAddr, u16) {
(self.addr, self.port) (self.addr, self.port)
} }
@ -281,4 +316,30 @@ impl Config {
Ok(iri) Ok(iri)
} }
pub(crate) fn generate_admin_url(&self, kind: AdminUrlKind) -> IriString {
self.do_generate_admin_url(kind)
.expect("Generated valid IRI")
}
fn do_generate_admin_url(&self, kind: AdminUrlKind) -> Result<IriString, Error> {
let iri = match kind {
AdminUrlKind::Allow => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/allow")?.as_ref())?,
AdminUrlKind::Disallow => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/disallow")?.as_ref())?,
AdminUrlKind::Block => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/block")?.as_ref())?,
AdminUrlKind::Unblock => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/unblock")?.as_ref())?,
AdminUrlKind::Allowed => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/allowed")?.as_ref())?,
AdminUrlKind::Blocked => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/blocked")?.as_ref())?,
AdminUrlKind::Connected => FixedBaseResolver::new(self.base_uri.as_ref())
.try_resolve(IriRelativeStr::new("api/v1/admin/connected")?.as_ref())?,
};
Ok(iri)
}
} }

View File

@ -37,7 +37,7 @@ impl ActorCache {
ActorCache { db } ActorCache { db }
} }
#[tracing::instrument(name = "Get Actor", skip_all, fields(id = id.to_string().as_str(), requests))] #[tracing::instrument(level = "debug" name = "Get Actor", skip_all, fields(id = id.to_string().as_str(), requests))]
pub(crate) async fn get( pub(crate) async fn get(
&self, &self,
id: &IriString, id: &IriString,
@ -54,7 +54,7 @@ impl ActorCache {
.map(MaybeCached::Fetched) .map(MaybeCached::Fetched)
} }
#[tracing::instrument(name = "Add Connection", skip(self))] #[tracing::instrument(level = "debug", name = "Add Connection", skip(self))]
pub(crate) async fn add_connection(&self, actor: Actor) -> Result<(), Error> { pub(crate) async fn add_connection(&self, actor: Actor) -> Result<(), Error> {
let add_connection = self.db.add_connection(actor.id.clone()); let add_connection = self.db.add_connection(actor.id.clone());
let save_actor = self.db.save_actor(actor); let save_actor = self.db.save_actor(actor);
@ -64,12 +64,12 @@ impl ActorCache {
Ok(()) Ok(())
} }
#[tracing::instrument(name = "Remove Connection", skip(self))] #[tracing::instrument(level = "debug", name = "Remove Connection", skip(self))]
pub(crate) async fn remove_connection(&self, actor: &Actor) -> Result<(), Error> { pub(crate) async fn remove_connection(&self, actor: &Actor) -> Result<(), Error> {
self.db.remove_connection(actor.id.clone()).await self.db.remove_connection(actor.id.clone()).await
} }
#[tracing::instrument(name = "Fetch remote actor", skip_all, fields(id = id.to_string().as_str(), requests))] #[tracing::instrument(level = "debug", name = "Fetch remote actor", skip_all, fields(id = id.to_string().as_str(), requests))]
pub(crate) async fn get_no_cache( pub(crate) async fn get_no_cache(
&self, &self,
id: &IriString, id: &IriString,

View File

@ -1,14 +1,7 @@
use crate::{ use crate::{db::Db, error::Error};
db::{Db, MediaMeta},
error::Error,
};
use activitystreams::iri_string::types::IriString; use activitystreams::iri_string::types::IriString;
use actix_web::web::Bytes;
use std::time::{Duration, SystemTime};
use uuid::Uuid; use uuid::Uuid;
static MEDIA_DURATION: Duration = Duration::from_secs(60 * 60 * 24 * 2);
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct MediaCache { pub struct MediaCache {
db: Db, db: Db,
@ -19,42 +12,16 @@ impl MediaCache {
MediaCache { db } MediaCache { db }
} }
#[tracing::instrument(name = "Get media uuid", skip_all, fields(url = url.to_string().as_str()))] #[tracing::instrument(level = "debug", name = "Get media uuid", skip_all, fields(url = url.to_string().as_str()))]
pub(crate) async fn get_uuid(&self, url: IriString) -> Result<Option<Uuid>, Error> { pub(crate) async fn get_uuid(&self, url: IriString) -> Result<Option<Uuid>, Error> {
self.db.media_id(url).await self.db.media_id(url).await
} }
#[tracing::instrument(name = "Get media url", skip(self))] #[tracing::instrument(level = "debug", name = "Get media url", skip(self))]
pub(crate) async fn get_url(&self, uuid: Uuid) -> Result<Option<IriString>, Error> { pub(crate) async fn get_url(&self, uuid: Uuid) -> Result<Option<IriString>, Error> {
self.db.media_url(uuid).await self.db.media_url(uuid).await
} }
#[tracing::instrument(name = "Is media outdated", skip(self))]
pub(crate) async fn is_outdated(&self, uuid: Uuid) -> Result<bool, Error> {
if let Some(meta) = self.db.media_meta(uuid).await? {
if meta.saved_at + MEDIA_DURATION > SystemTime::now() {
return Ok(false);
}
}
Ok(true)
}
#[tracing::instrument(name = "Get media bytes", skip(self))]
pub(crate) async fn get_bytes(&self, uuid: Uuid) -> Result<Option<(String, Bytes)>, Error> {
if let Some(meta) = self.db.media_meta(uuid).await? {
if meta.saved_at + MEDIA_DURATION > SystemTime::now() {
return self
.db
.media_bytes(uuid)
.await
.map(|opt| opt.map(|bytes| (meta.media_type, bytes)));
}
}
Ok(None)
}
#[tracing::instrument(name = "Store media url", skip_all, fields(url = url.to_string().as_str()))] #[tracing::instrument(name = "Store media url", skip_all, fields(url = url.to_string().as_str()))]
pub(crate) async fn store_url(&self, url: IriString) -> Result<Uuid, Error> { pub(crate) async fn store_url(&self, url: IriString) -> Result<Uuid, Error> {
let uuid = Uuid::new_v4(); let uuid = Uuid::new_v4();
@ -63,23 +30,4 @@ impl MediaCache {
Ok(uuid) Ok(uuid)
} }
#[tracing::instrument(name = "store media bytes", skip(self, bytes))]
pub(crate) async fn store_bytes(
&self,
uuid: Uuid,
media_type: String,
bytes: Bytes,
) -> Result<(), Error> {
self.db
.save_bytes(
uuid,
MediaMeta {
media_type,
saved_at: SystemTime::now(),
},
bytes,
)
.await
}
} }

View File

@ -34,7 +34,7 @@ impl NodeCache {
NodeCache { db } NodeCache { db }
} }
#[tracing::instrument(name = "Get nodes", skip(self))] #[tracing::instrument(level = "debug", name = "Get nodes", skip(self))]
pub(crate) async fn nodes(&self) -> Result<Vec<Node>, Error> { pub(crate) async fn nodes(&self) -> Result<Vec<Node>, Error> {
let infos = self.db.connected_info(); let infos = self.db.connected_info();
let instances = self.db.connected_instance(); let instances = self.db.connected_instance();
@ -59,7 +59,7 @@ impl NodeCache {
Ok(vec) Ok(vec)
} }
#[tracing::instrument(name = "Is NodeInfo Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))] #[tracing::instrument(level = "debug", name = "Is NodeInfo Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))]
pub(crate) async fn is_nodeinfo_outdated(&self, actor_id: IriString) -> bool { pub(crate) async fn is_nodeinfo_outdated(&self, actor_id: IriString) -> bool {
self.db self.db
.info(actor_id) .info(actor_id)
@ -68,7 +68,7 @@ impl NodeCache {
.unwrap_or(true) .unwrap_or(true)
} }
#[tracing::instrument(name = "Is Contact Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))] #[tracing::instrument(level = "debug", name = "Is Contact Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))]
pub(crate) async fn is_contact_outdated(&self, actor_id: IriString) -> bool { pub(crate) async fn is_contact_outdated(&self, actor_id: IriString) -> bool {
self.db self.db
.contact(actor_id) .contact(actor_id)
@ -77,7 +77,7 @@ impl NodeCache {
.unwrap_or(true) .unwrap_or(true)
} }
#[tracing::instrument(name = "Is Instance Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))] #[tracing::instrument(level = "debug", name = "Is Instance Outdated", skip_all, fields(actor_id = actor_id.to_string().as_str()))]
pub(crate) async fn is_instance_outdated(&self, actor_id: IriString) -> bool { pub(crate) async fn is_instance_outdated(&self, actor_id: IriString) -> bool {
self.db self.db
.instance(actor_id) .instance(actor_id)
@ -86,7 +86,7 @@ impl NodeCache {
.unwrap_or(true) .unwrap_or(true)
} }
#[tracing::instrument(name = "Save node info", skip_all, fields(actor_id = actor_id.to_string().as_str(), software, version, reg))] #[tracing::instrument(level = "debug", name = "Save node info", skip_all, fields(actor_id = actor_id.to_string().as_str(), software, version, reg))]
pub(crate) async fn set_info( pub(crate) async fn set_info(
&self, &self,
actor_id: IriString, actor_id: IriString,
@ -108,6 +108,7 @@ impl NodeCache {
} }
#[tracing::instrument( #[tracing::instrument(
level = "debug",
name = "Save instance info", name = "Save instance info",
skip_all, skip_all,
fields( fields(
@ -144,6 +145,7 @@ impl NodeCache {
} }
#[tracing::instrument( #[tracing::instrument(
level = "debug",
name = "Save contact info", name = "Save contact info",
skip_all, skip_all,
fields( fields(

View File

@ -48,6 +48,7 @@ impl State {
} }
#[tracing::instrument( #[tracing::instrument(
level = "debug",
name = "Get inboxes for other domains", name = "Get inboxes for other domains",
skip_all, skip_all,
fields( fields(
@ -85,10 +86,10 @@ impl State {
self.object_cache.write().await.put(object_id, actor_id); self.object_cache.write().await.put(object_id, actor_id);
} }
#[tracing::instrument(name = "Building state", skip_all)] #[tracing::instrument(level = "debug", name = "Building state", skip_all)]
pub(crate) async fn build(db: Db) -> Result<Self, Error> { pub(crate) async fn build(db: Db) -> Result<Self, Error> {
let private_key = if let Ok(Some(key)) = db.private_key().await { let private_key = if let Ok(Some(key)) = db.private_key().await {
tracing::info!("Using existing key"); tracing::debug!("Using existing key");
key key
} else { } else {
tracing::info!("Generating new keys"); tracing::info!("Generating new keys");

View File

@ -3,7 +3,6 @@ use crate::{
error::{Error, ErrorKind}, error::{Error, ErrorKind},
}; };
use activitystreams::iri_string::types::IriString; use activitystreams::iri_string::types::IriString;
use actix_web::web::Bytes;
use rsa::{ use rsa::{
pkcs8::{DecodePrivateKey, EncodePrivateKey}, pkcs8::{DecodePrivateKey, EncodePrivateKey},
RsaPrivateKey, RsaPrivateKey,
@ -26,8 +25,6 @@ struct Inner {
settings: Tree, settings: Tree,
media_url_media_id: Tree, media_url_media_id: Tree,
media_id_media_url: Tree, media_id_media_url: Tree,
media_id_media_bytes: Tree,
media_id_media_meta: Tree,
actor_id_info: Tree, actor_id_info: Tree,
actor_id_instance: Tree, actor_id_instance: Tree,
actor_id_contact: Tree, actor_id_contact: Tree,
@ -63,12 +60,6 @@ impl std::fmt::Debug for Actor {
} }
} }
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct MediaMeta {
pub(crate) media_type: String,
pub(crate) saved_at: SystemTime,
}
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub struct Info { pub struct Info {
pub(crate) software: String, pub(crate) software: String,
@ -253,8 +244,6 @@ impl Db {
settings: db.open_tree("settings")?, settings: db.open_tree("settings")?,
media_url_media_id: db.open_tree("media-url-media-id")?, media_url_media_id: db.open_tree("media-url-media-id")?,
media_id_media_url: db.open_tree("media-id-media-url")?, media_id_media_url: db.open_tree("media-id-media-url")?,
media_id_media_bytes: db.open_tree("media-id-media-bytes")?,
media_id_media_meta: db.open_tree("media-id-media-meta")?,
actor_id_info: db.open_tree("actor-id-info")?, actor_id_info: db.open_tree("actor-id-info")?,
actor_id_instance: db.open_tree("actor-id-instance")?, actor_id_instance: db.open_tree("actor-id-instance")?,
actor_id_contact: db.open_tree("actor-id-contact")?, actor_id_contact: db.open_tree("actor-id-contact")?,
@ -281,10 +270,6 @@ impl Db {
self.unblock(|inner| Ok(inner.connected().collect())).await self.unblock(|inner| Ok(inner.connected().collect())).await
} }
pub(crate) async fn allowed_domains(&self) -> Result<Vec<String>, Error> {
self.unblock(|inner| Ok(inner.allowed().collect())).await
}
pub(crate) async fn save_info(&self, actor_id: IriString, info: Info) -> Result<(), Error> { pub(crate) async fn save_info(&self, actor_id: IriString, info: Info) -> Result<(), Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
let vec = serde_json::to_vec(&info)?; let vec = serde_json::to_vec(&info)?;
@ -396,25 +381,6 @@ impl Db {
.await .await
} }
pub(crate) async fn save_bytes(
&self,
id: Uuid,
meta: MediaMeta,
bytes: Bytes,
) -> Result<(), Error> {
self.unblock(move |inner| {
let vec = serde_json::to_vec(&meta)?;
inner
.media_id_media_bytes
.insert(id.as_bytes(), bytes.as_ref())?;
inner.media_id_media_meta.insert(id.as_bytes(), vec)?;
Ok(())
})
.await
}
pub(crate) async fn media_id(&self, url: IriString) -> Result<Option<Uuid>, Error> { pub(crate) async fn media_id(&self, url: IriString) -> Result<Option<Uuid>, Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
if let Some(ivec) = inner.media_url_media_id.get(url.as_str().as_bytes())? { if let Some(ivec) = inner.media_url_media_id.get(url.as_str().as_bytes())? {
@ -437,29 +403,6 @@ impl Db {
.await .await
} }
pub(crate) async fn media_bytes(&self, id: Uuid) -> Result<Option<Bytes>, Error> {
self.unblock(move |inner| {
if let Some(ivec) = inner.media_id_media_bytes.get(id.as_bytes())? {
Ok(Some(Bytes::copy_from_slice(&ivec)))
} else {
Ok(None)
}
})
.await
}
pub(crate) async fn media_meta(&self, id: Uuid) -> Result<Option<MediaMeta>, Error> {
self.unblock(move |inner| {
if let Some(ivec) = inner.media_id_media_meta.get(id.as_bytes())? {
let meta = serde_json::from_slice(&ivec)?;
Ok(Some(meta))
} else {
Ok(None)
}
})
.await
}
pub(crate) async fn blocks(&self) -> Result<Vec<String>, Error> { pub(crate) async fn blocks(&self) -> Result<Vec<String>, Error> {
self.unblock(|inner| Ok(inner.blocks().collect())).await self.unblock(|inner| Ok(inner.blocks().collect())).await
} }

View File

@ -7,7 +7,6 @@ use actix_web::{
}; };
use http_signature_normalization_actix::PrepareSignError; use http_signature_normalization_actix::PrepareSignError;
use std::{convert::Infallible, fmt::Debug, io}; use std::{convert::Infallible, fmt::Debug, io};
use tracing::error;
use tracing_error::SpanTrace; use tracing_error::SpanTrace;
pub(crate) struct Error { pub(crate) struct Error {
@ -15,6 +14,20 @@ pub(crate) struct Error {
kind: ErrorKind, kind: ErrorKind,
} }
impl Error {
pub(crate) fn is_breaker(&self) -> bool {
matches!(self.kind, ErrorKind::Breaker)
}
pub(crate) fn is_not_found(&self) -> bool {
matches!(self.kind, ErrorKind::Status(_, StatusCode::NOT_FOUND))
}
pub(crate) fn is_bad_request(&self) -> bool {
matches!(self.kind, ErrorKind::Status(_, StatusCode::BAD_REQUEST))
}
}
impl std::fmt::Debug for Error { impl std::fmt::Debug for Error {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
writeln!(f, "{:?}", self.kind) writeln!(f, "{:?}", self.kind)
@ -117,9 +130,6 @@ pub(crate) enum ErrorKind {
#[error("{0}")] #[error("{0}")]
HostMismatch(#[from] CheckError), HostMismatch(#[from] CheckError),
#[error("Invalid or missing content type")]
ContentType,
#[error("Couldn't flush buffer")] #[error("Couldn't flush buffer")]
FlushBuffer, FlushBuffer,
@ -164,6 +174,9 @@ pub(crate) enum ErrorKind {
#[error("Failed to extract fields from {0}")] #[error("Failed to extract fields from {0}")]
Extract(&'static str), Extract(&'static str),
#[error("No API Token supplied")]
MissingApiToken,
} }
impl ResponseError for Error { impl ResponseError for Error {

222
src/extractors.rs Normal file
View File

@ -0,0 +1,222 @@
use actix_web::{
dev::Payload,
error::{BlockingError, ParseError},
http::{
header::{from_one_raw_str, Header, HeaderName, HeaderValue, TryIntoHeaderValue},
StatusCode,
},
web::Data,
FromRequest, HttpMessage, HttpRequest, HttpResponse, ResponseError,
};
use bcrypt::{BcryptError, DEFAULT_COST};
use futures_util::future::LocalBoxFuture;
use http_signature_normalization_actix::prelude::InvalidHeaderValue;
use std::{convert::Infallible, str::FromStr};
use tracing_error::SpanTrace;
use crate::db::Db;
#[derive(Clone)]
pub(crate) struct AdminConfig {
hashed_api_token: String,
}
impl AdminConfig {
pub(crate) fn build(api_token: &str) -> Result<Self, Error> {
Ok(AdminConfig {
hashed_api_token: bcrypt::hash(api_token, DEFAULT_COST).map_err(Error::bcrypt_hash)?,
})
}
fn verify(&self, token: XApiToken) -> Result<bool, Error> {
bcrypt::verify(&token.0, &self.hashed_api_token).map_err(Error::bcrypt_verify)
}
}
pub(crate) struct Admin {
db: Data<Db>,
}
impl Admin {
fn prepare_verify(
req: &HttpRequest,
) -> Result<(Data<Db>, Data<AdminConfig>, XApiToken), Error> {
let hashed_api_token = req
.app_data::<Data<AdminConfig>>()
.ok_or_else(Error::missing_config)?
.clone();
let x_api_token = XApiToken::parse(req).map_err(Error::parse_header)?;
let db = req
.app_data::<Data<Db>>()
.ok_or_else(Error::missing_db)?
.clone();
Ok((db, hashed_api_token, x_api_token))
}
#[tracing::instrument(level = "debug", skip_all)]
async fn verify(
hashed_api_token: Data<AdminConfig>,
x_api_token: XApiToken,
) -> Result<(), Error> {
if actix_web::web::block(move || hashed_api_token.verify(x_api_token))
.await
.map_err(Error::canceled)??
{
return Ok(());
}
Err(Error::invalid())
}
pub(crate) fn db_ref(&self) -> &Db {
&self.db
}
}
#[derive(Debug, thiserror::Error)]
#[error("Failed authentication")]
pub(crate) struct Error {
context: SpanTrace,
#[source]
kind: ErrorKind,
}
impl Error {
fn invalid() -> Self {
Error {
context: SpanTrace::capture(),
kind: ErrorKind::Invalid,
}
}
fn missing_config() -> Self {
Error {
context: SpanTrace::capture(),
kind: ErrorKind::MissingConfig,
}
}
fn missing_db() -> Self {
Error {
context: SpanTrace::capture(),
kind: ErrorKind::MissingDb,
}
}
fn bcrypt_verify(e: BcryptError) -> Self {
Error {
context: SpanTrace::capture(),
kind: ErrorKind::BCryptVerify(e),
}
}
fn bcrypt_hash(e: BcryptError) -> Self {
Error {
context: SpanTrace::capture(),
kind: ErrorKind::BCryptHash(e),
}
}
fn parse_header(e: ParseError) -> Self {
Error {
context: SpanTrace::capture(),
kind: ErrorKind::ParseHeader(e),
}
}
fn canceled(_: BlockingError) -> Self {
Error {
context: SpanTrace::capture(),
kind: ErrorKind::Canceled,
}
}
}
#[derive(Debug, thiserror::Error)]
enum ErrorKind {
#[error("Invalid API Token")]
Invalid,
#[error("Missing Config")]
MissingConfig,
#[error("Missing Db")]
MissingDb,
#[error("Panic in verify")]
Canceled,
#[error("Verifying")]
BCryptVerify(#[source] BcryptError),
#[error("Hashing")]
BCryptHash(#[source] BcryptError),
#[error("Parse Header")]
ParseHeader(#[source] ParseError),
}
impl ResponseError for Error {
fn status_code(&self) -> StatusCode {
match self.kind {
ErrorKind::Invalid | ErrorKind::ParseHeader(_) => StatusCode::BAD_REQUEST,
_ => StatusCode::INTERNAL_SERVER_ERROR,
}
}
fn error_response(&self) -> HttpResponse {
HttpResponse::build(self.status_code())
.json(serde_json::json!({ "msg": self.kind.to_string() }))
}
}
impl FromRequest for Admin {
type Error = Error;
type Future = LocalBoxFuture<'static, Result<Self, Self::Error>>;
fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future {
let res = Self::prepare_verify(req);
Box::pin(async move {
let (db, c, t) = res?;
Self::verify(c, t).await?;
Ok(Admin { db })
})
}
}
pub(crate) struct XApiToken(String);
impl XApiToken {
pub(crate) fn new(token: String) -> Self {
Self(token)
}
}
impl Header for XApiToken {
fn name() -> HeaderName {
HeaderName::from_static("x-api-token")
}
fn parse<M: HttpMessage>(msg: &M) -> Result<Self, ParseError> {
from_one_raw_str(msg.headers().get(Self::name()))
}
}
impl TryIntoHeaderValue for XApiToken {
type Error = InvalidHeaderValue;
fn try_into_value(self) -> Result<HeaderValue, Self::Error> {
HeaderValue::from_str(&self.0)
}
}
impl FromStr for XApiToken {
type Err = Infallible;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(XApiToken(s.to_string()))
}
}

View File

@ -21,7 +21,7 @@ impl std::fmt::Debug for Announce {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Announce") f.debug_struct("Announce")
.field("object_id", &self.object_id.to_string()) .field("object_id", &self.object_id.to_string())
.field("actor", &self.actor) .field("actor_id", &self.actor.id)
.finish() .finish()
} }
} }

View File

@ -13,12 +13,21 @@ use activitystreams::{
use background_jobs::ActixJob; use background_jobs::ActixJob;
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct Follow { pub(crate) struct Follow {
input: AcceptedActivities, input: AcceptedActivities,
actor: Actor, actor: Actor,
} }
impl std::fmt::Debug for Follow {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Follow")
.field("input", &self.input.id_unchecked())
.field("actor", &self.actor.id)
.finish()
}
}
impl Follow { impl Follow {
pub fn new(input: AcceptedActivities, actor: Actor) -> Self { pub fn new(input: AcceptedActivities, actor: Actor) -> Self {
Follow { input, actor } Follow { input, actor }

View File

@ -8,12 +8,21 @@ use activitystreams::prelude::*;
use background_jobs::ActixJob; use background_jobs::ActixJob;
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct Forward { pub(crate) struct Forward {
input: AcceptedActivities, input: AcceptedActivities,
actor: Actor, actor: Actor,
} }
impl std::fmt::Debug for Forward {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Forward")
.field("input", &self.input.id_unchecked())
.field("actor", &self.actor.id)
.finish()
}
}
impl Forward { impl Forward {
pub fn new(input: AcceptedActivities, actor: Actor) -> Self { pub fn new(input: AcceptedActivities, actor: Actor) -> Self {
Forward { input, actor } Forward { input, actor }

View File

@ -7,9 +7,15 @@ use crate::{
use background_jobs::ActixJob; use background_jobs::ActixJob;
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct Reject(pub(crate) Actor); pub(crate) struct Reject(pub(crate) Actor);
impl std::fmt::Debug for Reject {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Reject").field("actor", &self.0.id).finish()
}
}
impl Reject { impl Reject {
#[tracing::instrument(name = "Reject", skip(state))] #[tracing::instrument(name = "Reject", skip(state))]
async fn perform(self, state: JobState) -> Result<(), Error> { async fn perform(self, state: JobState) -> Result<(), Error> {

View File

@ -5,15 +5,25 @@ use crate::{
error::Error, error::Error,
jobs::{apub::generate_undo_follow, Deliver, JobState}, jobs::{apub::generate_undo_follow, Deliver, JobState},
}; };
use activitystreams::prelude::BaseExt;
use background_jobs::ActixJob; use background_jobs::ActixJob;
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)] #[derive(Clone, serde::Deserialize, serde::Serialize)]
pub(crate) struct Undo { pub(crate) struct Undo {
input: AcceptedActivities, input: AcceptedActivities,
actor: Actor, actor: Actor,
} }
impl std::fmt::Debug for Undo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Undo")
.field("input", &self.input.id_unchecked())
.field("actor", &self.actor.id)
.finish()
}
}
impl Undo { impl Undo {
pub(crate) fn new(input: AcceptedActivities, actor: Actor) -> Self { pub(crate) fn new(input: AcceptedActivities, actor: Actor) -> Self {
Undo { input, actor } Undo { input, actor }

View File

@ -1,44 +0,0 @@
use crate::{error::Error, jobs::JobState};
use background_jobs::ActixJob;
use std::{future::Future, pin::Pin};
use uuid::Uuid;
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
pub(crate) struct CacheMedia {
uuid: Uuid,
}
impl CacheMedia {
pub(crate) fn new(uuid: Uuid) -> Self {
CacheMedia { uuid }
}
#[tracing::instrument(name = "Cache media", skip(state))]
async fn perform(self, state: JobState) -> Result<(), Error> {
if !state.media.is_outdated(self.uuid).await? {
return Ok(());
}
if let Some(url) = state.media.get_url(self.uuid).await? {
let (content_type, bytes) = state.requests.fetch_bytes(url.as_str()).await?;
state
.media
.store_bytes(self.uuid, content_type, bytes)
.await?;
}
Ok(())
}
}
impl ActixJob for CacheMedia {
type State = JobState;
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
const NAME: &'static str = "relay::jobs::CacheMedia";
fn run(self, state: Self::State) -> Self::Future {
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
}
}

View File

@ -40,10 +40,18 @@ impl QueryContact {
return Ok(()); return Ok(());
} }
let contact = state let contact = match state
.requests .requests
.fetch::<AcceptedActors>(self.contact_id.as_str()) .fetch::<AcceptedActors>(self.contact_id.as_str())
.await?; .await
{
Ok(contact) => contact,
Err(e) if e.is_breaker() => {
tracing::debug!("Not retrying due to failed breaker");
return Ok(());
}
Err(e) => return Err(e),
};
let (username, display_name, url, avatar) = let (username, display_name, url, avatar) =
to_contact(contact).ok_or(ErrorKind::Extract("contact"))?; to_contact(contact).ok_or(ErrorKind::Extract("contact"))?;

View File

@ -1,4 +1,7 @@
use crate::{error::Error, jobs::JobState}; use crate::{
error::Error,
jobs::{debug_object, JobState},
};
use activitystreams::iri_string::types::IriString; use activitystreams::iri_string::types::IriString;
use background_jobs::{ActixJob, Backoff}; use background_jobs::{ActixJob, Backoff};
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
@ -13,7 +16,8 @@ impl std::fmt::Debug for Deliver {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Deliver") f.debug_struct("Deliver")
.field("to", &self.to.to_string()) .field("to", &self.to.to_string())
.field("data", &self.data) .field("activity", &self.data["type"])
.field("object", debug_object(&self.data))
.finish() .finish()
} }
} }
@ -31,7 +35,17 @@ impl Deliver {
#[tracing::instrument(name = "Deliver", skip(state))] #[tracing::instrument(name = "Deliver", skip(state))]
async fn permform(self, state: JobState) -> Result<(), Error> { async fn permform(self, state: JobState) -> Result<(), Error> {
state.requests.deliver(self.to, &self.data).await?; if let Err(e) = state.requests.deliver(self.to, &self.data).await {
if e.is_breaker() {
tracing::debug!("Not trying due to failed breaker");
return Ok(());
}
if e.is_bad_request() {
tracing::debug!("Server didn't understand the activity");
return Ok(());
}
return Err(e);
}
Ok(()) Ok(())
} }
} }

View File

@ -1,6 +1,6 @@
use crate::{ use crate::{
error::Error, error::Error,
jobs::{Deliver, JobState}, jobs::{debug_object, Deliver, JobState},
}; };
use activitystreams::iri_string::types::IriString; use activitystreams::iri_string::types::IriString;
use background_jobs::ActixJob; use background_jobs::ActixJob;
@ -14,17 +14,9 @@ pub(crate) struct DeliverMany {
impl std::fmt::Debug for DeliverMany { impl std::fmt::Debug for DeliverMany {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let to = format!(
"[{}]",
self.to
.iter()
.map(|u| u.to_string())
.collect::<Vec<_>>()
.join(", ")
);
f.debug_struct("DeliverMany") f.debug_struct("DeliverMany")
.field("to", &to) .field("activity", &self.data["type"])
.field("data", &self.data) .field("object", debug_object(&self.data))
.finish() .finish()
} }
} }

File diff suppressed because one or more lines are too long

View File

@ -1,5 +1,4 @@
pub mod apub; pub mod apub;
mod cache_media;
mod contact; mod contact;
mod deliver; mod deliver;
mod deliver_many; mod deliver_many;
@ -8,7 +7,7 @@ mod nodeinfo;
mod process_listeners; mod process_listeners;
pub(crate) use self::{ pub(crate) use self::{
cache_media::CacheMedia, contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany, contact::QueryContact, deliver::Deliver, deliver_many::DeliverMany,
instance::QueryInstance, nodeinfo::QueryNodeinfo, instance::QueryInstance, nodeinfo::QueryNodeinfo,
}; };
@ -25,6 +24,20 @@ use background_jobs::{
}; };
use std::time::Duration; use std::time::Duration;
fn debug_object(activity: &serde_json::Value) -> &serde_json::Value {
let mut object = &activity["object"]["type"];
if object.is_null() {
object = &activity["object"]["id"];
}
if object.is_null() {
object = &activity["object"];
}
object
}
pub(crate) fn create_workers( pub(crate) fn create_workers(
state: State, state: State,
actors: ActorCache, actors: ActorCache,
@ -45,7 +58,6 @@ pub(crate) fn create_workers(
.register::<QueryNodeinfo>() .register::<QueryNodeinfo>()
.register::<QueryInstance>() .register::<QueryInstance>()
.register::<Listeners>() .register::<Listeners>()
.register::<CacheMedia>()
.register::<QueryContact>() .register::<QueryContact>()
.register::<apub::Announce>() .register::<apub::Announce>()
.register::<apub::Follow>() .register::<apub::Follow>()
@ -124,3 +136,59 @@ impl JobServer {
.map_err(Into::into) .map_err(Into::into)
} }
} }
struct Boolish {
inner: bool,
}
impl std::ops::Deref for Boolish {
type Target = bool;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl<'de> serde::Deserialize<'de> for Boolish {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
#[derive(serde::Deserialize)]
#[serde(untagged)]
enum BoolThing {
Bool(bool),
String(String),
}
let thing: BoolThing = serde::Deserialize::deserialize(deserializer)?;
match thing {
BoolThing::Bool(inner) => Ok(Boolish { inner }),
BoolThing::String(s) if s.to_lowercase() == "false" => Ok(Boolish { inner: false }),
BoolThing::String(_) => Ok(Boolish { inner: true }),
}
}
}
#[cfg(test)]
mod tests {
use super::Boolish;
#[test]
fn boolish_works() {
const CASES: &[(&str, bool)] = &[
("false", false),
("\"false\"", false),
("\"FALSE\"", false),
("true", true),
("\"true\"", true),
("\"anything else\"", true),
];
for (case, output) in CASES {
let b: Boolish = serde_json::from_str(case).unwrap();
assert_eq!(*b, *output);
}
}
}

File diff suppressed because one or more lines are too long

View File

@ -12,12 +12,14 @@ use tracing_error::ErrorLayer;
use tracing_log::LogTracer; use tracing_log::LogTracer;
use tracing_subscriber::{filter::Targets, fmt::format::FmtSpan, layer::SubscriberExt, Layer}; use tracing_subscriber::{filter::Targets, fmt::format::FmtSpan, layer::SubscriberExt, Layer};
mod admin;
mod apub; mod apub;
mod args; mod args;
mod config; mod config;
mod data; mod data;
mod db; mod db;
mod error; mod error;
mod extractors;
mod jobs; mod jobs;
mod middleware; mod middleware;
mod requests; mod requests;
@ -97,30 +99,50 @@ async fn main() -> Result<(), anyhow::Error> {
init_subscriber(Config::software_name(), config.opentelemetry_url())?; init_subscriber(Config::software_name(), config.opentelemetry_url())?;
let db = Db::build(&config)?;
let args = Args::new(); let args = Args::new();
if args.list() { if args.any() {
for domain in db.blocks().await? { let client = requests::build_client(&config.user_agent());
println!("block {}", domain);
if !args.blocks().is_empty() || !args.allowed().is_empty() {
if args.undo() {
admin::client::unblock(&client, &config, args.blocks().to_vec()).await?;
admin::client::disallow(&client, &config, args.allowed().to_vec()).await?;
} else {
admin::client::block(&client, &config, args.blocks().to_vec()).await?;
admin::client::allow(&client, &config, args.allowed().to_vec()).await?;
}
println!("Updated lists");
} }
for domain in db.allows().await? {
println!("allow {}", domain); if args.list() {
let (blocked, allowed, connected) = tokio::try_join!(
admin::client::blocked(&client, &config),
admin::client::allowed(&client, &config),
admin::client::connected(&client, &config)
)?;
let mut report = String::from("Report:\n");
if !allowed.allowed_domains.is_empty() {
report += "\nAllowed\n\t";
report += &allowed.allowed_domains.join("\n\t");
}
if !blocked.blocked_domains.is_empty() {
report += "\n\nBlocked\n\t";
report += &blocked.blocked_domains.join("\n\t");
}
if !connected.connected_actors.is_empty() {
report += "\n\nConnected\n\t";
report += &connected.connected_actors.join("\n\t");
}
report += "\n";
println!("{report}");
} }
return Ok(()); return Ok(());
} }
if !args.blocks().is_empty() || !args.allowed().is_empty() { let db = Db::build(&config)?;
if args.undo() {
db.remove_blocks(args.blocks().to_vec()).await?;
db.remove_allows(args.allowed().to_vec()).await?;
} else {
db.add_blocks(args.blocks().to_vec()).await?;
db.add_allows(args.allowed().to_vec()).await?;
}
return Ok(());
}
let media = MediaCache::new(db.clone()); let media = MediaCache::new(db.clone());
let state = State::build(db.clone()).await?; let state = State::build(db.clone()).await?;
@ -135,15 +157,22 @@ async fn main() -> Result<(), anyhow::Error> {
let bind_address = config.bind_address(); let bind_address = config.bind_address();
HttpServer::new(move || { HttpServer::new(move || {
App::new() let app = App::new()
.wrap(TracingLogger::default())
.app_data(web::Data::new(db.clone())) .app_data(web::Data::new(db.clone()))
.app_data(web::Data::new(state.clone())) .app_data(web::Data::new(state.clone()))
.app_data(web::Data::new(state.requests(&config))) .app_data(web::Data::new(state.requests(&config)))
.app_data(web::Data::new(actors.clone())) .app_data(web::Data::new(actors.clone()))
.app_data(web::Data::new(config.clone())) .app_data(web::Data::new(config.clone()))
.app_data(web::Data::new(job_server.clone())) .app_data(web::Data::new(job_server.clone()))
.app_data(web::Data::new(media.clone())) .app_data(web::Data::new(media.clone()));
let app = if let Some(data) = config.admin_config() {
app.app_data(data)
} else {
app
};
app.wrap(TracingLogger::default())
.service(web::resource("/").route(web::get().to(index))) .service(web::resource("/").route(web::get().to(index)))
.service(web::resource("/media/{path}").route(web::get().to(routes::media))) .service(web::resource("/media/{path}").route(web::get().to(routes::media)))
.service( .service(
@ -165,6 +194,18 @@ async fn main() -> Result<(), anyhow::Error> {
.service(web::resource("/nodeinfo").route(web::get().to(nodeinfo_meta))), .service(web::resource("/nodeinfo").route(web::get().to(nodeinfo_meta))),
) )
.service(web::resource("/static/{filename}").route(web::get().to(statics))) .service(web::resource("/static/{filename}").route(web::get().to(statics)))
.service(
web::scope("/api/v1").service(
web::scope("/admin")
.route("/allow", web::post().to(admin::routes::allow))
.route("/disallow", web::post().to(admin::routes::disallow))
.route("/block", web::post().to(admin::routes::block))
.route("/unblock", web::post().to(admin::routes::unblock))
.route("/allowed", web::get().to(admin::routes::allowed))
.route("/blocked", web::get().to(admin::routes::blocked))
.route("/connected", web::get().to(admin::routes::connected)),
),
)
}) })
.bind(bind_address)? .bind(bind_address)?
.run() .run()

View File

@ -16,7 +16,7 @@ use std::{future::Future, pin::Pin};
pub(crate) struct MyVerify(pub Requests, pub ActorCache, pub State); pub(crate) struct MyVerify(pub Requests, pub ActorCache, pub State);
impl MyVerify { impl MyVerify {
#[tracing::instrument("Verify signature", skip(self))] #[tracing::instrument("Verify signature", skip(self, signature))]
async fn verify( async fn verify(
&self, &self,
algorithm: Option<Algorithm>, algorithm: Option<Algorithm>,
@ -26,6 +26,9 @@ impl MyVerify {
) -> Result<bool, Error> { ) -> Result<bool, Error> {
let public_key_id = iri!(key_id); let public_key_id = iri!(key_id);
// receiving an activity from a domain indicates it is probably online
self.0.reset_breaker(&public_key_id);
let actor_id = if let Some(mut actor_id) = self let actor_id = if let Some(mut actor_id) = self
.2 .2
.db .db

View File

@ -1,10 +1,7 @@
use crate::error::{Error, ErrorKind}; use crate::error::{Error, ErrorKind};
use activitystreams::iri_string::types::IriString; use activitystreams::iri_string::types::IriString;
use actix_web::{ use actix_web::http::header::Date;
http::{header::Date, StatusCode}, use awc::{error::SendRequestError, Client, ClientResponse};
web::Bytes,
};
use awc::Client;
use dashmap::DashMap; use dashmap::DashMap;
use http_signature_normalization_actix::prelude::*; use http_signature_normalization_actix::prelude::*;
use rand::thread_rng; use rand::thread_rng;
@ -20,7 +17,6 @@ use std::{
}, },
time::{Duration, SystemTime}, time::{Duration, SystemTime},
}; };
use tracing::{debug, info, warn};
use tracing_awc::Tracing; use tracing_awc::Tracing;
const ONE_SECOND: u64 = 1; const ONE_SECOND: u64 = 1;
@ -57,6 +53,9 @@ impl Breakers {
let should_write = { let should_write = {
if let Some(mut breaker) = self.inner.get_mut(authority) { if let Some(mut breaker) = self.inner.get_mut(authority) {
breaker.fail(); breaker.fail();
if !breaker.should_try() {
tracing::warn!("Failed breaker for {}", authority);
}
false false
} else { } else {
true true
@ -105,17 +104,12 @@ struct Breaker {
} }
impl Breaker { impl Breaker {
const fn failure_threshold() -> usize { const FAILURE_WAIT: Duration = Duration::from_secs(ONE_DAY);
10 const FAILURE_THRESHOLD: usize = 10;
}
fn failure_wait() -> Duration {
Duration::from_secs(ONE_DAY)
}
fn should_try(&self) -> bool { fn should_try(&self) -> bool {
self.failures < Self::failure_threshold() self.failures < Self::FAILURE_THRESHOLD
|| self.last_attempt + Self::failure_wait() < SystemTime::now() || self.last_attempt + Self::FAILURE_WAIT < SystemTime::now()
} }
fn fail(&mut self) { fn fail(&mut self) {
@ -166,6 +160,14 @@ impl std::fmt::Debug for Requests {
} }
} }
pub(crate) fn build_client(user_agent: &str) -> Client {
Client::builder()
.wrap(Tracing)
.add_default_header(("User-Agent", user_agent.to_string()))
.timeout(Duration::from_secs(15))
.finish()
}
impl Requests { impl Requests {
pub(crate) fn new( pub(crate) fn new(
key_id: String, key_id: String,
@ -174,12 +176,7 @@ impl Requests {
breakers: Breakers, breakers: Breakers,
) -> Self { ) -> Self {
Requests { Requests {
client: Rc::new(RefCell::new( client: Rc::new(RefCell::new(build_client(&user_agent))),
Client::builder()
.wrap(Tracing)
.add_default_header(("User-Agent", user_agent.clone()))
.finish(),
)),
consecutive_errors: Rc::new(AtomicUsize::new(0)), consecutive_errors: Rc::new(AtomicUsize::new(0)),
error_limit: 3, error_limit: 3,
key_id, key_id,
@ -190,14 +187,15 @@ impl Requests {
} }
} }
pub(crate) fn reset_breaker(&self, iri: &IriString) {
self.breakers.succeed(iri);
}
fn count_err(&self) { fn count_err(&self) {
let count = self.consecutive_errors.fetch_add(1, Ordering::Relaxed); let count = self.consecutive_errors.fetch_add(1, Ordering::Relaxed);
if count + 1 >= self.error_limit { if count + 1 >= self.error_limit {
warn!("{} consecutive errors, rebuilding http client", count); tracing::warn!("{} consecutive errors, rebuilding http client", count + 1);
*self.client.borrow_mut() = Client::builder() *self.client.borrow_mut() = build_client(&self.user_agent);
.wrap(Tracing)
.add_default_header(("User-Agent", self.user_agent.clone()))
.finish();
self.reset_err(); self.reset_err();
} }
} }
@ -206,7 +204,41 @@ impl Requests {
self.consecutive_errors.swap(0, Ordering::Relaxed); self.consecutive_errors.swap(0, Ordering::Relaxed);
} }
#[tracing::instrument(name = "Fetch Json", skip(self))] async fn check_response(
&self,
parsed_url: &IriString,
res: Result<ClientResponse, SendRequestError>,
) -> Result<ClientResponse, Error> {
if res.is_err() {
self.count_err();
self.breakers.fail(&parsed_url);
}
let mut res =
res.map_err(|e| ErrorKind::SendRequest(parsed_url.to_string(), e.to_string()))?;
self.reset_err();
if !res.status().is_success() {
self.breakers.fail(&parsed_url);
if let Ok(bytes) = res.body().await {
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
if !s.is_empty() {
tracing::warn!("Response from {}, {}", parsed_url, s);
}
}
}
return Err(ErrorKind::Status(parsed_url.to_string(), res.status()).into());
}
self.breakers.succeed(&parsed_url);
Ok(res)
}
#[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))]
pub(crate) async fn fetch_json<T>(&self, url: &str) -> Result<T, Error> pub(crate) async fn fetch_json<T>(&self, url: &str) -> Result<T, Error>
where where
T: serde::de::DeserializeOwned, T: serde::de::DeserializeOwned,
@ -214,7 +246,7 @@ impl Requests {
self.do_fetch(url, "application/json").await self.do_fetch(url, "application/json").await
} }
#[tracing::instrument(name = "Fetch Activity+Json", skip(self))] #[tracing::instrument(name = "Fetch Activity+Json", skip(self), fields(signing_string))]
pub(crate) async fn fetch<T>(&self, url: &str) -> Result<T, Error> pub(crate) async fn fetch<T>(&self, url: &str) -> Result<T, Error>
where where
T: serde::de::DeserializeOwned, T: serde::de::DeserializeOwned,
@ -233,6 +265,7 @@ impl Requests {
} }
let signer = self.signer(); let signer = self.signer();
let span = tracing::Span::current();
let client: Client = self.client.borrow().clone(); let client: Client = self.client.borrow().clone();
let res = client let res = client
@ -242,36 +275,16 @@ impl Requests {
.signature( .signature(
self.config.clone(), self.config.clone(),
self.key_id.clone(), self.key_id.clone(),
move |signing_string| signer.sign(signing_string), move |signing_string| {
span.record("signing_string", signing_string);
span.in_scope(|| signer.sign(signing_string))
},
) )
.await? .await?
.send() .send()
.await; .await;
if res.is_err() { let mut res = self.check_response(&parsed_url, res).await?;
self.count_err();
self.breakers.fail(&parsed_url);
}
let mut res = res.map_err(|e| ErrorKind::SendRequest(url.to_string(), e.to_string()))?;
self.reset_err();
if !res.status().is_success() {
if let Ok(bytes) = res.body().await {
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
if !s.is_empty() {
debug!("Response from {}, {}", url, s);
}
}
}
self.breakers.fail(&parsed_url);
return Err(ErrorKind::Status(url.to_string(), res.status()).into());
}
self.breakers.succeed(&parsed_url);
let body = res let body = res
.body() .body()
@ -281,80 +294,42 @@ impl Requests {
Ok(serde_json::from_slice(body.as_ref())?) Ok(serde_json::from_slice(body.as_ref())?)
} }
#[tracing::instrument(name = "Fetch Bytes", skip(self))] #[tracing::instrument(name = "Fetch response", skip(self), fields(signing_string))]
pub(crate) async fn fetch_bytes(&self, url: &str) -> Result<(String, Bytes), Error> { pub(crate) async fn fetch_response(&self, url: IriString) -> Result<ClientResponse, Error> {
let parsed_url = url.parse::<IriString>()?; if !self.breakers.should_try(&url) {
if !self.breakers.should_try(&parsed_url) {
return Err(ErrorKind::Breaker.into()); return Err(ErrorKind::Breaker.into());
} }
info!("Fetching bytes for {}", url);
let signer = self.signer(); let signer = self.signer();
let span = tracing::Span::current();
let client: Client = self.client.borrow().clone(); let client: Client = self.client.borrow().clone();
let res = client let res = client
.get(url) .get(url.as_str())
.insert_header(("Accept", "*/*")) .insert_header(("Accept", "*/*"))
.insert_header(Date(SystemTime::now().into())) .insert_header(Date(SystemTime::now().into()))
.no_decompress()
.signature( .signature(
self.config.clone(), self.config.clone(),
self.key_id.clone(), self.key_id.clone(),
move |signing_string| signer.sign(signing_string), move |signing_string| {
span.record("signing_string", signing_string);
span.in_scope(|| signer.sign(signing_string))
},
) )
.await? .await?
.send() .send()
.await; .await;
if res.is_err() { let res = self.check_response(&url, res).await?;
self.breakers.fail(&parsed_url);
self.count_err();
}
let mut res = res.map_err(|e| ErrorKind::SendRequest(url.to_string(), e.to_string()))?; Ok(res)
self.reset_err();
let content_type = if let Some(content_type) = res.headers().get("content-type") {
if let Ok(s) = content_type.to_str() {
s.to_owned()
} else {
return Err(ErrorKind::ContentType.into());
}
} else {
return Err(ErrorKind::ContentType.into());
};
if !res.status().is_success() {
if let Ok(bytes) = res.body().await {
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
if !s.is_empty() {
debug!("Response from {}, {}", url, s);
}
}
}
self.breakers.fail(&parsed_url);
return Err(ErrorKind::Status(url.to_string(), res.status()).into());
}
self.breakers.succeed(&parsed_url);
let bytes = match res.body().limit(1024 * 1024 * 4).await {
Err(e) => {
return Err(ErrorKind::ReceiveResponse(url.to_string(), e.to_string()).into());
}
Ok(bytes) => bytes,
};
Ok((content_type, bytes))
} }
#[tracing::instrument( #[tracing::instrument(
"Deliver to Inbox", "Deliver to Inbox",
skip_all, skip_all,
fields(inbox = inbox.to_string().as_str(), item) fields(inbox = inbox.to_string().as_str(), signing_string)
)] )]
pub(crate) async fn deliver<T>(&self, inbox: IriString, item: &T) -> Result<(), Error> pub(crate) async fn deliver<T>(&self, inbox: IriString, item: &T) -> Result<(), Error>
where where
@ -365,6 +340,7 @@ impl Requests {
} }
let signer = self.signer(); let signer = self.signer();
let span = tracing::Span::current();
let item_string = serde_json::to_string(item)?; let item_string = serde_json::to_string(item)?;
let client: Client = self.client.borrow().clone(); let client: Client = self.client.borrow().clone();
@ -378,39 +354,17 @@ impl Requests {
self.key_id.clone(), self.key_id.clone(),
Sha256::new(), Sha256::new(),
item_string, item_string,
move |signing_string| signer.sign(signing_string), move |signing_string| {
span.record("signing_string", signing_string);
span.in_scope(|| signer.sign(signing_string))
},
) )
.await? .await?
.split(); .split();
let res = req.send_body(body).await; let res = req.send_body(body).await;
if res.is_err() { self.check_response(&inbox, res).await?;
self.count_err();
self.breakers.fail(&inbox);
}
let mut res = res.map_err(|e| ErrorKind::SendRequest(inbox.to_string(), e.to_string()))?;
self.reset_err();
if !res.status().is_success() {
// Bad Request means the server didn't understand our activity - that's fine
if res.status() != StatusCode::BAD_REQUEST {
if let Ok(bytes) = res.body().await {
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
if !s.is_empty() {
warn!("Response from {}, {}", inbox.as_str(), s);
}
}
}
self.breakers.fail(&inbox);
return Err(ErrorKind::Status(inbox.to_string(), res.status()).into());
}
}
self.breakers.succeed(&inbox);
Ok(()) Ok(())
} }

View File

@ -16,7 +16,7 @@ use activitystreams::{
use actix_web::{web, HttpResponse}; use actix_web::{web, HttpResponse};
use http_signature_normalization_actix::prelude::{DigestVerified, SignatureVerified}; use http_signature_normalization_actix::prelude::{DigestVerified, SignatureVerified};
#[tracing::instrument(name = "Inbox", skip(actors, client, jobs, config, state))] #[tracing::instrument(name = "Inbox", skip_all)]
pub(crate) async fn route( pub(crate) async fn route(
state: web::Data<State>, state: web::Data<State>,
actors: web::Data<ActorCache>, actors: web::Data<ActorCache>,

View File

@ -1,19 +1,40 @@
use crate::{ use crate::{
config::Config, config::Config,
data::State, data::{Node, State},
error::{Error, ErrorKind}, error::{Error, ErrorKind},
}; };
use actix_web::{web, HttpResponse}; use actix_web::{web, HttpResponse};
use rand::{seq::SliceRandom, thread_rng}; use rand::{seq::SliceRandom, thread_rng};
use std::io::BufWriter; use std::io::BufWriter;
fn open_reg(node: &Node) -> bool {
node.instance
.as_ref()
.map(|i| i.reg)
.or_else(|| node.info.as_ref().map(|i| i.reg))
.unwrap_or(false)
}
#[tracing::instrument(name = "Index", skip(config, state))] #[tracing::instrument(name = "Index", skip(config, state))]
pub(crate) async fn route( pub(crate) async fn route(
state: web::Data<State>, state: web::Data<State>,
config: web::Data<Config>, config: web::Data<Config>,
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let mut nodes = state.node_cache().nodes().await?; let mut nodes = state.node_cache().nodes().await?;
nodes.shuffle(&mut thread_rng());
nodes.sort_by(|lhs, rhs| match (open_reg(lhs), open_reg(rhs)) {
(true, true) | (false, false) => std::cmp::Ordering::Equal,
(true, false) => std::cmp::Ordering::Less,
(false, true) => std::cmp::Ordering::Greater,
});
if let Some((i, _)) = nodes.iter().enumerate().find(|(_, node)| !open_reg(node)) {
nodes[..i].shuffle(&mut thread_rng());
nodes[i..].shuffle(&mut thread_rng());
} else {
nodes.shuffle(&mut thread_rng());
}
let mut buf = BufWriter::new(Vec::new()); let mut buf = BufWriter::new(Vec::new());
crate::templates::index(&mut buf, &nodes, &config)?; crate::templates::index(&mut buf, &nodes, &config)?;

View File

@ -1,8 +1,5 @@
use crate::{data::MediaCache, error::Error, requests::Requests}; use crate::{data::MediaCache, error::Error, requests::Requests};
use actix_web::{ use actix_web::{body::BodyStream, web, HttpResponse};
http::header::{CacheControl, CacheDirective},
web, HttpResponse,
};
use uuid::Uuid; use uuid::Uuid;
#[tracing::instrument(name = "Media", skip(media, requests))] #[tracing::instrument(name = "Media", skip(media, requests))]
@ -13,30 +10,17 @@ pub(crate) async fn route(
) -> Result<HttpResponse, Error> { ) -> Result<HttpResponse, Error> {
let uuid = uuid.into_inner(); let uuid = uuid.into_inner();
if let Some((content_type, bytes)) = media.get_bytes(uuid).await? {
return Ok(cached(content_type, bytes));
}
if let Some(url) = media.get_url(uuid).await? { if let Some(url) = media.get_url(uuid).await? {
let (content_type, bytes) = requests.fetch_bytes(url.as_str()).await?; let res = requests.fetch_response(url).await?;
media let mut response = HttpResponse::build(res.status());
.store_bytes(uuid, content_type.clone(), bytes.clone())
.await?;
return Ok(cached(content_type, bytes)); for (name, value) in res.headers().iter().filter(|(h, _)| *h != "connection") {
response.insert_header((name.clone(), value.clone()));
}
return Ok(response.body(BodyStream::new(res)));
} }
Ok(HttpResponse::NotFound().finish()) Ok(HttpResponse::NotFound().finish())
} }
fn cached(content_type: String, bytes: web::Bytes) -> HttpResponse {
HttpResponse::Ok()
.insert_header(CacheControl(vec![
CacheDirective::Public,
CacheDirective::MaxAge(60 * 60 * 24),
CacheDirective::Extension("immutable".to_owned(), None),
]))
.content_type(content_type)
.body(bytes)
}

View File

@ -105,7 +105,7 @@ async fn answer(bot: Bot, msg: Message, cmd: Command, db: Db) -> ResponseResult<
.await?; .await?;
} }
Command::ListAllowed => { Command::ListAllowed => {
if let Ok(allowed) = db.allowed_domains().await { if let Ok(allowed) = db.allows().await {
bot.send_message(msg.chat.id, allowed.join("\n")).await?; bot.send_message(msg.chat.id, allowed.join("\n")).await?;
} }
} }

View File

@ -25,7 +25,8 @@ templates::{info, instance, statics::index_css},
</header> </header>
<main> <main>
<section> <section>
<h3>Instances fédérées</h3> <h3>@nodes.len() instances fédérées</h3>
<h3>@nodes.len() Connected Servers</h3>
@if nodes.is_empty() { @if nodes.is_empty() {
<p>Aucune instance fédérée en ce moment.</p> <p>Aucune instance fédérée en ce moment.</p>
} else { } else {