Compare commits
No commits in common. "48080344ea18855ef0f71f8a46576bdaa914c950" and "9433f36cc55ed02555eaba6609b37cd98ecb4133" have entirely different histories.
48080344ea
...
9433f36cc5
1984
Cargo.lock
generated
1984
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
64
Cargo.toml
64
Cargo.toml
@ -1,11 +1,11 @@
|
|||||||
[package]
|
[package]
|
||||||
name = "ap-relay"
|
name = "ap-relay"
|
||||||
description = "A simple activitypub relay"
|
description = "A simple activitypub relay"
|
||||||
version = "0.3.108"
|
version = "0.3.104"
|
||||||
authors = ["asonix <asonix@asonix.dog>"]
|
authors = ["asonix <asonix@asonix.dog>"]
|
||||||
license = "AGPL-3.0"
|
license = "AGPL-3.0"
|
||||||
readme = "README.md"
|
readme = "README.md"
|
||||||
repository = "https://git.asonix.dog/asonix/relay"
|
repository = "https://git.asonix.dog/asonix/ap-relay"
|
||||||
keywords = ["activitypub", "relay"]
|
keywords = ["activitypub", "relay"]
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
build = "src/build.rs"
|
build = "src/build.rs"
|
||||||
@ -15,55 +15,51 @@ name = "relay"
|
|||||||
path = "src/main.rs"
|
path = "src/main.rs"
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
console = ["dep:console-subscriber"]
|
console = ["console-subscriber"]
|
||||||
default = []
|
default = []
|
||||||
|
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
actix-web = { version = "4.4.0", default-features = false, features = ["compress-brotli", "compress-gzip", "rustls-0_22"] }
|
anyhow = "1.0"
|
||||||
|
actix-rt = "2.7.0"
|
||||||
|
actix-web = { version = "4.4.0", default-features = false, features = ["compress-brotli", "compress-gzip", "rustls-0_21"] }
|
||||||
actix-webfinger = { version = "0.5.0", default-features = false }
|
actix-webfinger = { version = "0.5.0", default-features = false }
|
||||||
activitystreams = "0.7.0-alpha.25"
|
activitystreams = "0.7.0-alpha.25"
|
||||||
activitystreams-ext = "0.1.0-alpha.3"
|
activitystreams-ext = "0.1.0-alpha.3"
|
||||||
ammonia = "3.1.0"
|
ammonia = "3.1.0"
|
||||||
async-cpupool = "0.2.0"
|
|
||||||
bcrypt = "0.15"
|
bcrypt = "0.15"
|
||||||
base64 = "0.21"
|
base64 = "0.21"
|
||||||
clap = { version = "4.0.0", features = ["derive"] }
|
clap = { version = "4.0.0", features = ["derive"] }
|
||||||
color-eyre = "0.6.2"
|
config = "0.13.0"
|
||||||
config = { version = "0.14.0", default-features = false, features = ["toml", "json", "yaml"] }
|
console-subscriber = { version = "0.1", optional = true }
|
||||||
console-subscriber = { version = "0.2", optional = true }
|
|
||||||
dashmap = "5.1.0"
|
dashmap = "5.1.0"
|
||||||
dotenv = "0.15.0"
|
dotenv = "0.15.0"
|
||||||
flume = "0.11.0"
|
flume = "0.11.0"
|
||||||
lru = "0.12.0"
|
lru = "0.11.0"
|
||||||
metrics = "0.22.0"
|
metrics = "0.21.0"
|
||||||
metrics-exporter-prometheus = { version = "0.13.0", default-features = false, features = [
|
metrics-exporter-prometheus = { version = "0.12.0", default-features = false, features = [
|
||||||
"http-listener",
|
"http-listener",
|
||||||
] }
|
] }
|
||||||
metrics-util = "0.16.0"
|
metrics-util = "0.15.0"
|
||||||
mime = "0.3.16"
|
mime = "0.3.16"
|
||||||
minify-html = "0.15.0"
|
minify-html = "0.11.0"
|
||||||
opentelemetry = "0.21"
|
opentelemetry = { version = "0.20", features = ["rt-tokio"] }
|
||||||
opentelemetry_sdk = { version = "0.21", features = ["rt-tokio"] }
|
opentelemetry-otlp = "0.13"
|
||||||
opentelemetry-otlp = "0.14"
|
|
||||||
pin-project-lite = "0.2.9"
|
pin-project-lite = "0.2.9"
|
||||||
# pinned to metrics-util
|
quanta = "0.11.0"
|
||||||
quanta = "0.12.0"
|
|
||||||
rand = "0.8"
|
rand = "0.8"
|
||||||
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls", "stream"]}
|
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls", "stream"]}
|
||||||
reqwest-middleware = "0.2"
|
reqwest-middleware = "0.2"
|
||||||
reqwest-tracing = "0.4.5"
|
reqwest-tracing = "0.4.5"
|
||||||
ring = "0.17.5"
|
ring = "0.16.20"
|
||||||
rsa = { version = "0.9" }
|
rsa = { version = "0.9" }
|
||||||
rsa-magic-public-key = "0.8.0"
|
rsa-magic-public-key = "0.8.0"
|
||||||
rustls = "0.22.0"
|
rustls = "0.21.0"
|
||||||
rustls-channel-resolver = "0.2.0"
|
rustls-pemfile = "1.0.1"
|
||||||
rustls-pemfile = "2"
|
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
sled = "0.34.7"
|
sled = "0.34.7"
|
||||||
streem = "0.2.0"
|
|
||||||
teloxide = { version = "0.12.0", default-features = false, features = [
|
teloxide = { version = "0.12.0", default-features = false, features = [
|
||||||
"ctrlc_handler",
|
"ctrlc_handler",
|
||||||
"macros",
|
"macros",
|
||||||
@ -73,39 +69,41 @@ thiserror = "1.0"
|
|||||||
time = { version = "0.3.17", features = ["serde"] }
|
time = { version = "0.3.17", features = ["serde"] }
|
||||||
tracing = "0.1"
|
tracing = "0.1"
|
||||||
tracing-error = "0.2"
|
tracing-error = "0.2"
|
||||||
tracing-log = "0.2"
|
tracing-futures = "0.2"
|
||||||
tracing-opentelemetry = "0.22"
|
tracing-log = "0.1"
|
||||||
|
tracing-opentelemetry = "0.21"
|
||||||
tracing-subscriber = { version = "0.3", features = [
|
tracing-subscriber = { version = "0.3", features = [
|
||||||
"ansi",
|
"ansi",
|
||||||
"env-filter",
|
"env-filter",
|
||||||
"fmt",
|
"fmt",
|
||||||
] }
|
] }
|
||||||
tokio = { version = "1", features = ["full", "tracing"] }
|
tokio = { version = "1", features = ["macros", "sync"] }
|
||||||
uuid = { version = "1", features = ["v4", "serde"] }
|
uuid = { version = "1", features = ["v4", "serde"] }
|
||||||
|
streem = "0.1.0"
|
||||||
|
|
||||||
[dependencies.background-jobs]
|
[dependencies.background-jobs]
|
||||||
version = "0.18.0"
|
version = "0.15.0"
|
||||||
default-features = false
|
default-features = false
|
||||||
features = ["error-logging", "metrics", "tokio"]
|
features = ["background-jobs-actix", "error-logging"]
|
||||||
|
|
||||||
[dependencies.http-signature-normalization-actix]
|
[dependencies.http-signature-normalization-actix]
|
||||||
version = "0.11.0"
|
version = "0.10.1"
|
||||||
default-features = false
|
default-features = false
|
||||||
features = ["server", "ring"]
|
features = ["server", "ring"]
|
||||||
|
|
||||||
[dependencies.http-signature-normalization-reqwest]
|
[dependencies.http-signature-normalization-reqwest]
|
||||||
version = "0.11.0"
|
version = "0.10.0"
|
||||||
default-features = false
|
default-features = false
|
||||||
features = ["middleware", "ring"]
|
features = ["middleware", "ring"]
|
||||||
|
|
||||||
[dependencies.tracing-actix-web]
|
[dependencies.tracing-actix-web]
|
||||||
version = "0.7.9"
|
version = "0.7.6"
|
||||||
|
|
||||||
[build-dependencies]
|
[build-dependencies]
|
||||||
color-eyre = "0.6.2"
|
anyhow = "1.0"
|
||||||
dotenv = "0.15.0"
|
dotenv = "0.15.0"
|
||||||
ructe = { version = "0.17.0", features = ["sass", "mime03"] }
|
ructe = { version = "0.17.0", features = ["sass", "mime03"] }
|
||||||
toml = "0.8.0"
|
toml = "0.7.0"
|
||||||
|
|
||||||
[profile.dev.package.rsa]
|
[profile.dev.package.rsa]
|
||||||
opt-level = 3
|
opt-level = 3
|
||||||
|
12
flake.lock
12
flake.lock
@ -5,11 +5,11 @@
|
|||||||
"systems": "systems"
|
"systems": "systems"
|
||||||
},
|
},
|
||||||
"locked": {
|
"locked": {
|
||||||
"lastModified": 1701680307,
|
"lastModified": 1692799911,
|
||||||
"narHash": "sha256-kAuep2h5ajznlPMD9rnQyffWG8EM/C73lejGofXvdM8=",
|
"narHash": "sha256-3eihraek4qL744EvQXsK1Ha6C3CR7nnT8X2qWap4RNk=",
|
||||||
"owner": "numtide",
|
"owner": "numtide",
|
||||||
"repo": "flake-utils",
|
"repo": "flake-utils",
|
||||||
"rev": "4022d587cbbfd70fe950c1e2083a02621806a725",
|
"rev": "f9e7cf818399d17d347f847525c5a5a8032e4e44",
|
||||||
"type": "github"
|
"type": "github"
|
||||||
},
|
},
|
||||||
"original": {
|
"original": {
|
||||||
@ -20,11 +20,11 @@
|
|||||||
},
|
},
|
||||||
"nixpkgs": {
|
"nixpkgs": {
|
||||||
"locked": {
|
"locked": {
|
||||||
"lastModified": 1705133751,
|
"lastModified": 1693003285,
|
||||||
"narHash": "sha256-rCIsyE80jgiOU78gCWN3A0wE0tR2GI5nH6MlS+HaaSQ=",
|
"narHash": "sha256-5nm4yrEHKupjn62MibENtfqlP6pWcRTuSKrMiH9bLkc=",
|
||||||
"owner": "NixOS",
|
"owner": "NixOS",
|
||||||
"repo": "nixpkgs",
|
"repo": "nixpkgs",
|
||||||
"rev": "9b19f5e77dd906cb52dade0b7bd280339d2a1f3d",
|
"rev": "5690c4271f2998c304a45c91a0aeb8fb69feaea7",
|
||||||
"type": "github"
|
"type": "github"
|
||||||
},
|
},
|
||||||
"original": {
|
"original": {
|
||||||
|
@ -1,14 +1,17 @@
|
|||||||
{ lib
|
{ lib
|
||||||
, nixosTests
|
, nixosTests
|
||||||
|
, protobuf
|
||||||
, rustPlatform
|
, rustPlatform
|
||||||
}:
|
}:
|
||||||
|
|
||||||
rustPlatform.buildRustPackage {
|
rustPlatform.buildRustPackage {
|
||||||
pname = "relay";
|
pname = "relay";
|
||||||
version = "0.3.108";
|
version = "0.3.104";
|
||||||
src = ./.;
|
src = ./.;
|
||||||
cargoLock.lockFile = ./Cargo.lock;
|
cargoLock.lockFile = ./Cargo.lock;
|
||||||
|
|
||||||
|
PROTOC = "${protobuf}/bin/protoc";
|
||||||
|
PROTOC_INCLUDE = "${protobuf}/include";
|
||||||
RUSTFLAGS = "--cfg tokio_unstable";
|
RUSTFLAGS = "--cfg tokio_unstable";
|
||||||
|
|
||||||
nativeBuildInputs = [ ];
|
nativeBuildInputs = [ ];
|
||||||
|
@ -21,7 +21,7 @@ fn git_info() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn version_info() -> color_eyre::Result<()> {
|
fn version_info() -> Result<(), anyhow::Error> {
|
||||||
let cargo_toml = Path::new(&std::env::var("CARGO_MANIFEST_DIR")?).join("Cargo.toml");
|
let cargo_toml = Path::new(&std::env::var("CARGO_MANIFEST_DIR")?).join("Cargo.toml");
|
||||||
|
|
||||||
let mut file = File::open(cargo_toml)?;
|
let mut file = File::open(cargo_toml)?;
|
||||||
@ -42,7 +42,7 @@ fn version_info() -> color_eyre::Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn main() -> color_eyre::Result<()> {
|
fn main() -> Result<(), anyhow::Error> {
|
||||||
dotenv::dotenv().ok();
|
dotenv::dotenv().ok();
|
||||||
|
|
||||||
git_info();
|
git_info();
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
use metrics::{Key, Metadata, Recorder, SetRecorderError};
|
use metrics::{Key, Recorder, SetRecorderError};
|
||||||
use metrics_util::{
|
use metrics_util::{
|
||||||
registry::{AtomicStorage, GenerationalStorage, Recency, Registry},
|
registry::{AtomicStorage, GenerationalStorage, Recency, Registry},
|
||||||
MetricKindMask, Summary,
|
MetricKindMask, Summary,
|
||||||
@ -289,7 +289,7 @@ impl Inner {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let mut d = self.distributions.write().unwrap();
|
let mut d = self.distributions.write().unwrap();
|
||||||
let outer_entry = d.entry(name.clone()).or_default();
|
let outer_entry = d.entry(name.clone()).or_insert_with(BTreeMap::new);
|
||||||
|
|
||||||
let entry = outer_entry
|
let entry = outer_entry
|
||||||
.entry(labels)
|
.entry(labels)
|
||||||
@ -360,8 +360,8 @@ impl MemoryCollector {
|
|||||||
d.entry(key.as_str().to_owned()).or_insert(description);
|
d.entry(key.as_str().to_owned()).or_insert(description);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn install(&self) -> Result<(), SetRecorderError<Self>> {
|
pub(crate) fn install(&self) -> Result<(), SetRecorderError> {
|
||||||
metrics::set_global_recorder(self.clone())
|
metrics::set_boxed_recorder(Box::new(self.clone()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -393,19 +393,19 @@ impl Recorder for MemoryCollector {
|
|||||||
self.add_description_if_missing(&key, description)
|
self.add_description_if_missing(&key, description)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn register_counter(&self, key: &Key, _: &Metadata<'_>) -> metrics::Counter {
|
fn register_counter(&self, key: &Key) -> metrics::Counter {
|
||||||
self.inner
|
self.inner
|
||||||
.registry
|
.registry
|
||||||
.get_or_create_counter(key, |c| c.clone().into())
|
.get_or_create_counter(key, |c| c.clone().into())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn register_gauge(&self, key: &Key, _: &Metadata<'_>) -> metrics::Gauge {
|
fn register_gauge(&self, key: &Key) -> metrics::Gauge {
|
||||||
self.inner
|
self.inner
|
||||||
.registry
|
.registry
|
||||||
.get_or_create_gauge(key, |c| c.clone().into())
|
.get_or_create_gauge(key, |c| c.clone().into())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn register_histogram(&self, key: &Key, _: &Metadata<'_>) -> metrics::Histogram {
|
fn register_histogram(&self, key: &Key) -> metrics::Histogram {
|
||||||
self.inner
|
self.inner
|
||||||
.registry
|
.registry
|
||||||
.get_or_create_histogram(key, |c| c.clone().into())
|
.get_or_create_histogram(key, |c| c.clone().into())
|
||||||
|
@ -12,8 +12,9 @@ use activitystreams::{
|
|||||||
};
|
};
|
||||||
use config::Environment;
|
use config::Environment;
|
||||||
use http_signature_normalization_actix::{digest::ring::Sha256, prelude::VerifyDigest};
|
use http_signature_normalization_actix::{digest::ring::Sha256, prelude::VerifyDigest};
|
||||||
use rustls::sign::CertifiedKey;
|
use rustls::{Certificate, PrivateKey};
|
||||||
use std::{
|
use std::{
|
||||||
|
io::BufReader,
|
||||||
net::{IpAddr, SocketAddr},
|
net::{IpAddr, SocketAddr},
|
||||||
path::PathBuf,
|
path::PathBuf,
|
||||||
};
|
};
|
||||||
@ -311,34 +312,43 @@ impl Config {
|
|||||||
Some((config.addr, config.port).into())
|
Some((config.addr, config.port).into())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn open_keys(&self) -> Result<Option<CertifiedKey>, Error> {
|
pub(crate) fn open_keys(&self) -> Result<Option<(Vec<Certificate>, PrivateKey)>, Error> {
|
||||||
let tls = if let Some(tls) = &self.tls {
|
let tls = if let Some(tls) = &self.tls {
|
||||||
tls
|
tls
|
||||||
} else {
|
} else {
|
||||||
tracing::info!("No TLS config present");
|
tracing::warn!("No TLS config present");
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
};
|
};
|
||||||
|
|
||||||
let certs_bytes = tokio::fs::read(&tls.cert).await?;
|
let mut certs_reader = BufReader::new(std::fs::File::open(&tls.cert)?);
|
||||||
let certs =
|
let certs = rustls_pemfile::certs(&mut certs_reader)?;
|
||||||
rustls_pemfile::certs(&mut certs_bytes.as_slice()).collect::<Result<Vec<_>, _>>()?;
|
|
||||||
|
|
||||||
if certs.is_empty() {
|
if certs.is_empty() {
|
||||||
tracing::warn!("No certs read from certificate file");
|
tracing::warn!("No certs read from certificate file");
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
let key_bytes = tokio::fs::read(&tls.key).await?;
|
let mut key_reader = BufReader::new(std::fs::File::open(&tls.key)?);
|
||||||
let key = if let Some(key) = rustls_pemfile::private_key(&mut key_bytes.as_slice())? {
|
let key = rustls_pemfile::read_one(&mut key_reader)?;
|
||||||
key
|
|
||||||
|
let certs = certs.into_iter().map(Certificate).collect();
|
||||||
|
|
||||||
|
let key = if let Some(key) = key {
|
||||||
|
match key {
|
||||||
|
rustls_pemfile::Item::RSAKey(der) => PrivateKey(der),
|
||||||
|
rustls_pemfile::Item::PKCS8Key(der) => PrivateKey(der),
|
||||||
|
rustls_pemfile::Item::ECKey(der) => PrivateKey(der),
|
||||||
|
_ => {
|
||||||
|
tracing::warn!("Unknown key format: {:?}", key);
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
tracing::warn!("Failed to read private key");
|
tracing::warn!("Failed to read private key");
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
};
|
};
|
||||||
|
|
||||||
let key = rustls::crypto::ring::sign::any_supported_type(&key)?;
|
Ok(Some((certs, key)))
|
||||||
|
|
||||||
Ok(Some(CertifiedKey::new(certs, key)))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn footer_blurb(&self) -> Option<crate::templates::Html<String>> {
|
pub(crate) fn footer_blurb(&self) -> Option<crate::templates::Html<String>> {
|
||||||
|
@ -750,11 +750,6 @@ mod tests {
|
|||||||
{
|
{
|
||||||
let db =
|
let db =
|
||||||
Db::build_inner(true, sled::Config::new().temporary(true).open().unwrap()).unwrap();
|
Db::build_inner(true, sled::Config::new().temporary(true).open().unwrap()).unwrap();
|
||||||
|
actix_rt::System::new().block_on((f)(db));
|
||||||
tokio::runtime::Builder::new_current_thread()
|
|
||||||
.enable_all()
|
|
||||||
.build()
|
|
||||||
.unwrap()
|
|
||||||
.block_on((f)(db));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
106
src/error.rs
106
src/error.rs
@ -1,85 +1,57 @@
|
|||||||
use activitystreams::checked::CheckError;
|
use activitystreams::checked::CheckError;
|
||||||
|
use actix_rt::task::JoinError;
|
||||||
use actix_web::{
|
use actix_web::{
|
||||||
error::{BlockingError, ResponseError},
|
error::{BlockingError, ResponseError},
|
||||||
http::StatusCode,
|
http::StatusCode,
|
||||||
HttpResponse,
|
HttpResponse,
|
||||||
};
|
};
|
||||||
use background_jobs::BoxError;
|
|
||||||
use color_eyre::eyre::Error as Report;
|
|
||||||
use http_signature_normalization_reqwest::SignError;
|
use http_signature_normalization_reqwest::SignError;
|
||||||
use std::{convert::Infallible, io, sync::Arc};
|
use std::{convert::Infallible, fmt::Debug, io};
|
||||||
use tokio::task::JoinError;
|
use tracing_error::SpanTrace;
|
||||||
|
|
||||||
#[derive(Clone)]
|
|
||||||
struct ArcKind {
|
|
||||||
kind: Arc<ErrorKind>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::fmt::Debug for ArcKind {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
||||||
self.kind.fmt(f)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::fmt::Display for ArcKind {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
||||||
self.kind.fmt(f)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::error::Error for ArcKind {
|
|
||||||
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
|
||||||
self.kind.source()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) struct Error {
|
pub(crate) struct Error {
|
||||||
kind: ArcKind,
|
context: String,
|
||||||
display: Box<str>,
|
kind: ErrorKind,
|
||||||
debug: Box<str>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Error {
|
impl Error {
|
||||||
fn kind(&self) -> &ErrorKind {
|
|
||||||
&self.kind.kind
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn is_breaker(&self) -> bool {
|
pub(crate) fn is_breaker(&self) -> bool {
|
||||||
matches!(self.kind(), ErrorKind::Breaker)
|
matches!(self.kind, ErrorKind::Breaker)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn is_not_found(&self) -> bool {
|
pub(crate) fn is_not_found(&self) -> bool {
|
||||||
matches!(self.kind(), ErrorKind::Status(_, StatusCode::NOT_FOUND))
|
matches!(self.kind, ErrorKind::Status(_, StatusCode::NOT_FOUND))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn is_bad_request(&self) -> bool {
|
pub(crate) fn is_bad_request(&self) -> bool {
|
||||||
matches!(self.kind(), ErrorKind::Status(_, StatusCode::BAD_REQUEST))
|
matches!(self.kind, ErrorKind::Status(_, StatusCode::BAD_REQUEST))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn is_gone(&self) -> bool {
|
pub(crate) fn is_gone(&self) -> bool {
|
||||||
matches!(self.kind(), ErrorKind::Status(_, StatusCode::GONE))
|
matches!(self.kind, ErrorKind::Status(_, StatusCode::GONE))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn is_malformed_json(&self) -> bool {
|
pub(crate) fn is_malformed_json(&self) -> bool {
|
||||||
matches!(self.kind(), ErrorKind::Json(_))
|
matches!(self.kind, ErrorKind::Json(_))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Debug for Error {
|
impl std::fmt::Debug for Error {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
f.write_str(&self.debug)
|
writeln!(f, "{:?}", self.kind)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Display for Error {
|
impl std::fmt::Display for Error {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
f.write_str(&self.display)
|
writeln!(f, "{}", self.kind)?;
|
||||||
|
std::fmt::Display::fmt(&self.context, f)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::error::Error for Error {
|
impl std::error::Error for Error {
|
||||||
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
|
||||||
self.kind().source()
|
self.kind.source()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -88,36 +60,25 @@ where
|
|||||||
ErrorKind: From<T>,
|
ErrorKind: From<T>,
|
||||||
{
|
{
|
||||||
fn from(error: T) -> Self {
|
fn from(error: T) -> Self {
|
||||||
let kind = ArcKind {
|
|
||||||
kind: Arc::new(ErrorKind::from(error)),
|
|
||||||
};
|
|
||||||
let report = Report::new(kind.clone());
|
|
||||||
let display = format!("{report}");
|
|
||||||
let debug = format!("{report:?}");
|
|
||||||
|
|
||||||
Error {
|
Error {
|
||||||
kind,
|
context: SpanTrace::capture().to_string(),
|
||||||
display: Box::from(display),
|
kind: error.into(),
|
||||||
debug: Box::from(debug),
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
pub(crate) enum ErrorKind {
|
pub(crate) enum ErrorKind {
|
||||||
#[error("Error in extractor")]
|
#[error("Error queueing job, {0}")]
|
||||||
Extractor(#[from] crate::extractors::ErrorKind),
|
Queue(anyhow::Error),
|
||||||
|
|
||||||
#[error("Error queueing job")]
|
#[error("Error in configuration, {0}")]
|
||||||
Queue(#[from] BoxError),
|
|
||||||
|
|
||||||
#[error("Error in configuration")]
|
|
||||||
Config(#[from] config::ConfigError),
|
Config(#[from] config::ConfigError),
|
||||||
|
|
||||||
#[error("Couldn't parse key")]
|
#[error("Couldn't parse key, {0}")]
|
||||||
Pkcs8(#[from] rsa::pkcs8::Error),
|
Pkcs8(#[from] rsa::pkcs8::Error),
|
||||||
|
|
||||||
#[error("Couldn't encode public key")]
|
#[error("Couldn't encode public key, {0}")]
|
||||||
Spki(#[from] rsa::pkcs8::spki::Error),
|
Spki(#[from] rsa::pkcs8::spki::Error),
|
||||||
|
|
||||||
#[error("Couldn't sign request")]
|
#[error("Couldn't sign request")]
|
||||||
@ -126,36 +87,33 @@ pub(crate) enum ErrorKind {
|
|||||||
#[error("Couldn't make request")]
|
#[error("Couldn't make request")]
|
||||||
Reqwest(#[from] reqwest::Error),
|
Reqwest(#[from] reqwest::Error),
|
||||||
|
|
||||||
#[error("Couldn't make request")]
|
#[error("Couldn't build client")]
|
||||||
ReqwestMiddleware(#[from] reqwest_middleware::Error),
|
ReqwestMiddleware(#[from] reqwest_middleware::Error),
|
||||||
|
|
||||||
#[error("Couldn't parse IRI")]
|
#[error("Couldn't parse IRI, {0}")]
|
||||||
ParseIri(#[from] activitystreams::iri_string::validate::Error),
|
ParseIri(#[from] activitystreams::iri_string::validate::Error),
|
||||||
|
|
||||||
#[error("Couldn't normalize IRI")]
|
#[error("Couldn't normalize IRI, {0}")]
|
||||||
NormalizeIri(#[from] std::collections::TryReserveError),
|
NormalizeIri(#[from] std::collections::TryReserveError),
|
||||||
|
|
||||||
#[error("Couldn't perform IO")]
|
#[error("Couldn't perform IO, {0}")]
|
||||||
Io(#[from] io::Error),
|
Io(#[from] io::Error),
|
||||||
|
|
||||||
#[error("Couldn't sign string, {0}")]
|
#[error("Couldn't sign string, {0}")]
|
||||||
Rsa(rsa::errors::Error),
|
Rsa(rsa::errors::Error),
|
||||||
|
|
||||||
#[error("Couldn't use db")]
|
#[error("Couldn't use db, {0}")]
|
||||||
Sled(#[from] sled::Error),
|
Sled(#[from] sled::Error),
|
||||||
|
|
||||||
#[error("Couldn't do the json thing")]
|
#[error("Couldn't do the json thing, {0}")]
|
||||||
Json(#[from] serde_json::Error),
|
Json(#[from] serde_json::Error),
|
||||||
|
|
||||||
#[error("Couldn't sign request")]
|
#[error("Couldn't sign request, {0}")]
|
||||||
Sign(#[from] SignError),
|
Sign(#[from] SignError),
|
||||||
|
|
||||||
#[error("Couldn't sign digest")]
|
#[error("Couldn't sign digest")]
|
||||||
Signature(#[from] rsa::signature::Error),
|
Signature(#[from] rsa::signature::Error),
|
||||||
|
|
||||||
#[error("Couldn't prepare TLS private key")]
|
|
||||||
PrepareKey(#[from] rustls::Error),
|
|
||||||
|
|
||||||
#[error("Couldn't verify signature")]
|
#[error("Couldn't verify signature")]
|
||||||
VerifySignature,
|
VerifySignature,
|
||||||
|
|
||||||
@ -186,10 +144,10 @@ pub(crate) enum ErrorKind {
|
|||||||
#[error("Wrong ActivityPub kind, {0}")]
|
#[error("Wrong ActivityPub kind, {0}")]
|
||||||
Kind(String),
|
Kind(String),
|
||||||
|
|
||||||
#[error("Too many CPUs")]
|
#[error("Too many CPUs, {0}")]
|
||||||
CpuCount(#[from] std::num::TryFromIntError),
|
CpuCount(#[from] std::num::TryFromIntError),
|
||||||
|
|
||||||
#[error("Host mismatch")]
|
#[error("{0}")]
|
||||||
HostMismatch(#[from] CheckError),
|
HostMismatch(#[from] CheckError),
|
||||||
|
|
||||||
#[error("Couldn't flush buffer")]
|
#[error("Couldn't flush buffer")]
|
||||||
@ -243,7 +201,7 @@ pub(crate) enum ErrorKind {
|
|||||||
|
|
||||||
impl ResponseError for Error {
|
impl ResponseError for Error {
|
||||||
fn status_code(&self) -> StatusCode {
|
fn status_code(&self) -> StatusCode {
|
||||||
match self.kind() {
|
match self.kind {
|
||||||
ErrorKind::NotAllowed(_) | ErrorKind::WrongActor(_) | ErrorKind::BadActor(_, _) => {
|
ErrorKind::NotAllowed(_) | ErrorKind::WrongActor(_) | ErrorKind::BadActor(_, _) => {
|
||||||
StatusCode::FORBIDDEN
|
StatusCode::FORBIDDEN
|
||||||
}
|
}
|
||||||
@ -263,7 +221,7 @@ impl ResponseError for Error {
|
|||||||
.insert_header(("Content-Type", "application/activity+json"))
|
.insert_header(("Content-Type", "application/activity+json"))
|
||||||
.body(
|
.body(
|
||||||
serde_json::to_string(&serde_json::json!({
|
serde_json::to_string(&serde_json::json!({
|
||||||
"error": self.kind().to_string(),
|
"error": self.kind.to_string(),
|
||||||
}))
|
}))
|
||||||
.unwrap_or_else(|_| "{}".to_string()),
|
.unwrap_or_else(|_| "{}".to_string()),
|
||||||
)
|
)
|
||||||
|
@ -1,15 +1,19 @@
|
|||||||
use actix_web::{
|
use actix_web::{
|
||||||
dev::Payload,
|
dev::Payload,
|
||||||
error::ParseError,
|
error::ParseError,
|
||||||
http::header::{from_one_raw_str, Header, HeaderName, HeaderValue, TryIntoHeaderValue},
|
http::{
|
||||||
|
header::{from_one_raw_str, Header, HeaderName, HeaderValue, TryIntoHeaderValue},
|
||||||
|
StatusCode,
|
||||||
|
},
|
||||||
web::Data,
|
web::Data,
|
||||||
FromRequest, HttpMessage, HttpRequest,
|
FromRequest, HttpMessage, HttpRequest, HttpResponse, ResponseError,
|
||||||
};
|
};
|
||||||
use bcrypt::{BcryptError, DEFAULT_COST};
|
use bcrypt::{BcryptError, DEFAULT_COST};
|
||||||
use http_signature_normalization_actix::{prelude::InvalidHeaderValue, Canceled, Spawn};
|
use http_signature_normalization_actix::{prelude::InvalidHeaderValue, Canceled, Spawn};
|
||||||
use std::{convert::Infallible, str::FromStr, time::Instant};
|
use std::{convert::Infallible, str::FromStr, time::Instant};
|
||||||
|
use tracing_error::SpanTrace;
|
||||||
|
|
||||||
use crate::{db::Db, error::Error, future::LocalBoxFuture, spawner::Spawner};
|
use crate::{db::Db, future::LocalBoxFuture, spawner::Spawner};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub(crate) struct AdminConfig {
|
pub(crate) struct AdminConfig {
|
||||||
@ -24,7 +28,7 @@ impl AdminConfig {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn verify(&self, token: XApiToken) -> Result<bool, Error> {
|
fn verify(&self, token: XApiToken) -> Result<bool, Error> {
|
||||||
bcrypt::verify(token.0, &self.hashed_api_token).map_err(Error::bcrypt_verify)
|
bcrypt::verify(&token.0, &self.hashed_api_token).map_err(Error::bcrypt_verify)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -79,42 +83,74 @@ impl Admin {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
#[error("Failed authentication")]
|
||||||
|
pub(crate) struct Error {
|
||||||
|
context: String,
|
||||||
|
#[source]
|
||||||
|
kind: ErrorKind,
|
||||||
|
}
|
||||||
|
|
||||||
impl Error {
|
impl Error {
|
||||||
fn invalid() -> Self {
|
fn invalid() -> Self {
|
||||||
Error::from(ErrorKind::Invalid)
|
Error {
|
||||||
|
context: SpanTrace::capture().to_string(),
|
||||||
|
kind: ErrorKind::Invalid,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn missing_config() -> Self {
|
fn missing_config() -> Self {
|
||||||
Error::from(ErrorKind::MissingConfig)
|
Error {
|
||||||
|
context: SpanTrace::capture().to_string(),
|
||||||
|
kind: ErrorKind::MissingConfig,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn missing_db() -> Self {
|
fn missing_db() -> Self {
|
||||||
Error::from(ErrorKind::MissingDb)
|
Error {
|
||||||
|
context: SpanTrace::capture().to_string(),
|
||||||
|
kind: ErrorKind::MissingDb,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn missing_spawner() -> Self {
|
fn missing_spawner() -> Self {
|
||||||
Error::from(ErrorKind::MissingSpawner)
|
Error {
|
||||||
|
context: SpanTrace::capture().to_string(),
|
||||||
|
kind: ErrorKind::MissingSpawner,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn bcrypt_verify(e: BcryptError) -> Self {
|
fn bcrypt_verify(e: BcryptError) -> Self {
|
||||||
Error::from(ErrorKind::BCryptVerify(e))
|
Error {
|
||||||
|
context: SpanTrace::capture().to_string(),
|
||||||
|
kind: ErrorKind::BCryptVerify(e),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn bcrypt_hash(e: BcryptError) -> Self {
|
fn bcrypt_hash(e: BcryptError) -> Self {
|
||||||
Error::from(ErrorKind::BCryptHash(e))
|
Error {
|
||||||
|
context: SpanTrace::capture().to_string(),
|
||||||
|
kind: ErrorKind::BCryptHash(e),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn parse_header(e: ParseError) -> Self {
|
fn parse_header(e: ParseError) -> Self {
|
||||||
Error::from(ErrorKind::ParseHeader(e))
|
Error {
|
||||||
|
context: SpanTrace::capture().to_string(),
|
||||||
|
kind: ErrorKind::ParseHeader(e),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn canceled(_: Canceled) -> Self {
|
fn canceled(_: Canceled) -> Self {
|
||||||
Error::from(ErrorKind::Canceled)
|
Error {
|
||||||
|
context: SpanTrace::capture().to_string(),
|
||||||
|
kind: ErrorKind::Canceled,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, thiserror::Error)]
|
#[derive(Debug, thiserror::Error)]
|
||||||
pub(crate) enum ErrorKind {
|
enum ErrorKind {
|
||||||
#[error("Invalid API Token")]
|
#[error("Invalid API Token")]
|
||||||
Invalid,
|
Invalid,
|
||||||
|
|
||||||
@ -140,6 +176,20 @@ pub(crate) enum ErrorKind {
|
|||||||
ParseHeader(#[source] ParseError),
|
ParseHeader(#[source] ParseError),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl ResponseError for Error {
|
||||||
|
fn status_code(&self) -> StatusCode {
|
||||||
|
match self.kind {
|
||||||
|
ErrorKind::Invalid | ErrorKind::ParseHeader(_) => StatusCode::BAD_REQUEST,
|
||||||
|
_ => StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn error_response(&self) -> HttpResponse {
|
||||||
|
HttpResponse::build(self.status_code())
|
||||||
|
.json(serde_json::json!({ "msg": self.kind.to_string() }))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl FromRequest for Admin {
|
impl FromRequest for Admin {
|
||||||
type Error = Error;
|
type Error = Error;
|
||||||
type Future = LocalBoxFuture<'static, Result<Self, Self::Error>>;
|
type Future = LocalBoxFuture<'static, Result<Self, Self::Error>>;
|
||||||
@ -150,8 +200,10 @@ impl FromRequest for Admin {
|
|||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
let (db, c, s, t) = res?;
|
let (db, c, s, t) = res?;
|
||||||
Self::verify(c, s, t).await?;
|
Self::verify(c, s, t).await?;
|
||||||
metrics::histogram!("relay.admin.verify")
|
metrics::histogram!(
|
||||||
.record(now.elapsed().as_micros() as f64 / 1_000_000_f64);
|
"relay.admin.verify",
|
||||||
|
now.elapsed().as_micros() as f64 / 1_000_000_f64
|
||||||
|
);
|
||||||
Ok(Admin { db })
|
Ok(Admin { db })
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -1,4 +1,3 @@
|
|||||||
use std::{future::Future, pin::Pin};
|
use std::{future::Future, pin::Pin};
|
||||||
|
|
||||||
pub(crate) type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
|
pub(crate) type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;
|
||||||
pub(crate) type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;
|
|
||||||
|
37
src/jobs.rs
37
src/jobs.rs
@ -19,10 +19,8 @@ use crate::{
|
|||||||
jobs::{process_listeners::Listeners, record_last_online::RecordLastOnline},
|
jobs::{process_listeners::Listeners, record_last_online::RecordLastOnline},
|
||||||
};
|
};
|
||||||
use background_jobs::{
|
use background_jobs::{
|
||||||
memory_storage::{Storage, TokioTimer},
|
memory_storage::{ActixTimer, Storage},
|
||||||
metrics::MetricsStorage,
|
Job, QueueHandle, WorkerConfig,
|
||||||
tokio::{QueueHandle, WorkerConfig},
|
|
||||||
Job,
|
|
||||||
};
|
};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
@ -45,21 +43,18 @@ pub(crate) fn create_workers(
|
|||||||
actors: ActorCache,
|
actors: ActorCache,
|
||||||
media: MediaCache,
|
media: MediaCache,
|
||||||
config: Config,
|
config: Config,
|
||||||
) -> std::io::Result<JobServer> {
|
) -> JobServer {
|
||||||
let deliver_concurrency = config.deliver_concurrency();
|
let deliver_concurrency = config.deliver_concurrency();
|
||||||
|
|
||||||
let queue_handle = WorkerConfig::new(
|
let queue_handle = WorkerConfig::new(Storage::new(ActixTimer), move |queue_handle| {
|
||||||
MetricsStorage::wrap(Storage::new(TokioTimer)),
|
JobState::new(
|
||||||
move |queue_handle| {
|
state.clone(),
|
||||||
JobState::new(
|
actors.clone(),
|
||||||
state.clone(),
|
JobServer::new(queue_handle),
|
||||||
actors.clone(),
|
media.clone(),
|
||||||
JobServer::new(queue_handle),
|
config.clone(),
|
||||||
media.clone(),
|
)
|
||||||
config.clone(),
|
})
|
||||||
)
|
|
||||||
},
|
|
||||||
)
|
|
||||||
.register::<Deliver>()
|
.register::<Deliver>()
|
||||||
.register::<DeliverMany>()
|
.register::<DeliverMany>()
|
||||||
.register::<QueryNodeinfo>()
|
.register::<QueryNodeinfo>()
|
||||||
@ -75,12 +70,12 @@ pub(crate) fn create_workers(
|
|||||||
.set_worker_count("maintenance", 2)
|
.set_worker_count("maintenance", 2)
|
||||||
.set_worker_count("apub", 2)
|
.set_worker_count("apub", 2)
|
||||||
.set_worker_count("deliver", deliver_concurrency)
|
.set_worker_count("deliver", deliver_concurrency)
|
||||||
.start()?;
|
.start();
|
||||||
|
|
||||||
queue_handle.every(Duration::from_secs(60 * 5), Listeners)?;
|
queue_handle.every(Duration::from_secs(60 * 5), Listeners);
|
||||||
queue_handle.every(Duration::from_secs(60 * 10), RecordLastOnline)?;
|
queue_handle.every(Duration::from_secs(60 * 10), RecordLastOnline);
|
||||||
|
|
||||||
Ok(JobServer::new(queue_handle))
|
JobServer::new(queue_handle)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
|
@ -2,14 +2,14 @@ use crate::{
|
|||||||
config::{Config, UrlKind},
|
config::{Config, UrlKind},
|
||||||
db::Actor,
|
db::Actor,
|
||||||
error::Error,
|
error::Error,
|
||||||
future::BoxFuture,
|
|
||||||
jobs::{
|
jobs::{
|
||||||
apub::{get_inboxes, prepare_activity},
|
apub::{get_inboxes, prepare_activity},
|
||||||
DeliverMany, JobState,
|
DeliverMany, JobState,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
use activitystreams::{activity::Announce as AsAnnounce, iri_string::types::IriString};
|
use activitystreams::{activity::Announce as AsAnnounce, iri_string::types::IriString};
|
||||||
use background_jobs::Job;
|
use background_jobs::ActixJob;
|
||||||
|
use std::{future::Future, pin::Pin};
|
||||||
|
|
||||||
#[derive(Clone, serde::Deserialize, serde::Serialize)]
|
#[derive(Clone, serde::Deserialize, serde::Serialize)]
|
||||||
pub(crate) struct Announce {
|
pub(crate) struct Announce {
|
||||||
@ -62,15 +62,14 @@ fn generate_announce(
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Job for Announce {
|
impl ActixJob for Announce {
|
||||||
type State = JobState;
|
type State = JobState;
|
||||||
type Error = Error;
|
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
||||||
type Future = BoxFuture<'static, Result<(), Self::Error>>;
|
|
||||||
|
|
||||||
const NAME: &'static str = "relay::jobs::apub::Announce";
|
const NAME: &'static str = "relay::jobs::apub::Announce";
|
||||||
const QUEUE: &'static str = "apub";
|
const QUEUE: &'static str = "apub";
|
||||||
|
|
||||||
fn run(self, state: Self::State) -> Self::Future {
|
fn run(self, state: Self::State) -> Self::Future {
|
||||||
Box::pin(self.perform(state))
|
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,7 +3,6 @@ use crate::{
|
|||||||
config::{Config, UrlKind},
|
config::{Config, UrlKind},
|
||||||
db::Actor,
|
db::Actor,
|
||||||
error::{Error, ErrorKind},
|
error::{Error, ErrorKind},
|
||||||
future::BoxFuture,
|
|
||||||
jobs::{apub::prepare_activity, Deliver, JobState, QueryInstance, QueryNodeinfo},
|
jobs::{apub::prepare_activity, Deliver, JobState, QueryInstance, QueryNodeinfo},
|
||||||
};
|
};
|
||||||
use activitystreams::{
|
use activitystreams::{
|
||||||
@ -11,7 +10,8 @@ use activitystreams::{
|
|||||||
iri_string::types::IriString,
|
iri_string::types::IriString,
|
||||||
prelude::*,
|
prelude::*,
|
||||||
};
|
};
|
||||||
use background_jobs::Job;
|
use background_jobs::ActixJob;
|
||||||
|
use std::{future::Future, pin::Pin};
|
||||||
|
|
||||||
#[derive(Clone, serde::Deserialize, serde::Serialize)]
|
#[derive(Clone, serde::Deserialize, serde::Serialize)]
|
||||||
pub(crate) struct Follow {
|
pub(crate) struct Follow {
|
||||||
@ -111,15 +111,14 @@ fn generate_accept_follow(
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Job for Follow {
|
impl ActixJob for Follow {
|
||||||
type State = JobState;
|
type State = JobState;
|
||||||
type Error = Error;
|
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
||||||
type Future = BoxFuture<'static, Result<(), Self::Error>>;
|
|
||||||
|
|
||||||
const NAME: &'static str = "relay::jobs::apub::Follow";
|
const NAME: &'static str = "relay::jobs::apub::Follow";
|
||||||
const QUEUE: &'static str = "apub";
|
const QUEUE: &'static str = "apub";
|
||||||
|
|
||||||
fn run(self, state: Self::State) -> Self::Future {
|
fn run(self, state: Self::State) -> Self::Future {
|
||||||
Box::pin(self.perform(state))
|
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,11 +2,11 @@ use crate::{
|
|||||||
apub::AcceptedActivities,
|
apub::AcceptedActivities,
|
||||||
db::Actor,
|
db::Actor,
|
||||||
error::{Error, ErrorKind},
|
error::{Error, ErrorKind},
|
||||||
future::BoxFuture,
|
|
||||||
jobs::{apub::get_inboxes, DeliverMany, JobState},
|
jobs::{apub::get_inboxes, DeliverMany, JobState},
|
||||||
};
|
};
|
||||||
use activitystreams::prelude::*;
|
use activitystreams::prelude::*;
|
||||||
use background_jobs::Job;
|
use background_jobs::ActixJob;
|
||||||
|
use std::{future::Future, pin::Pin};
|
||||||
|
|
||||||
#[derive(Clone, serde::Deserialize, serde::Serialize)]
|
#[derive(Clone, serde::Deserialize, serde::Serialize)]
|
||||||
pub(crate) struct Forward {
|
pub(crate) struct Forward {
|
||||||
@ -47,15 +47,14 @@ impl Forward {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Job for Forward {
|
impl ActixJob for Forward {
|
||||||
type State = JobState;
|
type State = JobState;
|
||||||
type Error = Error;
|
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
||||||
type Future = BoxFuture<'static, Result<(), Self::Error>>;
|
|
||||||
|
|
||||||
const NAME: &'static str = "relay::jobs::apub::Forward";
|
const NAME: &'static str = "relay::jobs::apub::Forward";
|
||||||
const QUEUE: &'static str = "apub";
|
const QUEUE: &'static str = "apub";
|
||||||
|
|
||||||
fn run(self, state: Self::State) -> Self::Future {
|
fn run(self, state: Self::State) -> Self::Future {
|
||||||
Box::pin(self.perform(state))
|
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,10 +2,10 @@ use crate::{
|
|||||||
config::UrlKind,
|
config::UrlKind,
|
||||||
db::Actor,
|
db::Actor,
|
||||||
error::Error,
|
error::Error,
|
||||||
future::BoxFuture,
|
|
||||||
jobs::{apub::generate_undo_follow, Deliver, JobState},
|
jobs::{apub::generate_undo_follow, Deliver, JobState},
|
||||||
};
|
};
|
||||||
use background_jobs::Job;
|
use background_jobs::ActixJob;
|
||||||
|
use std::{future::Future, pin::Pin};
|
||||||
|
|
||||||
#[derive(Clone, serde::Deserialize, serde::Serialize)]
|
#[derive(Clone, serde::Deserialize, serde::Serialize)]
|
||||||
pub(crate) struct Reject(pub(crate) Actor);
|
pub(crate) struct Reject(pub(crate) Actor);
|
||||||
@ -33,15 +33,14 @@ impl Reject {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Job for Reject {
|
impl ActixJob for Reject {
|
||||||
type State = JobState;
|
type State = JobState;
|
||||||
type Error = Error;
|
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
||||||
type Future = BoxFuture<'static, Result<(), Self::Error>>;
|
|
||||||
|
|
||||||
const NAME: &'static str = "relay::jobs::apub::Reject";
|
const NAME: &'static str = "relay::jobs::apub::Reject";
|
||||||
const QUEUE: &'static str = "apub";
|
const QUEUE: &'static str = "apub";
|
||||||
|
|
||||||
fn run(self, state: Self::State) -> Self::Future {
|
fn run(self, state: Self::State) -> Self::Future {
|
||||||
Box::pin(self.perform(state))
|
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3,11 +3,11 @@ use crate::{
|
|||||||
config::UrlKind,
|
config::UrlKind,
|
||||||
db::Actor,
|
db::Actor,
|
||||||
error::Error,
|
error::Error,
|
||||||
future::BoxFuture,
|
|
||||||
jobs::{apub::generate_undo_follow, Deliver, JobState},
|
jobs::{apub::generate_undo_follow, Deliver, JobState},
|
||||||
};
|
};
|
||||||
use activitystreams::prelude::BaseExt;
|
use activitystreams::prelude::BaseExt;
|
||||||
use background_jobs::Job;
|
use background_jobs::ActixJob;
|
||||||
|
use std::{future::Future, pin::Pin};
|
||||||
|
|
||||||
#[derive(Clone, serde::Deserialize, serde::Serialize)]
|
#[derive(Clone, serde::Deserialize, serde::Serialize)]
|
||||||
pub(crate) struct Undo {
|
pub(crate) struct Undo {
|
||||||
@ -48,15 +48,14 @@ impl Undo {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Job for Undo {
|
impl ActixJob for Undo {
|
||||||
type State = JobState;
|
type State = JobState;
|
||||||
type Error = Error;
|
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
||||||
type Future = BoxFuture<'static, Result<(), Self::Error>>;
|
|
||||||
|
|
||||||
const NAME: &'static str = "relay::jobs::apub::Undo";
|
const NAME: &'static str = "relay::jobs::apub::Undo";
|
||||||
const QUEUE: &'static str = "apub";
|
const QUEUE: &'static str = "apub";
|
||||||
|
|
||||||
fn run(self, state: Self::State) -> Self::Future {
|
fn run(self, state: Self::State) -> Self::Future {
|
||||||
Box::pin(self.perform(state))
|
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,12 +1,12 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
apub::AcceptedActors,
|
apub::AcceptedActors,
|
||||||
error::{Error, ErrorKind},
|
error::{Error, ErrorKind},
|
||||||
future::BoxFuture,
|
|
||||||
jobs::JobState,
|
jobs::JobState,
|
||||||
requests::BreakerStrategy,
|
requests::BreakerStrategy,
|
||||||
};
|
};
|
||||||
use activitystreams::{iri_string::types::IriString, object::Image, prelude::*};
|
use activitystreams::{iri_string::types::IriString, object::Image, prelude::*};
|
||||||
use background_jobs::Job;
|
use background_jobs::ActixJob;
|
||||||
|
use std::{future::Future, pin::Pin};
|
||||||
|
|
||||||
#[derive(Clone, serde::Deserialize, serde::Serialize)]
|
#[derive(Clone, serde::Deserialize, serde::Serialize)]
|
||||||
pub(crate) struct QueryContact {
|
pub(crate) struct QueryContact {
|
||||||
@ -85,16 +85,15 @@ fn to_contact(contact: AcceptedActors) -> Option<(String, String, IriString, Iri
|
|||||||
Some((username, display_name, url, avatar))
|
Some((username, display_name, url, avatar))
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Job for QueryContact {
|
impl ActixJob for QueryContact {
|
||||||
type State = JobState;
|
type State = JobState;
|
||||||
type Error = Error;
|
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
||||||
type Future = BoxFuture<'static, Result<(), Self::Error>>;
|
|
||||||
|
|
||||||
const NAME: &'static str = "relay::jobs::QueryContact";
|
const NAME: &'static str = "relay::jobs::QueryContact";
|
||||||
const QUEUE: &'static str = "maintenance";
|
const QUEUE: &'static str = "maintenance";
|
||||||
|
|
||||||
fn run(self, state: Self::State) -> Self::Future {
|
fn run(self, state: Self::State) -> Self::Future {
|
||||||
Box::pin(self.perform(state))
|
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,11 +1,11 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
error::Error,
|
error::Error,
|
||||||
future::BoxFuture,
|
|
||||||
jobs::{debug_object, JobState},
|
jobs::{debug_object, JobState},
|
||||||
requests::BreakerStrategy,
|
requests::BreakerStrategy,
|
||||||
};
|
};
|
||||||
use activitystreams::iri_string::types::IriString;
|
use activitystreams::iri_string::types::IriString;
|
||||||
use background_jobs::{Backoff, Job};
|
use background_jobs::{ActixJob, Backoff};
|
||||||
|
use std::{future::Future, pin::Pin};
|
||||||
|
|
||||||
#[derive(Clone, serde::Deserialize, serde::Serialize)]
|
#[derive(Clone, serde::Deserialize, serde::Serialize)]
|
||||||
pub(crate) struct Deliver {
|
pub(crate) struct Deliver {
|
||||||
@ -35,7 +35,7 @@ impl Deliver {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[tracing::instrument(name = "Deliver", skip(state))]
|
#[tracing::instrument(name = "Deliver", skip(state))]
|
||||||
async fn perform(self, state: JobState) -> Result<(), Error> {
|
async fn permform(self, state: JobState) -> Result<(), Error> {
|
||||||
if let Err(e) = state
|
if let Err(e) = state
|
||||||
.state
|
.state
|
||||||
.requests
|
.requests
|
||||||
@ -56,16 +56,15 @@ impl Deliver {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Job for Deliver {
|
impl ActixJob for Deliver {
|
||||||
type State = JobState;
|
type State = JobState;
|
||||||
type Error = Error;
|
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
||||||
type Future = BoxFuture<'static, Result<(), Self::Error>>;
|
|
||||||
|
|
||||||
const NAME: &'static str = "relay::jobs::Deliver";
|
const NAME: &'static str = "relay::jobs::Deliver";
|
||||||
const QUEUE: &'static str = "deliver";
|
const QUEUE: &'static str = "deliver";
|
||||||
const BACKOFF: Backoff = Backoff::Exponential(8);
|
const BACKOFF: Backoff = Backoff::Exponential(8);
|
||||||
|
|
||||||
fn run(self, state: Self::State) -> Self::Future {
|
fn run(self, state: Self::State) -> Self::Future {
|
||||||
Box::pin(self.perform(state))
|
Box::pin(async move { self.permform(state).await.map_err(Into::into) })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,10 +1,10 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
error::Error,
|
error::Error,
|
||||||
future::BoxFuture,
|
future::LocalBoxFuture,
|
||||||
jobs::{debug_object, Deliver, JobState},
|
jobs::{debug_object, Deliver, JobState},
|
||||||
};
|
};
|
||||||
use activitystreams::iri_string::types::IriString;
|
use activitystreams::iri_string::types::IriString;
|
||||||
use background_jobs::Job;
|
use background_jobs::ActixJob;
|
||||||
|
|
||||||
#[derive(Clone, serde::Deserialize, serde::Serialize)]
|
#[derive(Clone, serde::Deserialize, serde::Serialize)]
|
||||||
pub(crate) struct DeliverMany {
|
pub(crate) struct DeliverMany {
|
||||||
@ -45,15 +45,14 @@ impl DeliverMany {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Job for DeliverMany {
|
impl ActixJob for DeliverMany {
|
||||||
type State = JobState;
|
type State = JobState;
|
||||||
type Error = Error;
|
type Future = LocalBoxFuture<'static, Result<(), anyhow::Error>>;
|
||||||
type Future = BoxFuture<'static, Result<(), Self::Error>>;
|
|
||||||
|
|
||||||
const NAME: &'static str = "relay::jobs::DeliverMany";
|
const NAME: &'static str = "relay::jobs::DeliverMany";
|
||||||
const QUEUE: &'static str = "deliver";
|
const QUEUE: &'static str = "deliver";
|
||||||
|
|
||||||
fn run(self, state: Self::State) -> Self::Future {
|
fn run(self, state: Self::State) -> Self::Future {
|
||||||
Box::pin(self.perform(state))
|
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,12 +1,12 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
config::UrlKind,
|
config::UrlKind,
|
||||||
error::{Error, ErrorKind},
|
error::{Error, ErrorKind},
|
||||||
future::BoxFuture,
|
|
||||||
jobs::{Boolish, JobState},
|
jobs::{Boolish, JobState},
|
||||||
requests::BreakerStrategy,
|
requests::BreakerStrategy,
|
||||||
};
|
};
|
||||||
use activitystreams::{iri, iri_string::types::IriString};
|
use activitystreams::{iri, iri_string::types::IriString};
|
||||||
use background_jobs::Job;
|
use background_jobs::ActixJob;
|
||||||
|
use std::{future::Future, pin::Pin};
|
||||||
|
|
||||||
#[derive(Clone, serde::Deserialize, serde::Serialize)]
|
#[derive(Clone, serde::Deserialize, serde::Serialize)]
|
||||||
pub(crate) struct QueryInstance {
|
pub(crate) struct QueryInstance {
|
||||||
@ -165,16 +165,15 @@ impl QueryInstance {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Job for QueryInstance {
|
impl ActixJob for QueryInstance {
|
||||||
type State = JobState;
|
type State = JobState;
|
||||||
type Error = Error;
|
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
||||||
type Future = BoxFuture<'static, Result<(), Self::Error>>;
|
|
||||||
|
|
||||||
const NAME: &'static str = "relay::jobs::QueryInstance";
|
const NAME: &'static str = "relay::jobs::QueryInstance";
|
||||||
const QUEUE: &'static str = "maintenance";
|
const QUEUE: &'static str = "maintenance";
|
||||||
|
|
||||||
fn run(self, state: Self::State) -> Self::Future {
|
fn run(self, state: Self::State) -> Self::Future {
|
||||||
Box::pin(self.perform(state))
|
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,18 +1,18 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
error::{Error, ErrorKind},
|
error::{Error, ErrorKind},
|
||||||
future::BoxFuture,
|
|
||||||
jobs::{Boolish, JobState, QueryContact},
|
jobs::{Boolish, JobState, QueryContact},
|
||||||
requests::BreakerStrategy,
|
requests::BreakerStrategy,
|
||||||
};
|
};
|
||||||
use activitystreams::{iri, iri_string::types::IriString, primitives::OneOrMany};
|
use activitystreams::{iri, iri_string::types::IriString, primitives::OneOrMany};
|
||||||
use background_jobs::Job;
|
use background_jobs::ActixJob;
|
||||||
|
use std::{fmt::Debug, future::Future, pin::Pin};
|
||||||
|
|
||||||
#[derive(Clone, serde::Deserialize, serde::Serialize)]
|
#[derive(Clone, serde::Deserialize, serde::Serialize)]
|
||||||
pub(crate) struct QueryNodeinfo {
|
pub(crate) struct QueryNodeinfo {
|
||||||
actor_id: IriString,
|
actor_id: IriString,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Debug for QueryNodeinfo {
|
impl Debug for QueryNodeinfo {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
f.debug_struct("QueryNodeinfo")
|
f.debug_struct("QueryNodeinfo")
|
||||||
.field("actor_id", &self.actor_id.to_string())
|
.field("actor_id", &self.actor_id.to_string())
|
||||||
@ -92,7 +92,7 @@ impl QueryNodeinfo {
|
|||||||
.metadata
|
.metadata
|
||||||
.and_then(|meta| meta.into_iter().next().and_then(|meta| meta.staff_accounts))
|
.and_then(|meta| meta.into_iter().next().and_then(|meta| meta.staff_accounts))
|
||||||
{
|
{
|
||||||
if let Some(contact_id) = accounts.first() {
|
if let Some(contact_id) = accounts.get(0) {
|
||||||
state
|
state
|
||||||
.job_server
|
.job_server
|
||||||
.queue(QueryContact::new(self.actor_id, contact_id.clone()))
|
.queue(QueryContact::new(self.actor_id, contact_id.clone()))
|
||||||
@ -104,16 +104,15 @@ impl QueryNodeinfo {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Job for QueryNodeinfo {
|
impl ActixJob for QueryNodeinfo {
|
||||||
type State = JobState;
|
type State = JobState;
|
||||||
type Error = Error;
|
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
||||||
type Future = BoxFuture<'static, Result<(), Self::Error>>;
|
|
||||||
|
|
||||||
const NAME: &'static str = "relay::jobs::QueryNodeinfo";
|
const NAME: &'static str = "relay::jobs::QueryNodeinfo";
|
||||||
const QUEUE: &'static str = "maintenance";
|
const QUEUE: &'static str = "maintenance";
|
||||||
|
|
||||||
fn run(self, state: Self::State) -> Self::Future {
|
fn run(self, state: Self::State) -> Self::Future {
|
||||||
Box::pin(self.perform(state))
|
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1,9 +1,9 @@
|
|||||||
use crate::{
|
use crate::{
|
||||||
error::Error,
|
error::Error,
|
||||||
future::BoxFuture,
|
|
||||||
jobs::{instance::QueryInstance, nodeinfo::QueryNodeinfo, JobState},
|
jobs::{instance::QueryInstance, nodeinfo::QueryNodeinfo, JobState},
|
||||||
};
|
};
|
||||||
use background_jobs::Job;
|
use background_jobs::ActixJob;
|
||||||
|
use std::{future::Future, pin::Pin};
|
||||||
|
|
||||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||||
pub(crate) struct Listeners;
|
pub(crate) struct Listeners;
|
||||||
@ -23,15 +23,14 @@ impl Listeners {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Job for Listeners {
|
impl ActixJob for Listeners {
|
||||||
type State = JobState;
|
type State = JobState;
|
||||||
type Error = Error;
|
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
||||||
type Future = BoxFuture<'static, Result<(), Self::Error>>;
|
|
||||||
|
|
||||||
const NAME: &'static str = "relay::jobs::Listeners";
|
const NAME: &'static str = "relay::jobs::Listeners";
|
||||||
const QUEUE: &'static str = "maintenance";
|
const QUEUE: &'static str = "maintenance";
|
||||||
|
|
||||||
fn run(self, state: Self::State) -> Self::Future {
|
fn run(self, state: Self::State) -> Self::Future {
|
||||||
Box::pin(self.perform(state))
|
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
use crate::{error::Error, future::BoxFuture, jobs::JobState};
|
use crate::{error::Error, jobs::JobState};
|
||||||
use background_jobs::{Backoff, Job};
|
use background_jobs::{ActixJob, Backoff};
|
||||||
|
use std::{future::Future, pin::Pin};
|
||||||
|
|
||||||
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
#[derive(Clone, Debug, serde::Deserialize, serde::Serialize)]
|
||||||
pub(crate) struct RecordLastOnline;
|
pub(crate) struct RecordLastOnline;
|
||||||
@ -13,16 +14,15 @@ impl RecordLastOnline {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Job for RecordLastOnline {
|
impl ActixJob for RecordLastOnline {
|
||||||
type State = JobState;
|
type State = JobState;
|
||||||
type Error = Error;
|
type Future = Pin<Box<dyn Future<Output = Result<(), anyhow::Error>>>>;
|
||||||
type Future = BoxFuture<'static, Result<(), Self::Error>>;
|
|
||||||
|
|
||||||
const NAME: &'static str = "relay::jobs::RecordLastOnline";
|
const NAME: &'static str = "relay::jobs::RecordLastOnline";
|
||||||
const QUEUE: &'static str = "maintenance";
|
const QUEUE: &'static str = "maintenance";
|
||||||
const BACKOFF: Backoff = Backoff::Linear(1);
|
const BACKOFF: Backoff = Backoff::Linear(1);
|
||||||
|
|
||||||
fn run(self, state: Self::State) -> Self::Future {
|
fn run(self, state: Self::State) -> Self::Future {
|
||||||
Box::pin(self.perform(state))
|
Box::pin(async move { self.perform(state).await.map_err(Into::into) })
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
141
src/main.rs
141
src/main.rs
@ -4,6 +4,7 @@
|
|||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use activitystreams::iri_string::types::IriString;
|
use activitystreams::iri_string::types::IriString;
|
||||||
|
use actix_rt::task::JoinHandle;
|
||||||
use actix_web::{middleware::Compress, web, App, HttpServer};
|
use actix_web::{middleware::Compress, web, App, HttpServer};
|
||||||
use collector::MemoryCollector;
|
use collector::MemoryCollector;
|
||||||
#[cfg(feature = "console")]
|
#[cfg(feature = "console")]
|
||||||
@ -12,16 +13,14 @@ use error::Error;
|
|||||||
use http_signature_normalization_actix::middleware::VerifySignature;
|
use http_signature_normalization_actix::middleware::VerifySignature;
|
||||||
use metrics_exporter_prometheus::PrometheusBuilder;
|
use metrics_exporter_prometheus::PrometheusBuilder;
|
||||||
use metrics_util::layers::FanoutBuilder;
|
use metrics_util::layers::FanoutBuilder;
|
||||||
use opentelemetry::KeyValue;
|
use opentelemetry::{sdk::Resource, KeyValue};
|
||||||
use opentelemetry_otlp::WithExportConfig;
|
use opentelemetry_otlp::WithExportConfig;
|
||||||
use opentelemetry_sdk::Resource;
|
|
||||||
use reqwest_middleware::ClientWithMiddleware;
|
use reqwest_middleware::ClientWithMiddleware;
|
||||||
use rustls::ServerConfig;
|
use rustls::ServerConfig;
|
||||||
use tokio::task::JoinHandle;
|
|
||||||
use tracing_actix_web::TracingLogger;
|
use tracing_actix_web::TracingLogger;
|
||||||
use tracing_error::ErrorLayer;
|
use tracing_error::ErrorLayer;
|
||||||
use tracing_log::LogTracer;
|
use tracing_log::LogTracer;
|
||||||
use tracing_subscriber::{filter::Targets, layer::SubscriberExt, Layer};
|
use tracing_subscriber::{filter::Targets, fmt::format::FmtSpan, layer::SubscriberExt, Layer};
|
||||||
|
|
||||||
mod admin;
|
mod admin;
|
||||||
mod apub;
|
mod apub;
|
||||||
@ -56,15 +55,16 @@ use self::{
|
|||||||
fn init_subscriber(
|
fn init_subscriber(
|
||||||
software_name: &'static str,
|
software_name: &'static str,
|
||||||
opentelemetry_url: Option<&IriString>,
|
opentelemetry_url: Option<&IriString>,
|
||||||
) -> color_eyre::Result<()> {
|
) -> Result<(), anyhow::Error> {
|
||||||
LogTracer::init()?;
|
LogTracer::init()?;
|
||||||
color_eyre::install()?;
|
|
||||||
|
|
||||||
let targets: Targets = std::env::var("RUST_LOG")
|
let targets: Targets = std::env::var("RUST_LOG")
|
||||||
.unwrap_or_else(|_| "info".into())
|
.unwrap_or_else(|_| "warn,actix_web=debug,actix_server=debug,tracing_actix_web=info".into())
|
||||||
.parse()?;
|
.parse()?;
|
||||||
|
|
||||||
let format_layer = tracing_subscriber::fmt::layer().with_filter(targets.clone());
|
let format_layer = tracing_subscriber::fmt::layer()
|
||||||
|
.with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
|
||||||
|
.with_filter(targets.clone());
|
||||||
|
|
||||||
#[cfg(feature = "console")]
|
#[cfg(feature = "console")]
|
||||||
let console_layer = ConsoleLayer::builder()
|
let console_layer = ConsoleLayer::builder()
|
||||||
@ -81,19 +81,18 @@ fn init_subscriber(
|
|||||||
let subscriber = subscriber.with(console_layer);
|
let subscriber = subscriber.with(console_layer);
|
||||||
|
|
||||||
if let Some(url) = opentelemetry_url {
|
if let Some(url) = opentelemetry_url {
|
||||||
let tracer = opentelemetry_otlp::new_pipeline()
|
let tracer =
|
||||||
.tracing()
|
opentelemetry_otlp::new_pipeline()
|
||||||
.with_trace_config(
|
.tracing()
|
||||||
opentelemetry_sdk::trace::config().with_resource(Resource::new(vec![
|
.with_trace_config(opentelemetry::sdk::trace::config().with_resource(
|
||||||
KeyValue::new("service.name", software_name),
|
Resource::new(vec![KeyValue::new("service.name", software_name)]),
|
||||||
])),
|
))
|
||||||
)
|
.with_exporter(
|
||||||
.with_exporter(
|
opentelemetry_otlp::new_exporter()
|
||||||
opentelemetry_otlp::new_exporter()
|
.tonic()
|
||||||
.tonic()
|
.with_endpoint(url.as_str()),
|
||||||
.with_endpoint(url.as_str()),
|
)
|
||||||
)
|
.install_batch(opentelemetry::runtime::Tokio)?;
|
||||||
.install_batch(opentelemetry_sdk::runtime::Tokio)?;
|
|
||||||
|
|
||||||
let otel_layer = tracing_opentelemetry::layer()
|
let otel_layer = tracing_opentelemetry::layer()
|
||||||
.with_tracer(tracer)
|
.with_tracer(tracer)
|
||||||
@ -140,8 +139,8 @@ fn build_client(
|
|||||||
Ok(client_with_middleware)
|
Ok(client_with_middleware)
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[actix_rt::main]
|
||||||
async fn main() -> color_eyre::Result<()> {
|
async fn main() -> Result<(), anyhow::Error> {
|
||||||
dotenv::dotenv().ok();
|
dotenv::dotenv().ok();
|
||||||
|
|
||||||
let config = Config::build()?;
|
let config = Config::build()?;
|
||||||
@ -151,8 +150,7 @@ async fn main() -> color_eyre::Result<()> {
|
|||||||
let args = Args::new();
|
let args = Args::new();
|
||||||
|
|
||||||
if args.any() {
|
if args.any() {
|
||||||
client_main(config, args).await??;
|
return client_main(config, args).await?;
|
||||||
return Ok(());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let collector = MemoryCollector::new();
|
let collector = MemoryCollector::new();
|
||||||
@ -162,35 +160,35 @@ async fn main() -> color_eyre::Result<()> {
|
|||||||
.with_http_listener(bind_addr)
|
.with_http_listener(bind_addr)
|
||||||
.build()?;
|
.build()?;
|
||||||
|
|
||||||
tokio::spawn(exporter);
|
actix_rt::spawn(exporter);
|
||||||
let recorder = FanoutBuilder::default()
|
let recorder = FanoutBuilder::default()
|
||||||
.add_recorder(recorder)
|
.add_recorder(recorder)
|
||||||
.add_recorder(collector.clone())
|
.add_recorder(collector.clone())
|
||||||
.build();
|
.build();
|
||||||
metrics::set_global_recorder(recorder).map_err(|e| color_eyre::eyre::eyre!("{e}"))?;
|
metrics::set_boxed_recorder(Box::new(recorder))?;
|
||||||
} else {
|
} else {
|
||||||
collector.install()?;
|
collector.install()?;
|
||||||
}
|
}
|
||||||
|
|
||||||
tracing::info!("Opening DB");
|
tracing::warn!("Opening DB");
|
||||||
let db = Db::build(&config)?;
|
let db = Db::build(&config)?;
|
||||||
|
|
||||||
tracing::info!("Building caches");
|
tracing::warn!("Building caches");
|
||||||
let actors = ActorCache::new(db.clone());
|
let actors = ActorCache::new(db.clone());
|
||||||
let media = MediaCache::new(db.clone());
|
let media = MediaCache::new(db.clone());
|
||||||
|
|
||||||
server_main(db, actors, media, collector, config).await?;
|
server_main(db, actors, media, collector, config).await??;
|
||||||
|
|
||||||
tracing::info!("Application exit");
|
tracing::warn!("Application exit");
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn client_main(config: Config, args: Args) -> JoinHandle<color_eyre::Result<()>> {
|
fn client_main(config: Config, args: Args) -> JoinHandle<Result<(), anyhow::Error>> {
|
||||||
tokio::spawn(do_client_main(config, args))
|
actix_rt::spawn(do_client_main(config, args))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn do_client_main(config: Config, args: Args) -> color_eyre::Result<()> {
|
async fn do_client_main(config: Config, args: Args) -> Result<(), anyhow::Error> {
|
||||||
let client = build_client(
|
let client = build_client(
|
||||||
&config.user_agent(),
|
&config.user_agent(),
|
||||||
config.client_timeout(),
|
config.client_timeout(),
|
||||||
@ -273,22 +271,32 @@ async fn do_client_main(config: Config, args: Args) -> color_eyre::Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
const VERIFY_RATIO: usize = 7;
|
fn server_main(
|
||||||
|
|
||||||
async fn server_main(
|
|
||||||
db: Db,
|
db: Db,
|
||||||
actors: ActorCache,
|
actors: ActorCache,
|
||||||
media: MediaCache,
|
media: MediaCache,
|
||||||
collector: MemoryCollector,
|
collector: MemoryCollector,
|
||||||
config: Config,
|
config: Config,
|
||||||
) -> color_eyre::Result<()> {
|
) -> JoinHandle<Result<(), anyhow::Error>> {
|
||||||
|
actix_rt::spawn(do_server_main(db, actors, media, collector, config))
|
||||||
|
}
|
||||||
|
|
||||||
|
const VERIFY_RATIO: usize = 7;
|
||||||
|
|
||||||
|
async fn do_server_main(
|
||||||
|
db: Db,
|
||||||
|
actors: ActorCache,
|
||||||
|
media: MediaCache,
|
||||||
|
collector: MemoryCollector,
|
||||||
|
config: Config,
|
||||||
|
) -> Result<(), anyhow::Error> {
|
||||||
let client = build_client(
|
let client = build_client(
|
||||||
&config.user_agent(),
|
&config.user_agent(),
|
||||||
config.client_timeout(),
|
config.client_timeout(),
|
||||||
config.proxy_config(),
|
config.proxy_config(),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
tracing::info!("Creating state");
|
tracing::warn!("Creating state");
|
||||||
|
|
||||||
let (signature_threads, verify_threads) = match config.signature_threads() {
|
let (signature_threads, verify_threads) = match config.signature_threads() {
|
||||||
0 | 1 => (1, 1),
|
0 | 1 => (1, 1),
|
||||||
@ -301,30 +309,23 @@ async fn server_main(
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let verify_spawner = Spawner::build("verify-cpu", verify_threads.try_into()?)?;
|
let verify_spawner = Spawner::build("verify-cpu", verify_threads)?;
|
||||||
let sign_spawner = Spawner::build("sign-cpu", signature_threads.try_into()?)?;
|
let sign_spawner = Spawner::build("sign-cpu", signature_threads)?;
|
||||||
|
|
||||||
let key_id = config.generate_url(UrlKind::MainKey).to_string();
|
let key_id = config.generate_url(UrlKind::MainKey).to_string();
|
||||||
let state = State::build(db.clone(), key_id, sign_spawner.clone(), client).await?;
|
let state = State::build(db.clone(), key_id, sign_spawner, client).await?;
|
||||||
|
|
||||||
if let Some((token, admin_handle)) = config.telegram_info() {
|
if let Some((token, admin_handle)) = config.telegram_info() {
|
||||||
tracing::info!("Creating telegram handler");
|
tracing::warn!("Creating telegram handler");
|
||||||
telegram::start(admin_handle.to_owned(), db.clone(), token);
|
telegram::start(admin_handle.to_owned(), db.clone(), token);
|
||||||
}
|
}
|
||||||
|
|
||||||
let cert_resolver = config
|
let keys = config.open_keys()?;
|
||||||
.open_keys()
|
|
||||||
.await?
|
|
||||||
.map(rustls_channel_resolver::channel::<32>);
|
|
||||||
|
|
||||||
let bind_address = config.bind_address();
|
let bind_address = config.bind_address();
|
||||||
let sign_spawner2 = sign_spawner.clone();
|
|
||||||
let verify_spawner2 = verify_spawner.clone();
|
|
||||||
let config2 = config.clone();
|
|
||||||
let server = HttpServer::new(move || {
|
let server = HttpServer::new(move || {
|
||||||
let job_server =
|
let job_server =
|
||||||
create_workers(state.clone(), actors.clone(), media.clone(), config.clone())
|
create_workers(state.clone(), actors.clone(), media.clone(), config.clone());
|
||||||
.expect("Failed to create job server");
|
|
||||||
|
|
||||||
let app = App::new()
|
let app = App::new()
|
||||||
.app_data(web::Data::new(db.clone()))
|
.app_data(web::Data::new(db.clone()))
|
||||||
@ -390,42 +391,24 @@ async fn server_main(
|
|||||||
)
|
)
|
||||||
});
|
});
|
||||||
|
|
||||||
if let Some((cert_tx, cert_rx)) = cert_resolver {
|
if let Some((certs, key)) = keys {
|
||||||
let handle = tokio::spawn(async move {
|
tracing::warn!("Binding to {}:{} with TLS", bind_address.0, bind_address.1);
|
||||||
let mut interval = tokio::time::interval(Duration::from_secs(30));
|
|
||||||
interval.tick().await;
|
|
||||||
|
|
||||||
loop {
|
|
||||||
interval.tick().await;
|
|
||||||
|
|
||||||
match config2.open_keys().await {
|
|
||||||
Ok(Some(key)) => cert_tx.update(key),
|
|
||||||
Ok(None) => tracing::warn!("Missing TLS keys"),
|
|
||||||
Err(e) => tracing::error!("Failed to read TLS keys {e}"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
tracing::info!("Binding to {}:{} with TLS", bind_address.0, bind_address.1);
|
|
||||||
let server_config = ServerConfig::builder()
|
let server_config = ServerConfig::builder()
|
||||||
|
.with_safe_default_cipher_suites()
|
||||||
|
.with_safe_default_kx_groups()
|
||||||
|
.with_safe_default_protocol_versions()?
|
||||||
.with_no_client_auth()
|
.with_no_client_auth()
|
||||||
.with_cert_resolver(cert_rx);
|
.with_single_cert(certs, key)?;
|
||||||
server
|
server
|
||||||
.bind_rustls_0_22(bind_address, server_config)?
|
.bind_rustls_021(bind_address, server_config)?
|
||||||
.run()
|
.run()
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
handle.abort();
|
|
||||||
let _ = handle.await;
|
|
||||||
} else {
|
} else {
|
||||||
tracing::info!("Binding to {}:{}", bind_address.0, bind_address.1);
|
tracing::warn!("Binding to {}:{}", bind_address.0, bind_address.1);
|
||||||
server.bind(bind_address)?.run().await?;
|
server.bind(bind_address)?.run().await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
sign_spawner2.close().await;
|
tracing::warn!("Server closed");
|
||||||
verify_spawner2.close().await;
|
|
||||||
|
|
||||||
tracing::info!("Server closed");
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -40,7 +40,7 @@ impl Drop for LogOnDrop {
|
|||||||
fn drop(&mut self) {
|
fn drop(&mut self) {
|
||||||
if self.arm {
|
if self.arm {
|
||||||
let duration = self.begin.elapsed();
|
let duration = self.begin.elapsed();
|
||||||
metrics::histogram!("relay.request.complete", "path" => self.path.clone(), "method" => self.method.clone()).record(duration);
|
metrics::histogram!("relay.request.complete", duration, "path" => self.path.clone(), "method" => self.method.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -432,7 +432,7 @@ struct Signer {
|
|||||||
|
|
||||||
impl Signer {
|
impl Signer {
|
||||||
fn sign(&self, signing_string: &str) -> Result<String, Error> {
|
fn sign(&self, signing_string: &str) -> Result<String, Error> {
|
||||||
let mut signature = vec![0; self.private_key.public().modulus_len()];
|
let mut signature = vec![0; self.private_key.public_modulus_len()];
|
||||||
|
|
||||||
self.private_key
|
self.private_key
|
||||||
.sign(
|
.sign(
|
||||||
|
@ -14,11 +14,10 @@ const MINIFY_CONFIG: minify_html::Cfg = minify_html::Cfg {
|
|||||||
keep_html_and_head_opening_tags: false,
|
keep_html_and_head_opening_tags: false,
|
||||||
keep_spaces_between_attributes: true,
|
keep_spaces_between_attributes: true,
|
||||||
keep_comments: false,
|
keep_comments: false,
|
||||||
keep_input_type_text_attr: true,
|
|
||||||
keep_ssi_comments: false,
|
|
||||||
preserve_brace_template_syntax: false,
|
|
||||||
preserve_chevron_percent_template_syntax: false,
|
|
||||||
minify_css: true,
|
minify_css: true,
|
||||||
|
minify_css_level_1: true,
|
||||||
|
minify_css_level_2: false,
|
||||||
|
minify_css_level_3: false,
|
||||||
minify_js: true,
|
minify_js: true,
|
||||||
remove_bangs: true,
|
remove_bangs: true,
|
||||||
remove_processing_instructions: true,
|
remove_processing_instructions: true,
|
||||||
|
149
src/spawner.rs
149
src/spawner.rs
@ -1,30 +1,107 @@
|
|||||||
use async_cpupool::CpuPool;
|
|
||||||
use http_signature_normalization_actix::{Canceled, Spawn};
|
use http_signature_normalization_actix::{Canceled, Spawn};
|
||||||
use std::time::Duration;
|
use std::{
|
||||||
|
panic::AssertUnwindSafe,
|
||||||
|
sync::Arc,
|
||||||
|
thread::JoinHandle,
|
||||||
|
time::{Duration, Instant},
|
||||||
|
};
|
||||||
|
|
||||||
#[derive(Clone)]
|
fn spawner_thread(
|
||||||
|
receiver: flume::Receiver<Box<dyn FnOnce() + Send>>,
|
||||||
|
name: &'static str,
|
||||||
|
id: usize,
|
||||||
|
) {
|
||||||
|
let guard = MetricsGuard::guard(name, id);
|
||||||
|
|
||||||
|
while let Ok(f) = receiver.recv() {
|
||||||
|
let start = Instant::now();
|
||||||
|
metrics::increment_counter!(format!("relay.{name}.operation.start"), "id" => id.to_string());
|
||||||
|
let res = std::panic::catch_unwind(AssertUnwindSafe(f));
|
||||||
|
metrics::increment_counter!(format!("relay.{name}.operation.end"), "complete" => res.is_ok().to_string(), "id" => id.to_string());
|
||||||
|
metrics::histogram!(format!("relay.{name}.operation.duration"), start.elapsed().as_secs_f64(), "complete" => res.is_ok().to_string(), "id" => id.to_string());
|
||||||
|
|
||||||
|
if let Err(e) = res {
|
||||||
|
tracing::warn!("{name} fn panicked: {e:?}");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
guard.disarm();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
pub(crate) struct Spawner {
|
pub(crate) struct Spawner {
|
||||||
pool: CpuPool,
|
name: &'static str,
|
||||||
|
sender: Option<flume::Sender<Box<dyn FnOnce() + Send>>>,
|
||||||
|
threads: Option<Arc<Vec<JoinHandle<()>>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct MetricsGuard {
|
||||||
|
name: &'static str,
|
||||||
|
id: usize,
|
||||||
|
start: Instant,
|
||||||
|
armed: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl MetricsGuard {
|
||||||
|
fn guard(name: &'static str, id: usize) -> Self {
|
||||||
|
metrics::increment_counter!(format!("relay.{name}.launched"), "id" => id.to_string());
|
||||||
|
|
||||||
|
Self {
|
||||||
|
name,
|
||||||
|
id,
|
||||||
|
start: Instant::now(),
|
||||||
|
armed: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn disarm(mut self) {
|
||||||
|
self.armed = false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Drop for MetricsGuard {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
metrics::increment_counter!(format!("relay.{}.closed", self.name), "clean" => (!self.armed).to_string(), "id" => self.id.to_string());
|
||||||
|
metrics::histogram!(format!("relay.{}.duration", self.name), self.start.elapsed().as_secs_f64(), "clean" => (!self.armed).to_string(), "id" => self.id.to_string());
|
||||||
|
tracing::warn!("Stopping {} - {}", self.name, self.id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Spawner {
|
impl Spawner {
|
||||||
pub(crate) fn build(name: &'static str, threads: u16) -> color_eyre::Result<Self> {
|
pub(crate) fn build(name: &'static str, threads: usize) -> std::io::Result<Self> {
|
||||||
let pool = CpuPool::configure()
|
let (sender, receiver) = flume::bounded(8);
|
||||||
.name(name)
|
|
||||||
.max_threads(threads)
|
|
||||||
.build()?;
|
|
||||||
|
|
||||||
Ok(Spawner { pool })
|
tracing::warn!("Launching {threads} {name}s");
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) async fn close(self) {
|
let threads = (0..threads)
|
||||||
self.pool.close().await;
|
.map(|i| {
|
||||||
|
let receiver = receiver.clone();
|
||||||
|
std::thread::Builder::new()
|
||||||
|
.name(format!("{name}-{i}"))
|
||||||
|
.spawn(move || {
|
||||||
|
spawner_thread(receiver, name, i);
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect::<Result<Vec<_>, _>>()?;
|
||||||
|
|
||||||
|
Ok(Spawner {
|
||||||
|
name,
|
||||||
|
sender: Some(sender),
|
||||||
|
threads: Some(Arc::new(threads)),
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Debug for Spawner {
|
impl Drop for Spawner {
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
fn drop(&mut self) {
|
||||||
f.debug_struct("Spawner").finish()
|
self.sender.take();
|
||||||
|
|
||||||
|
if let Some(threads) = self.threads.take().and_then(Arc::into_inner) {
|
||||||
|
tracing::warn!("Joining {}s", self.name);
|
||||||
|
for thread in threads {
|
||||||
|
let _ = thread.join();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -34,9 +111,9 @@ where
|
|||||||
{
|
{
|
||||||
let id = uuid::Uuid::new_v4();
|
let id = uuid::Uuid::new_v4();
|
||||||
|
|
||||||
metrics::counter!("relay.spawner.wait-timer.start").increment(1);
|
metrics::increment_counter!("relay.spawner.wait-timer.start");
|
||||||
|
|
||||||
let mut interval = tokio::time::interval(Duration::from_secs(5));
|
let mut interval = actix_rt::time::interval(Duration::from_secs(5));
|
||||||
|
|
||||||
// pass the first tick (instant)
|
// pass the first tick (instant)
|
||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
@ -47,12 +124,12 @@ where
|
|||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
out = &mut fut => {
|
out = &mut fut => {
|
||||||
metrics::counter!("relay.spawner.wait-timer.end").increment(1);
|
metrics::increment_counter!("relay.spawner.wait-timer.end");
|
||||||
return out;
|
return out;
|
||||||
}
|
}
|
||||||
_ = interval.tick() => {
|
_ = interval.tick() => {
|
||||||
counter += 1;
|
counter += 1;
|
||||||
metrics::counter!("relay.spawner.wait-timer.pending").increment(1);
|
metrics::increment_counter!("relay.spawner.wait-timer.pending");
|
||||||
tracing::warn!("Blocking operation {id} is taking a long time, {} seconds", counter * 5);
|
tracing::warn!("Blocking operation {id} is taking a long time, {} seconds", counter * 5);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -67,9 +144,22 @@ impl Spawn for Spawner {
|
|||||||
Func: FnOnce() -> Out + Send + 'static,
|
Func: FnOnce() -> Out + Send + 'static,
|
||||||
Out: Send + 'static,
|
Out: Send + 'static,
|
||||||
{
|
{
|
||||||
let pool = self.pool.clone();
|
let sender = self.sender.as_ref().expect("Sender exists").clone();
|
||||||
|
|
||||||
Box::pin(async move { timer(pool.spawn(func)).await.map_err(|_| Canceled) })
|
Box::pin(async move {
|
||||||
|
let (tx, rx) = flume::bounded(1);
|
||||||
|
|
||||||
|
let _ = sender
|
||||||
|
.send_async(Box::new(move || {
|
||||||
|
if tx.try_send((func)()).is_err() {
|
||||||
|
tracing::warn!("Requestor hung up");
|
||||||
|
metrics::increment_counter!("relay.spawner.disconnected");
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
timer(rx.recv_async()).await.map_err(|_| Canceled)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -81,10 +171,21 @@ impl http_signature_normalization_reqwest::Spawn for Spawner {
|
|||||||
Func: FnOnce() -> Out + Send + 'static,
|
Func: FnOnce() -> Out + Send + 'static,
|
||||||
Out: Send + 'static,
|
Out: Send + 'static,
|
||||||
{
|
{
|
||||||
let pool = self.pool.clone();
|
let sender = self.sender.as_ref().expect("Sender exists").clone();
|
||||||
|
|
||||||
Box::pin(async move {
|
Box::pin(async move {
|
||||||
timer(pool.spawn(func))
|
let (tx, rx) = flume::bounded(1);
|
||||||
|
|
||||||
|
let _ = sender
|
||||||
|
.send_async(Box::new(move || {
|
||||||
|
if tx.try_send((func)()).is_err() {
|
||||||
|
tracing::warn!("Requestor hung up");
|
||||||
|
metrics::increment_counter!("relay.spawner.disconnected");
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
.await;
|
||||||
|
|
||||||
|
timer(rx.recv_async())
|
||||||
.await
|
.await
|
||||||
.map_err(|_| http_signature_normalization_reqwest::Canceled)
|
.map_err(|_| http_signature_normalization_reqwest::Canceled)
|
||||||
})
|
})
|
||||||
|
@ -46,7 +46,7 @@ pub(crate) fn start(admin_handle: String, db: Db, token: &str) {
|
|||||||
let bot = Bot::new(token);
|
let bot = Bot::new(token);
|
||||||
let admin_handle = Arc::new(admin_handle);
|
let admin_handle = Arc::new(admin_handle);
|
||||||
|
|
||||||
tokio::spawn(async move {
|
actix_rt::spawn(async move {
|
||||||
let command_handler = teloxide::filter_command::<Command, _>().endpoint(
|
let command_handler = teloxide::filter_command::<Command, _>().endpoint(
|
||||||
move |bot: Bot, msg: Message, cmd: Command| {
|
move |bot: Bot, msg: Message, cmd: Command| {
|
||||||
let admin_handle = admin_handle.clone();
|
let admin_handle = admin_handle.clone();
|
||||||
|
Loading…
Reference in New Issue
Block a user