diff --git a/Cargo.toml b/Cargo.toml index fc6811a..79fec1a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,10 +15,13 @@ categories = ["api-bindings"] [dependencies] chrono = { version = "0.4.38", features = ["serde"] } +futures-util = { version = "0.3.30", features = ["futures-sink"] } +reqwest = { version = "0.12.7", features = ["json"] } serde = { version = "1.0.204", features = ["derive"] } serde_json = "1.0.121" serde_repr = "0.1.19" thiserror = "1.0.63" +tokio = "1.39.3" +tokio-tungstenite = { version = "0.23.1", features = ["tokio-rustls", "rustls-tls-native-roots"] } tracing = "0.1.40" tungstenite = { version = "0.23.0", optional = true, features = ["rustls-tls-native-roots"] } -ureq = { version = "2.10.0", features = ["json"] } diff --git a/src/api.rs b/src/api.rs index e96f5f1..2973e5d 100644 --- a/src/api.rs +++ b/src/api.rs @@ -1,7 +1,5 @@ use std::{ - io, - ops::{Add, Mul, Sub}, - time::{Duration, Instant, SystemTime, UNIX_EPOCH}, + io, ops::{Add, Mul, Sub}, time::{Duration, Instant, SystemTime, UNIX_EPOCH} }; use serde::{de::DeserializeOwned, Deserialize, Deserializer, Serialize}; @@ -9,11 +7,14 @@ use serde_repr::Deserialize_repr; use thiserror::Error; use tracing::{debug, info, instrument}; +pub use reqwest::{self, StatusCode}; + pub struct Context { auth_header: String, refresh_token: String, token_expiration: Instant, on_refresh: Option>, + client: reqwest::Client, } impl std::fmt::Debug for Context { @@ -316,7 +317,7 @@ pub enum ApiError { /// HTTP call failed (404, etc) #[error("ureq")] - Ureq(#[source] Box), + HTTP(#[source] Box), /// HTTP call succeeded but the returned JSON document didn't match the expected format #[error("unexpected data: {1} when processing {0}")] @@ -334,20 +335,20 @@ pub enum ApiError { InvalidID(String), } -impl From for ApiError { - fn from(value: ureq::Error) -> Self { - ApiError::Ureq(Box::new(value)) +impl From for ApiError { + fn from(value: reqwest::Error) -> Self { + ApiError::HTTP(Box::new(value)) } } trait JsonExplicitError { /// Explicitely report the received JSON object we failed to parse - fn into_json_with_error(self) -> Result; + async fn into_json_with_error(self) -> Result; } -impl JsonExplicitError for ureq::Response { - fn into_json_with_error(self) -> Result { - let resp: serde_json::Value = self.into_json()?; +impl JsonExplicitError for reqwest::Response { + async fn into_json_with_error(self) -> Result { + let resp: serde_json::Value = self.json().await?; let parsed = T::deserialize(&resp); parsed.map_err(|e| ApiError::UnexpectedData(resp, e)) } @@ -363,12 +364,13 @@ pub enum TokenParseError { } impl Context { - fn from_login_response(resp: LoginResponse) -> Self { + fn from_login_response(resp: LoginResponse, client: reqwest::Client) -> Self { Self { auth_header: format!("Bearer {}", &resp.access_token), refresh_token: resp.refresh_token, token_expiration: (Instant::now() + Duration::from_secs(resp.expires_in as u64)), on_refresh: None, + client, } } @@ -389,6 +391,7 @@ impl Context { refresh_token: refresh.to_owned(), token_expiration, on_refresh: None, + client: reqwest::Client::new(), }) } @@ -410,7 +413,7 @@ impl Context { } /// Retrieve access tokens online, by logging in with the provided credentials - pub fn from_login(user: &str, password: &str) -> Result { + pub async fn from_login(user: &str, password: &str) -> Result { #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct Params<'t> { @@ -418,23 +421,26 @@ impl Context { password: &'t str, } + let client = reqwest::Client::new(); + info!("Logging into API"); let url: String = format!("{}accounts/login", API_BASE); - let resp: LoginResponse = ureq::post(&url) - .send_json(Params { + let resp: LoginResponse = client.post(&url) + .json(&Params { user_name: user, password, - })? - .into_json_with_error()?; + }) + .send().await? + .into_json_with_error().await?; - Ok(Self::from_login_response(resp)) + Ok(Self::from_login_response(resp, client)) } /// Check if the token has reached its expiration date - fn check_expired(&mut self) -> Result<(), ApiError> { + async fn check_expired(&mut self) -> Result<(), ApiError> { if self.token_expiration < Instant::now() { debug!("Token has expired"); - self.refresh_token()?; + self.refresh_token().await?; } Ok(()) } @@ -444,7 +450,7 @@ impl Context { } /// Use the refresh token to refresh credentials - pub fn refresh_token(&mut self) -> Result<(), ApiError> { + pub async fn refresh_token(&mut self) -> Result<(), ApiError> { #[derive(Serialize)] #[serde(rename_all = "camelCase")] struct Params<'t> { @@ -456,51 +462,52 @@ impl Context { refresh_token: &self.refresh_token, }; let url = format!("{}accounts/refresh_token", API_BASE); - let resp: LoginResponse = ureq::post(&url) - .set("Content-type", "application/json") - .send_json(params)? - .into_json_with_error()?; + let resp: LoginResponse = self.client.post(&url) + .header("Content-type", "application/json") + .json(¶ms) + .send().await? + .into_json_with_error().await?; - *self = Self::from_login_response(resp); + *self = Self::from_login_response(resp, self.client.clone()); Ok(()) } /// List all sites available to the user - pub fn sites(&mut self) -> Result, ApiError> { - self.get("sites") + pub async fn sites(&mut self) -> Result, ApiError> { + self.get("sites").await } - pub fn site(&mut self, id: i32) -> Result { - self.get(&format!("sites/{id}")) + pub async fn site(&mut self, id: i32) -> Result { + self.get(&format!("sites/{id}")).await } /// List all chargers available to the user - pub fn chargers(&mut self) -> Result, ApiError> { - self.get("chargers") + pub async fn chargers(&mut self) -> Result, ApiError> { + self.get("chargers").await } - pub fn charger(&mut self, id: &str) -> Result { + pub async fn charger(&mut self, id: &str) -> Result { if !id.chars().all(char::is_alphanumeric) { return Err(ApiError::InvalidID(id.to_owned())); } - self.get(&format!("chargers/{}", id)) + self.get(&format!("chargers/{}", id)).await } - pub fn circuit(&mut self, site_id: u32, circuit_id: u32) -> Result { - self.get(&format!("site/{site_id}/circuit/{circuit_id}")) + pub async fn circuit(&mut self, site_id: u32, circuit_id: u32) -> Result { + self.get(&format!("site/{site_id}/circuit/{circuit_id}")).await } - pub fn circuit_dynamic_current( + pub async fn circuit_dynamic_current( &mut self, site_id: u32, circuit_id: u32, ) -> Result { self.get(&format!( "sites/{site_id}/circuits/{circuit_id}/dynamicCurrent" - )) + )).await } - pub fn set_circuit_dynamic_current( + pub async fn set_circuit_dynamic_current( &mut self, site_id: u32, circuit_id: u32, @@ -509,65 +516,67 @@ impl Context { self.post( &format!("sites/{site_id}/circuits/{circuit_id}/dynamicCurrent"), ¤t, - ) + ).await } #[instrument] - fn get(&mut self, path: &str) -> Result { - self.check_expired()?; + async fn get(&mut self, path: &str) -> Result { + self.check_expired().await?; let url: String = format!("{}{}", API_BASE, path); - let req = ureq::get(&url) - .set("Accept", "application/json") - .set("Authorization", &self.auth_header); - let mut resp = req.clone().call()?; + let req = self.client.get(url) + .header("Accept", "application/json") + .header("Authorization", &self.auth_header) + .build()?; + + let mut resp = self.client.execute(req.try_clone().unwrap()).await?; if resp.status() == 401 { - self.refresh_token()?; - resp = req.call()? + self.refresh_token().await?; + resp = self.client.execute(req).await? } - resp.into_json_with_error() + resp.into_json_with_error().await } - fn maybe_get(&mut self, path: &str) -> Result, ApiError> { - match self.get(path) { + async fn maybe_get(&mut self, path: &str) -> Result, ApiError> { + match self.get(path).await { Ok(r) => Ok(Some(r)), - Err(ApiError::Ureq(e)) => match &*e { - ureq::Error::Status(404, _) => Ok(None), - _ => Err(ApiError::Ureq(e)), - }, + Err(ApiError::HTTP(e)) if e.status() == Some(StatusCode::NOT_FOUND)=> Ok(None), Err(other) => Err(other), } } - pub(crate) fn post( + pub(crate) async fn post( &mut self, path: &str, params: &P, ) -> Result { let url: String = format!("{}{}", API_BASE, path); - self.post_raw(&url, params) + self.post_raw(&url, params).await } - pub(crate) fn post_raw( + pub(crate) async fn post_raw( &mut self, url: &str, params: &P, ) -> Result { - self.check_expired()?; - let req = ureq::post(url) - .set("Accept", "application/json") - .set("Authorization", &self.auth_header); + self.check_expired().await?; + let req = self.client.post(url) + .header("Accept", "application/json") + .header("Authorization", &self.auth_header) + .json(params); - let mut resp = req.clone().send_json(params)?; + let mut resp = req + .try_clone().unwrap() + .send().await?; if resp.status() == 401 { - self.refresh_token()?; - resp = req.send_json(params)? + self.refresh_token().await?; + resp = req.send().await? } - resp.into_json_with_error() + resp.into_json_with_error().await } } @@ -584,12 +593,12 @@ pub struct MeterReading { impl Site { /// Read all energy meters from the given site - pub fn lifetime_energy(&self, ctx: &mut Context) -> Result, ApiError> { - ctx.get(&format!("sites/{}/energy", self.id)) + pub async fn lifetime_energy(&self, ctx: &mut Context) -> Result, ApiError> { + ctx.get(&format!("sites/{}/energy", self.id)).await } - pub fn details(&self, ctx: &mut Context) -> Result { - ctx.get(&format!("sites/{}", self.id)) + pub async fn details(&self, ctx: &mut Context) -> Result { + ctx.get(&format!("sites/{}", self.id)).await } } @@ -598,63 +607,63 @@ impl Circuit { format!("sites/{}/circuits/{}/dynamicCurrent", self.site_id, self.id) } - pub fn dynamic_current(&self, ctx: &mut Context) -> Result { - ctx.circuit_dynamic_current(self.site_id, self.id) + pub async fn dynamic_current(&self, ctx: &mut Context) -> Result { + ctx.circuit_dynamic_current(self.site_id, self.id).await } - pub fn set_dynamic_current( + pub async fn set_dynamic_current( &self, ctx: &mut Context, current: SetCurrent, ) -> Result<(), ApiError> { - ctx.post(&self.dynamic_current_path(), ¤t) + ctx.post(&self.dynamic_current_path(), ¤t).await } } impl Charger { /// Enable "smart charging" on the charger. This just turns the LED blue, and disables basic charging plans. - pub fn enable_smart_charging(&self, ctx: &mut Context) -> Result<(), ApiError> { + pub async fn enable_smart_charging(&self, ctx: &mut Context) -> Result<(), ApiError> { let url = format!("chargers/{}/commands/smart_charging", &self.id); - ctx.post(&url, &()) + ctx.post(&url, &()).await } /// Read the state of a charger - pub fn state(&self, ctx: &mut Context) -> Result { + pub async fn state(&self, ctx: &mut Context) -> Result { let url = format!("chargers/{}/state", self.id); - ctx.get(&url) + ctx.get(&url).await } /// Read info about the ongoing charging session - pub fn ongoing_session(&self, ctx: &mut Context) -> Result, ApiError> { - ctx.maybe_get(&format!("chargers/{}/sessions/ongoing", &self.id)) + pub async fn ongoing_session(&self, ctx: &mut Context) -> Result, ApiError> { + ctx.maybe_get(&format!("chargers/{}/sessions/ongoing", &self.id)).await } /// Read info about the last charging session (not including ongoing one) - pub fn latest_session(&self, ctx: &mut Context) -> Result, ApiError> { - ctx.maybe_get(&format!("chargers/{}/sessions/latest", &self.id)) + pub async fn latest_session(&self, ctx: &mut Context) -> Result, ApiError> { + ctx.maybe_get(&format!("chargers/{}/sessions/latest", &self.id)).await } - fn command(&self, ctx: &mut Context, command: &str) -> Result { - ctx.post(&format!("chargers/{}/commands/{}", self.id, command), &()) + async fn command(&self, ctx: &mut Context, command: &str) -> Result { + ctx.post(&format!("chargers/{}/commands/{}", self.id, command), &()).await } - pub fn start(&self, ctx: &mut Context) -> Result<(), ApiError> { - self.command(ctx, "start_charging")?; + pub async fn start(&self, ctx: &mut Context) -> Result<(), ApiError> { + self.command(ctx, "start_charging").await?; Ok(()) } - pub fn pause(&self, ctx: &mut Context) -> Result<(), ApiError> { - self.command(ctx, "pause_charging")?; + pub async fn pause(&self, ctx: &mut Context) -> Result<(), ApiError> { + self.command(ctx, "pause_charging").await?; Ok(()) } - pub fn resume(&self, ctx: &mut Context) -> Result<(), ApiError> { - self.command(ctx, "resume_charging")?; + pub async fn resume(&self, ctx: &mut Context) -> Result<(), ApiError> { + self.command(ctx, "resume_charging").await?; Ok(()) } - pub fn stop(&self, ctx: &mut Context) -> Result<(), ApiError> { - self.command(ctx, "stop_charging")?; + pub async fn stop(&self, ctx: &mut Context) -> Result<(), ApiError> { + self.command(ctx, "stop_charging").await?; Ok(()) } } @@ -666,11 +675,15 @@ mod test { use super::Context; #[test] fn token_save() { + + let client = reqwest::Client::new(); + let ctx = Context { auth_header: "Bearer aaaaaaa0".to_owned(), refresh_token: "abcdef".to_owned(), token_expiration: Instant::now() + Duration::from_secs(1234), on_refresh: None, + client: client.clone(), }; let saved = ctx.save(); diff --git a/src/observation.rs b/src/observation.rs index 670de0c..de9d686 100644 --- a/src/observation.rs +++ b/src/observation.rs @@ -1,9 +1,9 @@ use serde::{de::{DeserializeOwned, IntoDeserializer}, Deserialize}; +use serde_json::json; use serde_repr::Deserialize_repr; use std::num::{ParseFloatError, ParseIntError}; use thiserror::Error; use tracing::info; -use ureq::json; use crate::{ api::{ChargerOpMode, Context, OutputPhase, UtcDateTime}, @@ -325,17 +325,17 @@ struct ProductUpdate { } impl Stream { - pub fn from_context(ctx: &mut Context) -> Result { + pub async fn from_context(ctx: &mut Context) -> Result { Ok(Self { - inner: signalr::Stream::from_ws(crate::stream::Stream::open(ctx)?), + inner: signalr::Stream::from_ws(crate::stream::Stream::open(ctx).await?), }) } - pub fn recv(&mut self) -> Result { + pub async fn recv(&mut self) -> Result { use signalr::Message::*; let de = |msg| -> Result { Err(ObservationError::Protocol(msg)) }; loop { - let msg = self.inner.recv()?; + let msg = self.inner.recv().await?; match &msg { Ping => continue, Empty | InvocationResult { .. } => info!("Skipped message: {msg:?}"), @@ -351,9 +351,9 @@ impl Stream { } } } - pub fn subscribe(&mut self, id: &str) -> Result<(), tungstenite::Error> { + pub async fn subscribe(&mut self, id: &str) -> Result<(), tungstenite::Error> { self.inner - .invoke("SubscribeWithCurrentState", json!([id, true])) + .invoke("SubscribeWithCurrentState", json!([id, true])).await } } diff --git a/src/signalr.rs b/src/signalr.rs index ab24548..eea700c 100644 --- a/src/signalr.rs +++ b/src/signalr.rs @@ -113,9 +113,9 @@ impl Stream { Self { ws, buffer: vec![] } } - pub fn recv(&mut self) -> Result { + pub async fn recv(&mut self) -> Result { while self.buffer.is_empty() { - self.buffer = self.ws.recv()?; + self.buffer = self.ws.recv().await?; self.buffer.reverse(); } @@ -123,7 +123,7 @@ impl Stream { Ok(Message::from_json(json)?) } - pub fn invoke( + pub async fn invoke( &mut self, target: &str, args: serde_json::Value, @@ -131,6 +131,6 @@ impl Stream { self.ws.send(json!( { "arguments": args, "invocationId": "0", "target": target, - "type": 1} )) + "type": 1} )).await } } diff --git a/src/stream.rs b/src/stream.rs index 721ad1b..92e3ae6 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,9 +1,11 @@ use super::api::{ApiError, Context}; use serde::{Deserialize, Serialize}; use serde_json::json; -use std::net::TcpStream; +use tokio::net::TcpStream; use thiserror::Error; -use tungstenite::{stream::MaybeTlsStream, Message, WebSocket}; +//use tungstenite::{stream::MaybeTlsStream, Message, WebSocket}; +use tokio_tungstenite::{MaybeTlsStream, tungstenite::Message, WebSocketStream}; +use futures_util::{SinkExt,StreamExt}; const STREAM_API_NEGOTIATION_URL: &str = "https://streams.easee.com/hubs/products/negotiate?negotiateVersion=1"; @@ -36,15 +38,18 @@ pub enum RecvError { #[error("WS error: {0}")] TungsteniteError(#[from] tungstenite::Error), + + #[error("End of stream")] + EndOfStream, } pub struct Stream { - sock: WebSocket>, + sock: WebSocketStream>, } impl Stream { - pub fn open(ctx: &mut Context) -> Result { - let r: NegotiateResponse = ctx.post_raw(STREAM_API_NEGOTIATION_URL, &())?; + pub async fn open(ctx: &mut Context) -> Result { + let r: NegotiateResponse = ctx.post_raw(STREAM_API_NEGOTIATION_URL, &()).await?; let token = ctx.auth_token(); let wss_url = format!( @@ -52,7 +57,8 @@ impl Stream { WSS_URL, r.connection_token, token ); - let resp = tungstenite::client::connect(&wss_url); + let resp = tokio_tungstenite::connect_async(wss_url).await; + //let resp = tungstenite::client::connect(&wss_url); if let Err(tungstenite::Error::Http(he)) = &resp { eprintln!( @@ -62,19 +68,20 @@ impl Stream { } let mut stream = Stream { sock: resp?.0 }; - stream.send(json!({ "protocol": "json", "version": 1 }))?; + stream.send(json!({ "protocol": "json", "version": 1 })).await?; Ok(stream) } - pub fn send(&mut self, msg: T) -> Result<(), tungstenite::Error> { + pub async fn send(&mut self, msg: T) -> Result<(), tungstenite::Error> { let mut msg = serde_json::to_string(&msg).unwrap(); msg.push('\x1E'); - self.sock.send(Message::Text(msg)) + self.sock.send(Message::Text(msg)).await } - pub fn recv(&mut self) -> Result, RecvError> { - let msg = self.sock.read()?; + pub async fn recv(&mut self) -> Result, RecvError> { + let msg = self.sock.next().await + .ok_or(RecvError::EndOfStream)??; let Message::Text(txt) = msg else { return Err(RecvError::BadMessageType); };