diff --git a/Cargo.toml b/Cargo.toml index 555dcf3..fc6811a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,4 +20,5 @@ serde_json = "1.0.121" serde_repr = "0.1.19" thiserror = "1.0.63" tracing = "0.1.40" +tungstenite = { version = "0.23.0", optional = true, features = ["rustls-tls-native-roots"] } ureq = { version = "2.10.0", features = ["json"] } diff --git a/README.md b/README.md index b6d41ef..078056c 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ Work in progress. - [x] Read charger status - [ ] Control charging (start/pause/resume/stop) - [ ] Control dynamic current limits + - [ ] Websocket event stream - Ergonomics - [ ] Enums for protocol constants diff --git a/src/api.rs b/src/api.rs index 051dad5..e16d144 100644 --- a/src/api.rs +++ b/src/api.rs @@ -71,6 +71,7 @@ pub enum ChargerOpMode { #[derive(Clone,Copy,Debug,Deserialize_repr,Eq,Ord,PartialEq,PartialOrd)] #[repr(u8)] pub enum OutputPhase { + Unknown = 0, L1ToN = 10, L2ToN = 12, L3ToN = 14, @@ -285,6 +286,10 @@ impl Context { Ok(()) } + pub(crate) fn auth_token(&self) -> &str { + &self.auth_header[7..] + } + /// Use the refresh token to refresh credentials pub fn refresh_token(&mut self) -> Result<(), ApiError> { #[derive(Serialize)] @@ -343,10 +348,14 @@ impl Context { } } - fn post(&mut self, path: &str, params: &P) -> Result { - self.check_expired()?; + pub(crate) fn post(&mut self, path: &str, params: &P) -> Result { let url: String = format!("{}{}", API_BASE, path); - let req = ureq::post(&url) + self.post_raw(&url, params) + } + + pub(crate) fn post_raw(&mut self, url: &str, params: &P) -> Result { + self.check_expired()?; + let req = ureq::post(url) .set("Accept", "application/json") .set("Authorization", &self.auth_header); diff --git a/src/lib.rs b/src/lib.rs index e5fdf85..2fac595 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1 +1,4 @@ pub mod api; + +#[cfg(feature = "tungstenite")] +pub mod stream; \ No newline at end of file diff --git a/src/stream.rs b/src/stream.rs new file mode 100644 index 0000000..f09e2c0 --- /dev/null +++ b/src/stream.rs @@ -0,0 +1,109 @@ +use std::net::TcpStream; + +use tungstenite::{stream::MaybeTlsStream, WebSocket, Message}; +use serde::{Deserialize, Serialize}; +use thiserror::Error; +use super::api::{Context, ApiError}; +use serde_json::json; + +const STREAM_API_NEGOTIATION_URL: &str = "https://streams.easee.com/hubs/products/negotiate?negotiateVersion=1"; +const WSS_URL: &str = "wss://streams.easee.com/hubs/products"; + +#[derive(Clone,Debug,Deserialize,PartialEq,PartialOrd)] +#[serde(rename_all="camelCase")] +struct NegotiateResponse { + negotiate_version: u16, + connection_id: String, + connection_token: String, +} + +#[derive(Debug,Error)] +pub enum NegotiateError { + #[error("API error: {0}")] + ApiError(#[from] ApiError), + + #[error("WS error: {0}")] + TungsteniteError(#[from] tungstenite::Error), +} + +#[derive(Debug,Error)] +pub enum RecvError { + #[error("Bad message type")] + BadMessageType, + + #[error("Not a SignalR message: {0:?}")] + NotSignalRMessage(String), + + #[error("Invalid json: {0}")] + InvalidJson(#[from] serde_json::Error), + + #[error("WS error: {0}")] + TungsteniteError(#[from] tungstenite::Error), +} + +pub struct Stream { sock: WebSocket> } + +impl Stream { + pub fn open(ctx: &mut Context) -> Result { + let r: NegotiateResponse = ctx.post_raw(STREAM_API_NEGOTIATION_URL, &())?; + dbg!(&r); + + let token = ctx.auth_token(); + let wss_url = format!("{}?id={}&access_token={}", WSS_URL, + r.connection_token, token); + dbg!(&wss_url); + + /* + let req = tungstenite::http::Request::builder() + .uri(WSS_URL) + .header("Accept", "* / *") + .header("Host", "streams.easee.com") + .header("Origin", "https://portal.easee.com") + .header("Connection", "keep-alive, Upgrade") + .header("Upgrade", "websocket") + .header("Sec-WebSocket-Version", "13") + .header("Sec-WebSocket-Key", tungstenite::handshake::client::generate_key()) + .header("Sec-Fetch-Dest", "websocket") + .header("Sec-Fetch-Mode", "websocket") + .header("Sec-Fetch-Site", "same-site") + .header("Cookie", format!("easee_skaat=\"{}\"", token)) + .body(()).unwrap(); + */ + + let resp = tungstenite::client::connect(&wss_url); + + if let Err(tungstenite::Error::Http(he)) = &resp { + eprintln!("Response: {}", std:: str::from_utf8(&he.body().as_ref().unwrap()).unwrap()); + } + + let mut stream = Stream { sock: resp?.0 }; + stream.send(json!({ "protocol": "json", "version": 1 }))?; + + Ok(stream) + } + + fn send(&mut self, msg: T) -> Result<(), tungstenite::Error> { + let mut msg = serde_json::to_string(&msg).unwrap(); + msg.push('\x1E'); + self.sock.send(Message::Text(msg)) + } + + pub fn recv(&mut self) -> Result { + let msg = self.sock.read()?; + let Message::Text(txt) = msg else { return Err(RecvError::BadMessageType) }; + let json: &str = match txt.strip_suffix("\x1E") { + None => return Err(RecvError::NotSignalRMessage(txt)), + Some(json) => json + }; + dbg!(&json); + Ok(serde_json::from_str(json)?) + } + + pub fn subscribe(&mut self, id: &str) -> Result<(), tungstenite::Error> { + self.send(json!( { "arguments": [id, true], + "invocationId": "0", + "target": "SubscribeWithCurrentState", + "type": 1} )) + } + +}