From b7e8d2e46590d2308ba7fbc331b1ffc8d001e345 Mon Sep 17 00:00:00 2001
From: asonix <asonix@asonix.dog>
Date: Fri, 10 Jul 2020 15:34:18 -0500
Subject: [PATCH] Join upstream service with payload fut

---
 src/main.rs               |  2 +-
 src/middleware/payload.rs | 34 +++++++++++++++++++++++-----------
 2 files changed, 24 insertions(+), 12 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index 7220dd9..c34a29b 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -132,9 +132,9 @@ async fn main() -> Result<(), anyhow::Error> {
             .service(web::resource("/media/{path}").route(web::get().to(routes::media)))
             .service(
                 web::resource("/inbox")
-                    .wrap(DebugPayload(config.debug()))
                     .wrap(config.digest_middleware())
                     .wrap(config.signature_middleware(state.requests(), actors.clone()))
+                    .wrap(DebugPayload(config.debug()))
                     .route(web::post().to(inbox)),
             )
             .service(web::resource("/actor").route(web::get().to(actor)))
diff --git a/src/middleware/payload.rs b/src/middleware/payload.rs
index 3db41b4..002fa69 100644
--- a/src/middleware/payload.rs
+++ b/src/middleware/payload.rs
@@ -5,10 +5,10 @@ use actix_web::{
 };
 use bytes::BytesMut;
 use futures::{
-    future::{ok, LocalBoxFuture, Ready},
+    future::{ok, try_join, LocalBoxFuture, Ready},
     stream::StreamExt,
 };
-use log::info;
+use log::{error, info};
 use std::task::{Context, Poll};
 use tokio::sync::mpsc::channel;
 
@@ -75,24 +75,36 @@ where
 
             let fut = self.1.call(req);
 
-            return Box::pin(async move {
+            let payload_fut = async move {
                 let mut bytes = BytesMut::new();
 
                 while let Some(res) = pl.next().await {
-                    let b = res.map_err(|_| DebugError)?;
+                    let b = res.map_err(|e| {
+                        error!("Payload error, {}", e);
+                        DebugError
+                    })?;
                     bytes.extend(b);
                 }
 
                 info!("{}", String::from_utf8_lossy(bytes.as_ref()));
 
-                tx.send(Ok(bytes.freeze())).await.map_err(|_| DebugError)?;
+                tx.send(Ok(bytes.freeze())).await.map_err(|e| {
+                    error!("Error sending bytes, {}", e);
+                    DebugError
+                })?;
 
-                fut.await
-            });
+                Ok(()) as Result<(), actix_web::Error>
+            };
+
+            Box::pin(async move {
+                let (res, _) = try_join(fut, payload_fut).await?;
+
+                Ok(res)
+            })
+        } else {
+            let fut = self.1.call(req);
+
+            Box::pin(async move { fut.await })
         }
-
-        let fut = self.1.call(req);
-
-        Box::pin(async move { fut.await })
     }
 }