From 65ce77898ac84d6953772b79f9f7ab3241ae10f3 Mon Sep 17 00:00:00 2001
From: asonix <asonix@asonix.dog>
Date: Fri, 20 Mar 2020 10:09:42 -0500
Subject: [PATCH] Handle notify dying, add env_logger option

---
 Cargo.lock    |  1 +
 Cargo.toml    |  1 +
 README.md     |  2 ++
 src/config.rs |  6 ++++++
 src/main.rs   |  6 +++++-
 src/notify.rs | 24 +++++++++++++++++++-----
 6 files changed, 34 insertions(+), 6 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 8eceb21..1e63911 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1759,6 +1759,7 @@ dependencies = [
  "bb8-postgres",
  "config",
  "dotenv",
+ "env_logger",
  "futures",
  "http-signature-normalization-actix",
  "log",
diff --git a/Cargo.toml b/Cargo.toml
index 1e90da8..6bd4503 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -22,6 +22,7 @@ base64 = "0.12"
 bb8-postgres = "0.4.0"
 config = "0.10.1"
 dotenv = "0.15.0"
+env_logger = "0.7.1"
 futures = "0.3.4"
 http-signature-normalization-actix = { version = "0.3.0-alpha.7", default-features = false, features = ["sha-2"] }
 log = "0.4"
diff --git a/README.md b/README.md
index 480a3c3..008940a 100644
--- a/README.md
+++ b/README.md
@@ -58,6 +58,7 @@ WHITELIST_MODE=false
 VALIDATE_SIGNATURES=false
 HTTPS=false
 DATABASE_URL=
+PRETTY_LOG=true
 ```
 To run this server in production, you'll likely want to set most of them
 ```env
@@ -69,6 +70,7 @@ WHITELIST_MODE=false
 VALIDATE_SIGNATURES=true
 HTTPS=true
 DATABASE_URL=postgres://pg_user:pg_pass@pg_host:pg_port/pg_database
+PRETTY_LOG=false
 ```
 
 ### Contributing
diff --git a/src/config.rs b/src/config.rs
index 69e2a15..f6677e0 100644
--- a/src/config.rs
+++ b/src/config.rs
@@ -15,6 +15,7 @@ pub struct Config {
     validate_signatures: bool,
     https: bool,
     database_url: String,
+    pretty_log: bool,
 }
 
 pub enum UrlKind {
@@ -39,11 +40,16 @@ impl Config {
             .set_default("whitelist_mode", false)?
             .set_default("validate_signatures", false)?
             .set_default("https", false)?
+            .set_default("pretty_log", true)?
             .merge(Environment::new())?;
 
         Ok(config.try_into()?)
     }
 
+    pub fn pretty_log(&self) -> bool {
+        self.pretty_log
+    }
+
     pub fn validate_signatures(&self) -> bool {
         self.validate_signatures
     }
diff --git a/src/main.rs b/src/main.rs
index ad1ac1f..6932d30 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -56,7 +56,11 @@ async fn main() -> Result<(), anyhow::Error> {
         std::env::set_var("RUST_LOG", "info")
     }
 
-    pretty_env_logger::init();
+    if config.pretty_log() {
+        pretty_env_logger::init();
+    } else {
+        env_logger::init();
+    }
 
     let pg_config: tokio_postgres::Config = config.database_url().parse()?;
     let db = Db::build(pg_config.clone()).await?;
diff --git a/src/notify.rs b/src/notify.rs
index c3c6086..4a42ff9 100644
--- a/src/notify.rs
+++ b/src/notify.rs
@@ -6,12 +6,15 @@ use futures::{
     future::ready,
     stream::{poll_fn, StreamExt},
 };
-use log::{debug, error, info};
+use log::{debug, error, info, warn};
 use tokio::sync::mpsc;
 
 #[derive(Message)]
 #[rtype(result = "()")]
-pub struct Notify(Notification);
+pub enum Notify {
+    Msg(Notification),
+    Done,
+}
 
 pub struct NotifyHandler {
     client: Option<Client>,
@@ -37,6 +40,7 @@ impl Actor for NotifyHandler {
     type Context = Context<Self>;
 
     fn started(&mut self, ctx: &mut Self::Context) {
+        info!("Starting notify handler");
         let config = self.config.clone();
 
         let fut = async move {
@@ -51,7 +55,7 @@ impl Actor for NotifyHandler {
             let mut stream = poll_fn(move |cx| conn.poll_message(cx)).filter_map(|m| match m {
                 Ok(AsyncMessage::Notification(n)) => {
                     debug!("Handling Notification, {:?}", n);
-                    ready(Some(Notify(n)))
+                    ready(Some(Notify::Msg(n)))
                 }
                 Ok(AsyncMessage::Notice(e)) => {
                     debug!("Handling Notice, {:?}", e);
@@ -77,7 +81,8 @@ impl Actor for NotifyHandler {
                         _ => (),
                     };
                 }
-                debug!("Stream handler ended");
+                warn!("Stream handler ended");
+                let _ = tx.send(Notify::Done).await;
             });
 
             Ok((client, rx))
@@ -116,7 +121,16 @@ impl Actor for NotifyHandler {
 }
 
 impl StreamHandler<Notify> for NotifyHandler {
-    fn handle(&mut self, Notify(notif): Notify, ctx: &mut Self::Context) {
+    fn handle(&mut self, notify: Notify, ctx: &mut Self::Context) {
+        let notif = match notify {
+            Notify::Msg(notif) => notif,
+            Notify::Done => {
+                warn!("Stopping notify handler");
+                ctx.stop();
+                return;
+            }
+        };
+
         let state = self.state.clone();
 
         let fut = async move {