Merge tag 'v0.3.82' into max

This commit is contained in:
Maxime Augier 2023-03-05 17:16:33 +01:00
commit c5112cb9bb
25 changed files with 738 additions and 539 deletions

716
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,7 +1,7 @@
[package] [package]
name = "ap-relay" name = "ap-relay"
description = "A simple activitypub relay" description = "A simple activitypub relay"
version = "0.3.79" version = "0.3.82"
authors = ["asonix <asonix@asonix.dog>"] authors = ["asonix <asonix@asonix.dog>"]
license = "AGPL-3.0" license = "AGPL-3.0"
readme = "README.md" readme = "README.md"
@ -33,8 +33,8 @@ activitystreams = "0.7.0-alpha.21"
activitystreams-ext = "0.1.0-alpha.3" activitystreams-ext = "0.1.0-alpha.3"
ammonia = "3.1.0" ammonia = "3.1.0"
awc = { version = "3.0.0", default-features = false, features = ["rustls"] } awc = { version = "3.0.0", default-features = false, features = ["rustls"] }
bcrypt = "0.13" bcrypt = "0.14"
base64 = "0.13" base64 = "0.21"
clap = { version = "4.0.0", features = ["derive"] } clap = { version = "4.0.0", features = ["derive"] }
config = "0.13.0" config = "0.13.0"
console-subscriber = { version = "0.1", optional = true } console-subscriber = { version = "0.1", optional = true }
@ -54,16 +54,14 @@ opentelemetry-otlp = "0.11"
pin-project-lite = "0.2.9" pin-project-lite = "0.2.9"
quanta = "0.10.1" quanta = "0.10.1"
rand = "0.8" rand = "0.8"
rsa = "0.7" rsa = { version = "0.8", features = ["sha2"] }
rsa-magic-public-key = "0.6.0" rsa-magic-public-key = "0.7.0"
rustls = "0.20.7" rustls = "0.20.7"
rustls-pemfile = "1.0.1" rustls-pemfile = "1.0.1"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
sha2 = { version = "0.10", features = ["oid"] }
signature = "1.6.4"
sled = "0.34.7" sled = "0.34.7"
teloxide = { version = "0.11.1", default-features = false, features = [ teloxide = { version = "0.12.0", default-features = false, features = [
"ctrlc_handler", "ctrlc_handler",
"macros", "macros",
"rustls", "rustls",
@ -100,8 +98,8 @@ version = "0.7.0"
[build-dependencies] [build-dependencies]
anyhow = "1.0" anyhow = "1.0"
dotenv = "0.15.0" dotenv = "0.15.0"
ructe = { version = "0.15.0", features = ["sass", "mime03"] } ructe = { version = "0.16.0", features = ["sass", "mime03"] }
toml = "0.5.8" toml = "0.7.0"
[profile.dev.package.rsa] [profile.dev.package.rsa]
opt-level = 3 opt-level = 3

View File

@ -6,7 +6,7 @@ _A simple and efficient activitypub relay_
If running docker, you can start the relay with the following command: If running docker, you can start the relay with the following command:
``` ```
$ sudo docker run --rm -it \ $ sudo docker run --rm -it \
-v "./:/mnt/" \ -v "$(pwd):/mnt/" \
-e ADDR=0.0.0.0 \ -e ADDR=0.0.0.0 \
-e SLED_PATH=/mnt/sled/db-0.34 \ -e SLED_PATH=/mnt/sled/db-0.34 \
-p 8080:8080 \ -p 8080:8080 \
@ -182,11 +182,17 @@ example, if the server is `https://relay.my.tld`, the correct URL would be
- Add {anything}, the Add {anything} is relayed verbatim to listening servers. - Add {anything}, the Add {anything} is relayed verbatim to listening servers.
Note that this activity will likely be rejected by the listening servers unless it has been Note that this activity will likely be rejected by the listening servers unless it has been
signed with a JSON-LD signature signed with a JSON-LD signature
- Remove {anything}, the Remove {anything} is relayed verbatim to listening servers.
Note that this activity will likely be rejected by the listening servers unless it has been
signed with a JSON-LD signature
### Supported Discovery Protocols ### Supported Discovery Protocols
- Webfinger - Webfinger
- NodeInfo - NodeInfo
### Known issues
Pleroma and Akkoma do not support validating JSON-LD signatures, meaning many activities such as Delete, Update, Add, and Remove will be rejected with a message similar to `WARN: Response from https://example.com/inbox, "Invalid HTTP Signature"`. This is normal and not an issue with the relay.
### Contributing ### Contributing
Feel free to open issues for anything you find an issue with. Please note that any contributed code will be licensed under the AGPLv3. Feel free to open issues for anything you find an issue with. Please note that any contributed code will be licensed under the AGPLv3.

View File

@ -5,7 +5,7 @@ fn git_info() {
if let Ok(output) = Command::new("git").args(["rev-parse", "HEAD"]).output() { if let Ok(output) = Command::new("git").args(["rev-parse", "HEAD"]).output() {
if output.status.success() { if output.status.success() {
let git_hash = String::from_utf8_lossy(&output.stdout); let git_hash = String::from_utf8_lossy(&output.stdout);
println!("cargo:rustc-env=GIT_HASH={}", git_hash); println!("cargo:rustc-env=GIT_HASH={git_hash}");
println!("cargo:rustc-env=GIT_SHORT_HASH={}", &git_hash[..8]) println!("cargo:rustc-env=GIT_SHORT_HASH={}", &git_hash[..8])
} }
} }
@ -16,7 +16,7 @@ fn git_info() {
{ {
if output.status.success() { if output.status.success() {
let git_branch = String::from_utf8_lossy(&output.stdout); let git_branch = String::from_utf8_lossy(&output.stdout);
println!("cargo:rustc-env=GIT_BRANCH={}", git_branch); println!("cargo:rustc-env=GIT_BRANCH={git_branch}");
} }
} }
} }
@ -32,11 +32,11 @@ fn version_info() -> Result<(), anyhow::Error> {
let data: toml::Value = toml::from_str(&cargo_data)?; let data: toml::Value = toml::from_str(&cargo_data)?;
if let Some(version) = data["package"]["version"].as_str() { if let Some(version) = data["package"]["version"].as_str() {
println!("cargo:rustc-env=PKG_VERSION={}", version); println!("cargo:rustc-env=PKG_VERSION={version}");
} }
if let Some(name) = data["package"]["name"].as_str() { if let Some(name) = data["package"]["name"].as_str() {
println!("cargo:rustc-env=PKG_NAME={}", name); println!("cargo:rustc-env=PKG_NAME={name}");
} }
Ok(()) Ok(())

View File

@ -40,11 +40,11 @@ impl std::fmt::Display for Counter {
let labels = self let labels = self
.labels .labels
.iter() .iter()
.map(|(k, v)| format!("{}: {}", k, v)) .map(|(k, v)| format!("{k}: {v}"))
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join(", "); .join(", ");
write!(f, "{} - {}", labels, self.value) write!(f, "{labels} - {}", self.value)
} }
} }
@ -59,11 +59,11 @@ impl std::fmt::Display for Gauge {
let labels = self let labels = self
.labels .labels
.iter() .iter()
.map(|(k, v)| format!("{}: {}", k, v)) .map(|(k, v)| format!("{k}: {v}"))
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join(", "); .join(", ");
write!(f, "{} - {}", labels, self.value) write!(f, "{labels} - {}", self.value)
} }
} }
@ -78,7 +78,7 @@ impl std::fmt::Display for Histogram {
let labels = self let labels = self
.labels .labels
.iter() .iter()
.map(|(k, v)| format!("{}: {}", k, v)) .map(|(k, v)| format!("{k}: {v}"))
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join(", "); .join(", ");
@ -87,15 +87,15 @@ impl std::fmt::Display for Histogram {
.iter() .iter()
.map(|(k, v)| { .map(|(k, v)| {
if let Some(v) = v { if let Some(v) = v {
format!("{}: {:.6}", k, v) format!("{k}: {v:.6}")
} else { } else {
format!("{}: None,", k) format!("{k}: None,")
} }
}) })
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join(", "); .join(", ");
write!(f, "{} - {}", labels, value) write!(f, "{labels} - {value}")
} }
} }
@ -172,18 +172,18 @@ impl Snapshot {
continue; continue;
} }
println!("\t{}", key); println!("\t{key}");
for counter in counters { for counter in counters {
println!("\t\t{}", counter); println!("\t\t{counter}");
} }
} }
for (key, counters) in merging { for (key, counters) in merging {
println!("\t{}", key); println!("\t{key}");
for (_, counter) in counters { for (_, counter) in counters {
if let Some(counter) = counter.merge() { if let Some(counter) = counter.merge() {
println!("\t\t{}", counter); println!("\t\t{counter}");
} }
} }
} }
@ -192,10 +192,10 @@ impl Snapshot {
if !self.gauges.is_empty() { if !self.gauges.is_empty() {
println!("Gauges"); println!("Gauges");
for (key, gauges) in self.gauges { for (key, gauges) in self.gauges {
println!("\t{}", key); println!("\t{key}");
for gauge in gauges { for gauge in gauges {
println!("\t\t{}", gauge); println!("\t\t{gauge}");
} }
} }
} }
@ -203,10 +203,10 @@ impl Snapshot {
if !self.histograms.is_empty() { if !self.histograms.is_empty() {
println!("Histograms"); println!("Histograms");
for (key, histograms) in self.histograms { for (key, histograms) in self.histograms {
println!("\t{}", key); println!("\t{key}");
for histogram in histograms { for histogram in histograms {
println!("\t\t{}", histogram); println!("\t\t{histogram}");
} }
} }
} }

View File

@ -12,8 +12,8 @@ use activitystreams::{
}; };
use config::Environment; use config::Environment;
use http_signature_normalization_actix::prelude::VerifyDigest; use http_signature_normalization_actix::prelude::VerifyDigest;
use rsa::sha2::{Digest, Sha256};
use rustls::{Certificate, PrivateKey}; use rustls::{Certificate, PrivateKey};
use sha2::{Digest, Sha256};
use std::{ use std::{
io::BufReader, io::BufReader,
net::{IpAddr, SocketAddr}, net::{IpAddr, SocketAddr},
@ -170,7 +170,7 @@ impl Config {
let config: ParsedConfig = config.try_deserialize()?; let config: ParsedConfig = config.try_deserialize()?;
let scheme = if config.https { "https" } else { "http" }; let scheme = if config.https { "https" } else { "http" };
let base_uri = iri!(format!("{}://{}", scheme, config.hostname)).into_absolute(); let base_uri = iri!(format!("{scheme}://{}", config.hostname)).into_absolute();
let tls = match (config.tls_key, config.tls_cert) { let tls = match (config.tls_key, config.tls_cert) {
(Some(key), Some(cert)) => Some(TlsConfig { key, cert }), (Some(key), Some(cert)) => Some(TlsConfig { key, cert }),
@ -207,8 +207,8 @@ impl Config {
let source_url = match Self::git_hash() { let source_url = match Self::git_hash() {
Some(hash) => format!( Some(hash) => format!(
"{}{}{}", "{}{}{hash}",
config.source_repo, config.repository_commit_base, hash config.source_repo, config.repository_commit_base
) )
.parse() .parse()
.expect("constructed source URL is valid"), .expect("constructed source URL is valid"),
@ -332,7 +332,7 @@ impl Config {
match AdminConfig::build(api_token) { match AdminConfig::build(api_token) {
Ok(conf) => Some(actix_web::web::Data::new(conf)), Ok(conf) => Some(actix_web::web::Data::new(conf)),
Err(e) => { Err(e) => {
tracing::error!("Error creating admin config: {}", e); tracing::error!("Error creating admin config: {e}");
None None
} }
} }
@ -371,7 +371,7 @@ impl Config {
pub(crate) fn software_version() -> String { pub(crate) fn software_version() -> String {
if let Some(git) = Self::git_version() { if let Some(git) = Self::git_version() {
return format!("v{}-{}", Self::version(), git); return format!("v{}-{git}", Self::version());
} }
format!("v{}", Self::version()) format!("v{}", Self::version())
@ -381,7 +381,7 @@ impl Config {
let branch = Self::git_branch()?; let branch = Self::git_branch()?;
let hash = Self::git_short_hash()?; let hash = Self::git_short_hash()?;
Some(format!("{}-{}", branch, hash)) Some(format!("{branch}-{hash}"))
} }
fn name() -> &'static str { fn name() -> &'static str {
@ -463,7 +463,7 @@ impl Config {
resolved resolved
} }
UrlKind::Media(uuid) => FixedBaseResolver::new(self.base_uri.as_ref()) UrlKind::Media(uuid) => FixedBaseResolver::new(self.base_uri.as_ref())
.resolve(IriRelativeStr::new(&format!("media/{}", uuid))?.as_ref()) .resolve(IriRelativeStr::new(&format!("media/{uuid}"))?.as_ref())
.try_to_dedicated_string()?, .try_to_dedicated_string()?,
UrlKind::NodeInfo => FixedBaseResolver::new(self.base_uri.as_ref()) UrlKind::NodeInfo => FixedBaseResolver::new(self.base_uri.as_ref())
.resolve(IriRelativeStr::new("nodeinfo/2.0.json")?.as_ref()) .resolve(IriRelativeStr::new("nodeinfo/2.0.json")?.as_ref())

View File

@ -71,7 +71,7 @@ impl ActorCache {
id: &IriString, id: &IriString,
requests: &Requests, requests: &Requests,
) -> Result<Actor, Error> { ) -> Result<Actor, Error> {
let accepted_actor = requests.fetch::<AcceptedActors>(id.as_str()).await?; let accepted_actor = requests.fetch::<AcceptedActors>(id).await?;
let input_authority = id.authority_components().ok_or(ErrorKind::MissingDomain)?; let input_authority = id.authority_components().ok_or(ErrorKind::MissingDomain)?;
let accepted_actor_id = accepted_actor let accepted_actor_id = accepted_actor

View File

@ -182,7 +182,7 @@ impl Node {
let authority = url.authority_str().ok_or(ErrorKind::MissingDomain)?; let authority = url.authority_str().ok_or(ErrorKind::MissingDomain)?;
let scheme = url.scheme_str(); let scheme = url.scheme_str();
let base = iri!(format!("{}://{}", scheme, authority)); let base = iri!(format!("{scheme}://{authority}"));
Ok(Node { Ok(Node {
base, base,

104
src/db.rs
View File

@ -10,7 +10,10 @@ use rsa::{
use sled::{Batch, Tree}; use sled::{Batch, Tree};
use std::{ use std::{
collections::{BTreeMap, HashMap}, collections::{BTreeMap, HashMap},
sync::Arc, sync::{
atomic::{AtomicU64, Ordering},
Arc,
},
time::SystemTime, time::SystemTime,
}; };
use time::OffsetDateTime; use time::OffsetDateTime;
@ -22,6 +25,8 @@ pub(crate) struct Db {
} }
struct Inner { struct Inner {
healthz: Tree,
healthz_counter: Arc<AtomicU64>,
actor_id_actor: Tree, actor_id_actor: Tree,
public_key_id_actor_id: Tree, public_key_id_actor_id: Tree,
connected_actor_ids: Tree, connected_actor_ids: Tree,
@ -242,6 +247,8 @@ impl Db {
fn build_inner(restricted_mode: bool, db: sled::Db) -> Result<Self, Error> { fn build_inner(restricted_mode: bool, db: sled::Db) -> Result<Self, Error> {
Ok(Db { Ok(Db {
inner: Arc::new(Inner { inner: Arc::new(Inner {
healthz: db.open_tree("healthz")?,
healthz_counter: Arc::new(AtomicU64::new(0)),
actor_id_actor: db.open_tree("actor-id-actor")?, actor_id_actor: db.open_tree("actor-id-actor")?,
public_key_id_actor_id: db.open_tree("public-key-id-actor-id")?, public_key_id_actor_id: db.open_tree("public-key-id-actor-id")?,
connected_actor_ids: db.open_tree("connected-actor-ids")?, connected_actor_ids: db.open_tree("connected-actor-ids")?,
@ -273,6 +280,21 @@ impl Db {
Ok(t) Ok(t)
} }
pub(crate) async fn check_health(&self) -> Result<(), Error> {
let next = self.inner.healthz_counter.fetch_add(1, Ordering::Relaxed);
self.unblock(move |inner| {
inner
.healthz
.insert("healthz", &next.to_be_bytes()[..])
.map_err(Error::from)
})
.await?;
self.inner.healthz.flush_async().await?;
self.unblock(move |inner| inner.healthz.get("healthz").map_err(Error::from))
.await?;
Ok(())
}
pub(crate) async fn mark_last_seen( pub(crate) async fn mark_last_seen(
&self, &self,
nodes: HashMap<String, OffsetDateTime>, nodes: HashMap<String, OffsetDateTime>,
@ -334,12 +356,12 @@ impl Db {
pub(crate) async fn info(&self, actor_id: IriString) -> Result<Option<Info>, Error> { pub(crate) async fn info(&self, actor_id: IriString) -> Result<Option<Info>, Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
if let Some(ivec) = inner.actor_id_info.get(actor_id.as_str().as_bytes())? { inner
let info = serde_json::from_slice(&ivec)?; .actor_id_info
Ok(Some(info)) .get(actor_id.as_str().as_bytes())?
} else { .map(|ivec| serde_json::from_slice(&ivec))
Ok(None) .transpose()
} .map_err(Error::from)
}) })
.await .await
} }
@ -368,12 +390,12 @@ impl Db {
pub(crate) async fn instance(&self, actor_id: IriString) -> Result<Option<Instance>, Error> { pub(crate) async fn instance(&self, actor_id: IriString) -> Result<Option<Instance>, Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
if let Some(ivec) = inner.actor_id_instance.get(actor_id.as_str().as_bytes())? { inner
let instance = serde_json::from_slice(&ivec)?; .actor_id_instance
Ok(Some(instance)) .get(actor_id.as_str().as_bytes())?
} else { .map(|ivec| serde_json::from_slice(&ivec))
Ok(None) .transpose()
} .map_err(Error::from)
}) })
.await .await
} }
@ -402,12 +424,12 @@ impl Db {
pub(crate) async fn contact(&self, actor_id: IriString) -> Result<Option<Contact>, Error> { pub(crate) async fn contact(&self, actor_id: IriString) -> Result<Option<Contact>, Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
if let Some(ivec) = inner.actor_id_contact.get(actor_id.as_str().as_bytes())? { inner
let contact = serde_json::from_slice(&ivec)?; .actor_id_contact
Ok(Some(contact)) .get(actor_id.as_str().as_bytes())?
} else { .map(|ivec| serde_json::from_slice(&ivec))
Ok(None) .transpose()
} .map_err(Error::from)
}) })
.await .await
} }
@ -432,22 +454,20 @@ impl Db {
pub(crate) async fn media_id(&self, url: IriString) -> Result<Option<Uuid>, Error> { pub(crate) async fn media_id(&self, url: IriString) -> Result<Option<Uuid>, Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
if let Some(ivec) = inner.media_url_media_id.get(url.as_str().as_bytes())? { Ok(inner
Ok(uuid_from_ivec(ivec)) .media_url_media_id
} else { .get(url.as_str().as_bytes())?
Ok(None) .and_then(uuid_from_ivec))
}
}) })
.await .await
} }
pub(crate) async fn media_url(&self, id: Uuid) -> Result<Option<IriString>, Error> { pub(crate) async fn media_url(&self, id: Uuid) -> Result<Option<IriString>, Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
if let Some(ivec) = inner.media_id_media_url.get(id.as_bytes())? { Ok(inner
Ok(url_from_ivec(ivec)) .media_id_media_url
} else { .get(id.as_bytes())?
Ok(None) .and_then(url_from_ivec))
}
}) })
.await .await
} }
@ -468,7 +488,7 @@ impl Db {
pub(crate) async fn is_connected(&self, base_id: IriString) -> Result<bool, Error> { pub(crate) async fn is_connected(&self, base_id: IriString) -> Result<bool, Error> {
let scheme = base_id.scheme_str(); let scheme = base_id.scheme_str();
let authority = base_id.authority_str().ok_or(ErrorKind::MissingDomain)?; let authority = base_id.authority_str().ok_or(ErrorKind::MissingDomain)?;
let prefix = format!("{}://{}", scheme, authority); let prefix = format!("{scheme}://{authority}");
self.unblock(move |inner| { self.unblock(move |inner| {
let connected = inner let connected = inner
@ -487,26 +507,22 @@ impl Db {
public_key_id: IriString, public_key_id: IriString,
) -> Result<Option<IriString>, Error> { ) -> Result<Option<IriString>, Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
if let Some(ivec) = inner Ok(inner
.public_key_id_actor_id .public_key_id_actor_id
.get(public_key_id.as_str().as_bytes())? .get(public_key_id.as_str().as_bytes())?
{ .and_then(url_from_ivec))
Ok(url_from_ivec(ivec))
} else {
Ok(None)
}
}) })
.await .await
} }
pub(crate) async fn actor(&self, actor_id: IriString) -> Result<Option<Actor>, Error> { pub(crate) async fn actor(&self, actor_id: IriString) -> Result<Option<Actor>, Error> {
self.unblock(move |inner| { self.unblock(move |inner| {
if let Some(ivec) = inner.actor_id_actor.get(actor_id.as_str().as_bytes())? { inner
let actor = serde_json::from_slice(&ivec)?; .actor_id_actor
Ok(Some(actor)) .get(actor_id.as_str().as_bytes())?
} else { .map(|ivec| serde_json::from_slice(&ivec))
Ok(None) .transpose()
} .map_err(Error::from)
}) })
.await .await
} }
@ -528,7 +544,7 @@ impl Db {
} }
pub(crate) async fn remove_connection(&self, actor_id: IriString) -> Result<(), Error> { pub(crate) async fn remove_connection(&self, actor_id: IriString) -> Result<(), Error> {
tracing::debug!("Removing Connection: {}", actor_id); tracing::debug!("Removing Connection: {actor_id}");
self.unblock(move |inner| { self.unblock(move |inner| {
inner inner
.connected_actor_ids .connected_actor_ids
@ -540,7 +556,7 @@ impl Db {
} }
pub(crate) async fn add_connection(&self, actor_id: IriString) -> Result<(), Error> { pub(crate) async fn add_connection(&self, actor_id: IriString) -> Result<(), Error> {
tracing::debug!("Adding Connection: {}", actor_id); tracing::debug!("Adding Connection: {actor_id}");
self.unblock(move |inner| { self.unblock(move |inner| {
inner inner
.connected_actor_ids .connected_actor_ids

View File

@ -30,6 +30,10 @@ impl Error {
pub(crate) fn is_gone(&self) -> bool { pub(crate) fn is_gone(&self) -> bool {
matches!(self.kind, ErrorKind::Status(_, StatusCode::GONE)) matches!(self.kind, ErrorKind::Status(_, StatusCode::GONE))
} }
pub(crate) fn is_malformed_json(&self) -> bool {
matches!(self.kind, ErrorKind::Json(_))
}
} }
impl std::fmt::Debug for Error { impl std::fmt::Debug for Error {
@ -99,13 +103,13 @@ pub(crate) enum ErrorKind {
PrepareSign(#[from] PrepareSignError), PrepareSign(#[from] PrepareSignError),
#[error("Couldn't sign digest")] #[error("Couldn't sign digest")]
Signature(#[from] signature::Error), Signature(#[from] rsa::signature::Error),
#[error("Couldn't read signature")] #[error("Couldn't read signature")]
ReadSignature(signature::Error), ReadSignature(rsa::signature::Error),
#[error("Couldn't verify signature")] #[error("Couldn't verify signature")]
VerifySignature(signature::Error), VerifySignature(rsa::signature::Error),
#[error("Couldn't parse the signature header")] #[error("Couldn't parse the signature header")]
HeaderValidation(#[from] actix_web::http::header::InvalidHeaderValue), HeaderValidation(#[from] actix_web::http::header::InvalidHeaderValue),

View File

@ -42,7 +42,7 @@ impl QueryContact {
let contact = match state let contact = match state
.requests .requests
.fetch::<AcceptedActors>(self.contact_id.as_str()) .fetch::<AcceptedActors>(&self.contact_id)
.await .await
{ {
Ok(contact) => contact, Ok(contact) => contact,

View File

@ -35,7 +35,7 @@ impl Deliver {
#[tracing::instrument(name = "Deliver", skip(state))] #[tracing::instrument(name = "Deliver", skip(state))]
async fn permform(self, state: JobState) -> Result<(), Error> { async fn permform(self, state: JobState) -> Result<(), Error> {
if let Err(e) = state.requests.deliver(self.to, &self.data).await { if let Err(e) = state.requests.deliver(&self.to, &self.data).await {
if e.is_breaker() { if e.is_breaker() {
tracing::debug!("Not trying due to failed breaker"); tracing::debug!("Not trying due to failed breaker");
return Ok(()); return Ok(());

File diff suppressed because one or more lines are too long

View File

@ -39,11 +39,11 @@ impl QueryNodeinfo {
.authority_str() .authority_str()
.ok_or(ErrorKind::MissingDomain)?; .ok_or(ErrorKind::MissingDomain)?;
let scheme = self.actor_id.scheme_str(); let scheme = self.actor_id.scheme_str();
let well_known_uri = iri!(format!("{}://{}/.well-known/nodeinfo", scheme, authority)); let well_known_uri = iri!(format!("{scheme}://{authority}/.well-known/nodeinfo"));
let well_known = match state let well_known = match state
.requests .requests
.fetch_json::<WellKnown>(well_known_uri.as_str()) .fetch_json::<WellKnown>(&well_known_uri)
.await .await
{ {
Ok(well_known) => well_known, Ok(well_known) => well_known,
@ -55,7 +55,7 @@ impl QueryNodeinfo {
}; };
let href = if let Some(link) = well_known.links.into_iter().find(|l| l.rel.is_supported()) { let href = if let Some(link) = well_known.links.into_iter().find(|l| l.rel.is_supported()) {
link.href iri!(&link.href)
} else { } else {
return Ok(()); return Ok(());
}; };
@ -168,7 +168,7 @@ impl<'de> serde::de::Visitor<'de> for SupportedVersionVisitor {
type Value = SupportedVersion; type Value = SupportedVersion;
fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "a string starting with '{}'", SUPPORTED_VERSIONS) write!(f, "a string starting with '{SUPPORTED_VERSIONS}'")
} }
fn visit_str<E>(self, s: &str) -> Result<Self::Value, E> fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>
@ -187,7 +187,7 @@ impl<'de> serde::de::Visitor<'de> for SupportedNodeinfoVisitor {
type Value = SupportedNodeinfo; type Value = SupportedNodeinfo;
fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { fn expecting(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "a string starting with '{}'", SUPPORTED_NODEINFO) write!(f, "a string starting with '{SUPPORTED_NODEINFO}'")
} }
fn visit_str<E>(self, s: &str) -> Result<Self::Value, E> fn visit_str<E>(self, s: &str) -> Result<Self::Value, E>

View File

@ -39,7 +39,7 @@ use self::{
db::Db, db::Db,
jobs::create_workers, jobs::create_workers,
middleware::{DebugPayload, MyVerify, RelayResolver, Timings}, middleware::{DebugPayload, MyVerify, RelayResolver, Timings},
routes::{actor, inbox, index, nodeinfo, nodeinfo_meta, statics}, routes::{actor, healthz, inbox, index, nodeinfo, nodeinfo_meta, statics},
}; };
fn init_subscriber( fn init_subscriber(
@ -273,6 +273,7 @@ async fn do_server_main(
app.wrap(Compress::default()) app.wrap(Compress::default())
.wrap(TracingLogger::default()) .wrap(TracingLogger::default())
.wrap(Timings) .wrap(Timings)
.route("/healthz", web::get().to(healthz))
.service(web::resource("/").route(web::get().to(index))) .service(web::resource("/").route(web::get().to(index)))
.service(web::resource("/media/{path}").route(web::get().to(routes::media))) .service(web::resource("/media/{path}").route(web::get().to(routes::media)))
.service( .service(

View File

@ -6,10 +6,12 @@ use crate::{
}; };
use activitystreams::{base::BaseExt, iri, iri_string::types::IriString}; use activitystreams::{base::BaseExt, iri, iri_string::types::IriString};
use actix_web::web; use actix_web::web;
use base64::{engine::general_purpose::STANDARD, Engine};
use http_signature_normalization_actix::{prelude::*, verify::DeprecatedAlgorithm}; use http_signature_normalization_actix::{prelude::*, verify::DeprecatedAlgorithm};
use rsa::{pkcs1v15::VerifyingKey, pkcs8::DecodePublicKey, RsaPublicKey}; use rsa::{
use sha2::{Digest, Sha256}; pkcs1v15::Signature, pkcs1v15::VerifyingKey, pkcs8::DecodePublicKey, sha2::Sha256,
use signature::{DigestVerifier, Signature}; signature::Verifier, RsaPublicKey,
};
use std::{future::Future, pin::Pin}; use std::{future::Future, pin::Pin};
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -65,15 +67,11 @@ impl MyVerify {
actor_id actor_id
} else { } else {
match self match self.0.fetch::<PublicKeyResponse>(&public_key_id).await {
.0
.fetch::<PublicKeyResponse>(public_key_id.as_str())
.await
{
Ok(res) => res.actor_id().ok_or(ErrorKind::MissingId), Ok(res) => res.actor_id().ok_or(ErrorKind::MissingId),
Err(e) => { Err(e) => {
if e.is_gone() { if e.is_gone() {
tracing::warn!("Actor gone: {}", public_key_id); tracing::warn!("Actor gone: {public_key_id}");
return Ok(false); return Ok(false);
} else { } else {
return Err(e); return Err(e);
@ -127,13 +125,13 @@ async fn do_verify(
let span = tracing::Span::current(); let span = tracing::Span::current();
web::block(move || { web::block(move || {
span.in_scope(|| { span.in_scope(|| {
let decoded = base64::decode(signature)?; let decoded = STANDARD.decode(signature)?;
let signature = Signature::from_bytes(&decoded).map_err(ErrorKind::ReadSignature)?; let signature =
let hashed = Sha256::new_with_prefix(signing_string.as_bytes()); Signature::try_from(decoded.as_slice()).map_err(ErrorKind::ReadSignature)?;
let verifying_key = VerifyingKey::new_with_prefix(public_key); let verifying_key = VerifyingKey::<Sha256>::new_with_prefix(public_key);
verifying_key verifying_key
.verify_digest(hashed, &signature) .verify(signing_string.as_bytes(), &signature)
.map_err(ErrorKind::VerifySignature)?; .map_err(ErrorKind::VerifySignature)?;
Ok(()) as Result<(), Error> Ok(()) as Result<(), Error>
@ -176,13 +174,13 @@ mod tests {
#[test] #[test]
fn handles_masto_keys() { fn handles_masto_keys() {
println!("{}", ASONIX_DOG_KEY); println!("{ASONIX_DOG_KEY}");
let _ = RsaPublicKey::from_public_key_pem(ASONIX_DOG_KEY.trim()).unwrap(); let _ = RsaPublicKey::from_public_key_pem(ASONIX_DOG_KEY.trim()).unwrap();
} }
#[test] #[test]
fn handles_pleromo_keys() { fn handles_pleromo_keys() {
println!("{}", KARJALAZET_KEY); println!("{KARJALAZET_KEY}");
let _ = RsaPublicKey::from_public_key_pem(KARJALAZET_KEY.trim()).unwrap(); let _ = RsaPublicKey::from_public_key_pem(KARJALAZET_KEY.trim()).unwrap();
} }

View File

@ -5,12 +5,16 @@ use crate::{
use activitystreams::iri_string::types::IriString; use activitystreams::iri_string::types::IriString;
use actix_web::http::header::Date; use actix_web::http::header::Date;
use awc::{error::SendRequestError, Client, ClientResponse}; use awc::{error::SendRequestError, Client, ClientResponse};
use base64::{engine::general_purpose::STANDARD, Engine};
use dashmap::DashMap; use dashmap::DashMap;
use http_signature_normalization_actix::prelude::*; use http_signature_normalization_actix::prelude::*;
use rand::thread_rng; use rand::thread_rng;
use rsa::{pkcs1v15::SigningKey, RsaPrivateKey}; use rsa::{
use sha2::{Digest, Sha256}; pkcs1v15::SigningKey,
use signature::RandomizedSigner; sha2::{Digest, Sha256},
signature::RandomizedSigner,
RsaPrivateKey,
};
use std::{ use std::{
cell::RefCell, cell::RefCell,
rc::Rc, rc::Rc,
@ -57,7 +61,7 @@ impl Breakers {
if let Some(mut breaker) = self.inner.get_mut(authority) { if let Some(mut breaker) = self.inner.get_mut(authority) {
breaker.fail(); breaker.fail();
if !breaker.should_try() { if !breaker.should_try() {
tracing::warn!("Failed breaker for {}", authority); tracing::warn!("Failed breaker for {authority}");
} }
false false
} else { } else {
@ -225,13 +229,13 @@ impl Requests {
self.reset_err(); self.reset_err();
if !res.status().is_success() { if res.status().is_server_error() {
self.breakers.fail(&parsed_url); self.breakers.fail(&parsed_url);
if let Ok(bytes) = res.body().await { if let Ok(bytes) = res.body().await {
if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) { if let Ok(s) = String::from_utf8(bytes.as_ref().to_vec()) {
if !s.is_empty() { if !s.is_empty() {
tracing::warn!("Response from {}, {}", parsed_url, s); tracing::warn!("Response from {parsed_url}, {s}");
} }
} }
} }
@ -246,52 +250,48 @@ impl Requests {
} }
#[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))] #[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))]
pub(crate) async fn fetch_json<T>(&self, url: &str) -> Result<T, Error> pub(crate) async fn fetch_json<T>(&self, url: &IriString) -> Result<T, Error>
where where
T: serde::de::DeserializeOwned, T: serde::de::DeserializeOwned,
{ {
self.do_fetch(url, "application/json").await self.do_fetch(url, "application/json").await
} }
#[tracing::instrument(name = "Fetch Json", skip(self), fields(signing_string))]
pub(crate) async fn fetch_json_msky<T>(&self, url: &IriString) -> Result<T, Error>
where
T: serde::de::DeserializeOwned,
{
let mut res = self
.do_deliver(
url,
&serde_json::json!({}),
"application/json",
"application/json",
)
.await?;
let body = res
.body()
.await
.map_err(|e| ErrorKind::ReceiveResponse(url.to_string(), e.to_string()))?;
Ok(serde_json::from_slice(body.as_ref())?)
}
#[tracing::instrument(name = "Fetch Activity+Json", skip(self), fields(signing_string))] #[tracing::instrument(name = "Fetch Activity+Json", skip(self), fields(signing_string))]
pub(crate) async fn fetch<T>(&self, url: &str) -> Result<T, Error> pub(crate) async fn fetch<T>(&self, url: &IriString) -> Result<T, Error>
where where
T: serde::de::DeserializeOwned, T: serde::de::DeserializeOwned,
{ {
self.do_fetch(url, "application/activity+json").await self.do_fetch(url, "application/activity+json").await
} }
async fn do_fetch<T>(&self, url: &str, accept: &str) -> Result<T, Error> async fn do_fetch<T>(&self, url: &IriString, accept: &str) -> Result<T, Error>
where where
T: serde::de::DeserializeOwned, T: serde::de::DeserializeOwned,
{ {
let parsed_url = url.parse::<IriString>()?; let mut res = self.do_fetch_response(url, accept).await?;
if !self.breakers.should_try(&parsed_url) {
return Err(ErrorKind::Breaker.into());
}
let signer = self.signer();
let span = tracing::Span::current();
let client: Client = self.client.borrow().clone();
let res = client
.get(url)
.insert_header(("Accept", accept))
.insert_header(Date(SystemTime::now().into()))
.signature(
self.config.clone(),
self.key_id.clone(),
move |signing_string| {
span.record("signing_string", signing_string);
span.in_scope(|| signer.sign(signing_string))
},
)
.await?
.send()
.await;
let mut res = self.check_response(&parsed_url, res).await?;
let body = res let body = res
.body() .body()
@ -302,8 +302,16 @@ impl Requests {
} }
#[tracing::instrument(name = "Fetch response", skip(self), fields(signing_string))] #[tracing::instrument(name = "Fetch response", skip(self), fields(signing_string))]
pub(crate) async fn fetch_response(&self, url: IriString) -> Result<ClientResponse, Error> { pub(crate) async fn fetch_response(&self, url: &IriString) -> Result<ClientResponse, Error> {
if !self.breakers.should_try(&url) { self.do_fetch_response(url, "*/*").await
}
pub(crate) async fn do_fetch_response(
&self,
url: &IriString,
accept: &str,
) -> Result<ClientResponse, Error> {
if !self.breakers.should_try(url) {
return Err(ErrorKind::Breaker.into()); return Err(ErrorKind::Breaker.into());
} }
@ -313,7 +321,7 @@ impl Requests {
let client: Client = self.client.borrow().clone(); let client: Client = self.client.borrow().clone();
let res = client let res = client
.get(url.as_str()) .get(url.as_str())
.insert_header(("Accept", "*/*")) .insert_header(("Accept", accept))
.insert_header(Date(SystemTime::now().into())) .insert_header(Date(SystemTime::now().into()))
.no_decompress() .no_decompress()
.signature( .signature(
@ -328,7 +336,7 @@ impl Requests {
.send() .send()
.await; .await;
let res = self.check_response(&url, res).await?; let res = self.check_response(url, res).await?;
Ok(res) Ok(res)
} }
@ -338,7 +346,27 @@ impl Requests {
skip_all, skip_all,
fields(inbox = inbox.to_string().as_str(), signing_string) fields(inbox = inbox.to_string().as_str(), signing_string)
)] )]
pub(crate) async fn deliver<T>(&self, inbox: IriString, item: &T) -> Result<(), Error> pub(crate) async fn deliver<T>(&self, inbox: &IriString, item: &T) -> Result<(), Error>
where
T: serde::ser::Serialize + std::fmt::Debug,
{
self.do_deliver(
inbox,
item,
"application/activity+json",
"application/activity+json",
)
.await?;
Ok(())
}
async fn do_deliver<T>(
&self,
inbox: &IriString,
item: &T,
content_type: &str,
accept: &str,
) -> Result<ClientResponse, Error>
where where
T: serde::ser::Serialize + std::fmt::Debug, T: serde::ser::Serialize + std::fmt::Debug,
{ {
@ -353,8 +381,8 @@ impl Requests {
let client: Client = self.client.borrow().clone(); let client: Client = self.client.borrow().clone();
let (req, body) = client let (req, body) = client
.post(inbox.as_str()) .post(inbox.as_str())
.insert_header(("Accept", "application/activity+json")) .insert_header(("Accept", accept))
.insert_header(("Content-Type", "application/activity+json")) .insert_header(("Content-Type", content_type))
.insert_header(Date(SystemTime::now().into())) .insert_header(Date(SystemTime::now().into()))
.signature_with_digest( .signature_with_digest(
self.config.clone(), self.config.clone(),
@ -371,9 +399,9 @@ impl Requests {
let res = req.send_body(body).await; let res = req.send_body(body).await;
self.check_response(&inbox, res).await?; let res = self.check_response(inbox, res).await?;
Ok(()) Ok(res)
} }
fn signer(&self) -> Signer { fn signer(&self) -> Signer {
@ -390,7 +418,8 @@ struct Signer {
impl Signer { impl Signer {
fn sign(&self, signing_string: &str) -> Result<String, Error> { fn sign(&self, signing_string: &str) -> Result<String, Error> {
let signing_key = SigningKey::<Sha256>::new_with_prefix(self.private_key.clone()); let signing_key = SigningKey::<Sha256>::new_with_prefix(self.private_key.clone());
let signature = signing_key.try_sign_with_rng(thread_rng(), signing_string.as_bytes())?; let signature =
Ok(base64::encode(signature.as_ref())) signing_key.try_sign_with_rng(&mut thread_rng(), signing_string.as_bytes())?;
Ok(STANDARD.encode(signature.as_ref()))
} }
} }

View File

@ -1,4 +1,5 @@
mod actor; mod actor;
mod healthz;
mod inbox; mod inbox;
mod index; mod index;
mod media; mod media;
@ -7,6 +8,7 @@ mod statics;
pub(crate) use self::{ pub(crate) use self::{
actor::route as actor, actor::route as actor,
healthz::route as healthz,
inbox::route as inbox, inbox::route as inbox,
index::route as index, index::route as index,
media::route as media, media::route as media,

7
src/routes/healthz.rs Normal file
View File

@ -0,0 +1,7 @@
use crate::{data::State, error::Error};
use actix_web::{web, HttpResponse};
pub(crate) async fn route(state: web::Data<State>) -> Result<HttpResponse, Error> {
state.db.check_health().await?;
Ok(HttpResponse::Ok().finish())
}

View File

@ -71,7 +71,7 @@ pub(crate) async fn route(
let mut buf = BufWriter::new(Vec::new()); let mut buf = BufWriter::new(Vec::new());
crate::templates::index(&mut buf, &local, &nodes, &config)?; crate::templates::index_html(&mut buf, &local, &nodes, &config)?;
let html = buf.into_inner().map_err(|e| { let html = buf.into_inner().map_err(|e| {
tracing::error!("Error rendering template, {}", e.error()); tracing::error!("Error rendering template, {}", e.error());
ErrorKind::FlushBuffer ErrorKind::FlushBuffer

View File

@ -11,7 +11,7 @@ pub(crate) async fn route(
let uuid = uuid.into_inner(); let uuid = uuid.into_inner();
if let Some(url) = media.get_url(uuid).await? { if let Some(url) = media.get_url(uuid).await? {
let res = requests.fetch_response(url).await?; let res = requests.fetch_response(&url).await?;
let mut response = HttpResponse::build(res.status()); let mut response = HttpResponse::build(res.status());

View File

@ -44,6 +44,8 @@ pub(crate) async fn route(
.map(|s| s.to_owned()) .map(|s| s.to_owned())
.collect(); .collect();
let open_registrations = !config.restricted_mode();
web::Json(NodeInfo { web::Json(NodeInfo {
version: NodeInfoVersion, version: NodeInfoVersion,
software: Software { software: Software {
@ -55,7 +57,7 @@ pub(crate) async fn route(
inbound: vec![], inbound: vec![],
outbound: vec![], outbound: vec![],
}, },
open_registrations: false, open_registrations,
usage: Usage { usage: Usage {
users: Users { users: Users {
total: 1, total: 1,

View File

@ -89,19 +89,19 @@ async fn answer(bot: Bot, msg: Message, cmd: Command, db: Db) -> ResponseResult<
.await?; .await?;
} }
Command::Block { domain } if db.add_blocks(vec![domain.clone()]).await.is_ok() => { Command::Block { domain } if db.add_blocks(vec![domain.clone()]).await.is_ok() => {
bot.send_message(msg.chat.id, format!("{} has been blocked", domain)) bot.send_message(msg.chat.id, format!("{domain} has been blocked"))
.await?; .await?;
} }
Command::Unblock { domain } if db.remove_blocks(vec![domain.clone()]).await.is_ok() => { Command::Unblock { domain } if db.remove_blocks(vec![domain.clone()]).await.is_ok() => {
bot.send_message(msg.chat.id, format!("{} has been unblocked", domain)) bot.send_message(msg.chat.id, format!("{domain} has been unblocked"))
.await?; .await?;
} }
Command::Allow { domain } if db.add_allows(vec![domain.clone()]).await.is_ok() => { Command::Allow { domain } if db.add_allows(vec![domain.clone()]).await.is_ok() => {
bot.send_message(msg.chat.id, format!("{} has been allowed", domain)) bot.send_message(msg.chat.id, format!("{domain} has been allowed"))
.await?; .await?;
} }
Command::Disallow { domain } if db.remove_allows(vec![domain.clone()]).await.is_ok() => { Command::Disallow { domain } if db.remove_allows(vec![domain.clone()]).await.is_ok() => {
bot.send_message(msg.chat.id, format!("{} has been disallowed", domain)) bot.send_message(msg.chat.id, format!("{domain} has been disallowed"))
.await?; .await?;
} }
Command::ListAllowed => { Command::ListAllowed => {

View File

@ -1,7 +1,7 @@
@use crate::{ @use crate::{
config::{Config, UrlKind}, config::{Config, UrlKind},
data::Node, data::Node,
templates::{info, instance, statics::index_css}, templates::{info_html, instance_html, statics::index_css},
}; };
@(local: &[Node], nodes: &[Node], config: &Config) @(local: &[Node], nodes: &[Node], config: &Config)
@ -9,7 +9,7 @@ templates::{info, instance, statics::index_css},
<!doctype html> <!doctype html>
<html> <html>
<head lang="en"> <head lang="fr">
<meta charset="utf-8" /> <meta charset="utf-8" />
<meta name="viewport" content="width=device-width, initial-scale=1" /> <meta name="viewport" content="width=device-width, initial-scale=1" />
<title>@config.hostname() | Relais ActivityPub</title> <title>@config.hostname() | Relais ActivityPub</title>
@ -39,13 +39,13 @@ templates::{info, instance, statics::index_css},
@for node in local { @for node in local {
@if let Some(inst) = node.instance.as_ref() { @if let Some(inst) = node.instance.as_ref() {
<li> <li>
@:instance(inst, node.info.as_ref().map(|info| { info.software.as_ref() }), node.contact.as_ref(), @:instance_html(inst, node.info.as_ref().map(|info| { info.software.as_ref() }), node.contact.as_ref(),
&node.base) &node.base)
</li> </li>
} else { } else {
@if let Some(inf) = node.info.as_ref() { @if let Some(inf) = node.info.as_ref() {
<li> <li>
@:info(inf, &node.base) @:info_html(inf, &node.base)
</li> </li>
} }
} }
@ -94,13 +94,13 @@ templates::{info, instance, statics::index_css},
@for node in nodes { @for node in nodes {
@if let Some(inst) = node.instance.as_ref() { @if let Some(inst) = node.instance.as_ref() {
<li> <li>
@:instance(inst, node.info.as_ref().map(|info| { info.software.as_ref() }), node.contact.as_ref(), @:instance_html(inst, node.info.as_ref().map(|info| { info.software.as_ref() }), node.contact.as_ref(),
&node.base) &node.base)
</li> </li>
} else { } else {
@if let Some(inf) = node.info.as_ref() { @if let Some(inf) = node.info.as_ref() {
<li> <li>
@:info(inf, &node.base) @:info_html(inf, &node.base)
</li> </li>
} }
} }

View File

@ -1,4 +1,4 @@
@use crate::{db::{Contact, Instance}, templates::admin}; @use crate::{db::{Contact, Instance}, templates::admin_html};
@use activitystreams::iri_string::types::IriString; @use activitystreams::iri_string::types::IriString;
@(instance: &Instance, software: Option<&str>, contact: Option<&Contact>, base: &IriString) @(instance: &Instance, software: Option<&str>, contact: Option<&Contact>, base: &IriString)
@ -32,8 +32,8 @@
} }
</div> </div>
@if let Some(contact) = contact { @if let Some(contact) = contact {
<h5 class="instance-admin">Administré par:</h5> <h5 class="instance-admin">Administré par:</h5>
@:admin(contact, base) @:admin_html(contact, base)
} }
} }
</section> </section>