Make connections configurable
This commit is contained in:
		
							parent
							
								
									7510ab5b94
								
							
						
					
					
						commit
						dabeba43e2
					
				| @ -63,6 +63,7 @@ HTTPS=false | |||||||
| DATABASE_URL= | DATABASE_URL= | ||||||
| PRETTY_LOG=true | PRETTY_LOG=true | ||||||
| PUBLISH_BLOCKS=false | PUBLISH_BLOCKS=false | ||||||
|  | CONNECTIONS_PER_CORE=2 # how many postgres connections should be made for each core on the host | ||||||
| ``` | ``` | ||||||
| To run this server in production, you'll likely want to set most of them | To run this server in production, you'll likely want to set most of them | ||||||
| ```env | ```env | ||||||
| @ -76,6 +77,7 @@ HTTPS=true | |||||||
| DATABASE_URL=postgres://pg_user:pg_pass@pg_host:pg_port/pg_database | DATABASE_URL=postgres://pg_user:pg_pass@pg_host:pg_port/pg_database | ||||||
| PRETTY_LOG=false | PRETTY_LOG=false | ||||||
| PUBLISH_BLOCKS=true | PUBLISH_BLOCKS=true | ||||||
|  | CONNECTIONS_PER_CORE=4 | ||||||
| ``` | ``` | ||||||
| 
 | 
 | ||||||
| ### Contributing | ### Contributing | ||||||
|  | |||||||
| @ -17,6 +17,7 @@ pub struct Config { | |||||||
|     database_url: String, |     database_url: String, | ||||||
|     pretty_log: bool, |     pretty_log: bool, | ||||||
|     publish_blocks: bool, |     publish_blocks: bool, | ||||||
|  |     connections_per_core: u32, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| pub enum UrlKind { | pub enum UrlKind { | ||||||
| @ -44,6 +45,7 @@ impl Config { | |||||||
|             .set_default("https", false)? |             .set_default("https", false)? | ||||||
|             .set_default("pretty_log", true)? |             .set_default("pretty_log", true)? | ||||||
|             .set_default("publish_blocks", false)? |             .set_default("publish_blocks", false)? | ||||||
|  |             .set_default("connections_per_core", 2)? | ||||||
|             .merge(Environment::new())?; |             .merge(Environment::new())?; | ||||||
| 
 | 
 | ||||||
|         Ok(config.try_into()?) |         Ok(config.try_into()?) | ||||||
| @ -53,6 +55,10 @@ impl Config { | |||||||
|         self.pretty_log |         self.pretty_log | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     pub fn connections_per_core(&self) -> u32 { | ||||||
|  |         self.connections_per_core | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     pub fn validate_signatures(&self) -> bool { |     pub fn validate_signatures(&self) -> bool { | ||||||
|         self.validate_signatures |         self.validate_signatures | ||||||
|     } |     } | ||||||
|  | |||||||
| @ -22,11 +22,15 @@ pub struct Db { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl Db { | impl Db { | ||||||
|     pub async fn build(config: Config) -> Result<Self, MyError> { |     pub async fn build(config: &crate::config::Config) -> Result<Self, MyError> { | ||||||
|  |         let cpus: u32 = num_cpus::get().try_into()?; | ||||||
|  |         let max_conns = cpus * config.connections_per_core(); | ||||||
|  | 
 | ||||||
|  |         let config: Config = config.database_url().parse()?; | ||||||
|         let manager = PostgresConnectionManager::new(config, NoTls); |         let manager = PostgresConnectionManager::new(config, NoTls); | ||||||
| 
 | 
 | ||||||
|         let pool = bb8::Pool::builder() |         let pool = bb8::Pool::builder() | ||||||
|             .max_size((num_cpus::get() * 4).try_into()?) |             .max_size(max_conns) | ||||||
|             .build(manager) |             .build(manager) | ||||||
|             .await?; |             .await?; | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -3,7 +3,6 @@ use actix_web::{ | |||||||
|     middleware::Logger, |     middleware::Logger, | ||||||
|     web, App, HttpResponse, HttpServer, |     web, App, HttpResponse, HttpServer, | ||||||
| }; | }; | ||||||
| use bb8_postgres::tokio_postgres; |  | ||||||
| use log::error; | use log::error; | ||||||
| use std::{ | use std::{ | ||||||
|     io::BufWriter, |     io::BufWriter, | ||||||
| @ -33,7 +32,6 @@ use self::{ | |||||||
|     db::Db, |     db::Db, | ||||||
|     error::MyError, |     error::MyError, | ||||||
|     jobs::{create_server, create_workers}, |     jobs::{create_server, create_workers}, | ||||||
|     notify::notify_loop, |  | ||||||
|     state::State, |     state::State, | ||||||
|     templates::statics::StaticFile, |     templates::statics::StaticFile, | ||||||
|     webfinger::RelayResolver, |     webfinger::RelayResolver, | ||||||
| @ -90,8 +88,7 @@ async fn main() -> Result<(), anyhow::Error> { | |||||||
|         env_logger::init(); |         env_logger::init(); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     let pg_config: tokio_postgres::Config = config.database_url().parse()?; |     let db = Db::build(&config).await?; | ||||||
|     let db = Db::build(pg_config.clone()).await?; |  | ||||||
| 
 | 
 | ||||||
|     let args = Args::new(); |     let args = Args::new(); | ||||||
| 
 | 
 | ||||||
| @ -109,11 +106,10 @@ async fn main() -> Result<(), anyhow::Error> { | |||||||
|     let state = State::hydrate(config.clone(), &db).await?; |     let state = State::hydrate(config.clone(), &db).await?; | ||||||
| 
 | 
 | ||||||
|     rehydrate::spawn(db.clone(), state.clone()); |     rehydrate::spawn(db.clone(), state.clone()); | ||||||
|  |     notify::spawn(state.clone(), &config)?; | ||||||
| 
 | 
 | ||||||
|     let job_server = create_server(db.clone()); |     let job_server = create_server(db.clone()); | ||||||
| 
 | 
 | ||||||
|     let _ = notify_loop(state.clone(), pg_config.clone()); |  | ||||||
| 
 |  | ||||||
|     let bind_address = config.bind_address(); |     let bind_address = config.bind_address(); | ||||||
|     HttpServer::new(move || { |     HttpServer::new(move || { | ||||||
|         create_workers(state.clone(), job_server.clone()); |         create_workers(state.clone(), job_server.clone()); | ||||||
|  | |||||||
| @ -1,4 +1,4 @@ | |||||||
| use crate::{db::listen, state::State}; | use crate::{db::listen, error::MyError, state::State}; | ||||||
| use activitystreams::primitives::XsdAnyUri; | use activitystreams::primitives::XsdAnyUri; | ||||||
| use actix::clock::{delay_for, Duration}; | use actix::clock::{delay_for, Duration}; | ||||||
| use bb8_postgres::tokio_postgres::{tls::NoTls, AsyncMessage, Config, Notification}; | use bb8_postgres::tokio_postgres::{tls::NoTls, AsyncMessage, Config, Notification}; | ||||||
| @ -43,7 +43,9 @@ async fn handle_notification(state: &State, notif: Notification) { | |||||||
|     }; |     }; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| pub fn notify_loop(state: State, config: Config) { | pub fn spawn(state: State, config: &crate::config::Config) -> Result<(), MyError> { | ||||||
|  |     let config: Config = config.database_url().parse()?; | ||||||
|  | 
 | ||||||
|     actix::spawn(async move { |     actix::spawn(async move { | ||||||
|         let mut client; |         let mut client; | ||||||
| 
 | 
 | ||||||
| @ -93,4 +95,5 @@ pub fn notify_loop(state: State, config: Config) { | |||||||
|             warn!("Restarting listener task"); |             warn!("Restarting listener task"); | ||||||
|         } |         } | ||||||
|     }); |     }); | ||||||
|  |     Ok(()) | ||||||
| } | } | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user