Start implementing event stream

This commit is contained in:
Maxime Augier 2024-08-02 18:57:54 +02:00
parent c5799902c1
commit 888c44834a
5 changed files with 126 additions and 3 deletions

View File

@ -20,4 +20,5 @@ serde_json = "1.0.121"
serde_repr = "0.1.19" serde_repr = "0.1.19"
thiserror = "1.0.63" thiserror = "1.0.63"
tracing = "0.1.40" tracing = "0.1.40"
tungstenite = { version = "0.23.0", optional = true, features = ["rustls-tls-native-roots"] }
ureq = { version = "2.10.0", features = ["json"] } ureq = { version = "2.10.0", features = ["json"] }

View File

@ -13,6 +13,7 @@ Work in progress.
- [x] Read charger status - [x] Read charger status
- [ ] Control charging (start/pause/resume/stop) - [ ] Control charging (start/pause/resume/stop)
- [ ] Control dynamic current limits - [ ] Control dynamic current limits
- [ ] Websocket event stream
- Ergonomics - Ergonomics
- [ ] Enums for protocol constants - [ ] Enums for protocol constants

View File

@ -71,6 +71,7 @@ pub enum ChargerOpMode {
#[derive(Clone,Copy,Debug,Deserialize_repr,Eq,Ord,PartialEq,PartialOrd)] #[derive(Clone,Copy,Debug,Deserialize_repr,Eq,Ord,PartialEq,PartialOrd)]
#[repr(u8)] #[repr(u8)]
pub enum OutputPhase { pub enum OutputPhase {
Unknown = 0,
L1ToN = 10, L1ToN = 10,
L2ToN = 12, L2ToN = 12,
L3ToN = 14, L3ToN = 14,
@ -285,6 +286,10 @@ impl Context {
Ok(()) Ok(())
} }
pub(crate) fn auth_token(&self) -> &str {
&self.auth_header[7..]
}
/// Use the refresh token to refresh credentials /// Use the refresh token to refresh credentials
pub fn refresh_token(&mut self) -> Result<(), ApiError> { pub fn refresh_token(&mut self) -> Result<(), ApiError> {
#[derive(Serialize)] #[derive(Serialize)]
@ -343,10 +348,14 @@ impl Context {
} }
} }
fn post<T: DeserializeOwned, P: Serialize>(&mut self, path: &str, params: &P) -> Result<T, ApiError> { pub(crate) fn post<T: DeserializeOwned, P: Serialize>(&mut self, path: &str, params: &P) -> Result<T, ApiError> {
self.check_expired()?;
let url: String = format!("{}{}", API_BASE, path); let url: String = format!("{}{}", API_BASE, path);
let req = ureq::post(&url) self.post_raw(&url, params)
}
pub(crate) fn post_raw<T: DeserializeOwned, P: Serialize>(&mut self, url: &str, params: &P) -> Result<T, ApiError> {
self.check_expired()?;
let req = ureq::post(url)
.set("Accept", "application/json") .set("Accept", "application/json")
.set("Authorization", &self.auth_header); .set("Authorization", &self.auth_header);

View File

@ -1 +1,4 @@
pub mod api; pub mod api;
#[cfg(feature = "tungstenite")]
pub mod stream;

109
src/stream.rs Normal file
View File

@ -0,0 +1,109 @@
use std::net::TcpStream;
use tungstenite::{stream::MaybeTlsStream, WebSocket, Message};
use serde::{Deserialize, Serialize};
use thiserror::Error;
use super::api::{Context, ApiError};
use serde_json::json;
const STREAM_API_NEGOTIATION_URL: &str = "https://streams.easee.com/hubs/products/negotiate?negotiateVersion=1";
const WSS_URL: &str = "wss://streams.easee.com/hubs/products";
#[derive(Clone,Debug,Deserialize,PartialEq,PartialOrd)]
#[serde(rename_all="camelCase")]
struct NegotiateResponse {
negotiate_version: u16,
connection_id: String,
connection_token: String,
}
#[derive(Debug,Error)]
pub enum NegotiateError {
#[error("API error: {0}")]
ApiError(#[from] ApiError),
#[error("WS error: {0}")]
TungsteniteError(#[from] tungstenite::Error),
}
#[derive(Debug,Error)]
pub enum RecvError {
#[error("Bad message type")]
BadMessageType,
#[error("Not a SignalR message: {0:?}")]
NotSignalRMessage(String),
#[error("Invalid json: {0}")]
InvalidJson(#[from] serde_json::Error),
#[error("WS error: {0}")]
TungsteniteError(#[from] tungstenite::Error),
}
pub struct Stream { sock: WebSocket<MaybeTlsStream<TcpStream>> }
impl Stream {
pub fn open(ctx: &mut Context) -> Result<Stream, NegotiateError> {
let r: NegotiateResponse = ctx.post_raw(STREAM_API_NEGOTIATION_URL, &())?;
dbg!(&r);
let token = ctx.auth_token();
let wss_url = format!("{}?id={}&access_token={}", WSS_URL,
r.connection_token, token);
dbg!(&wss_url);
/*
let req = tungstenite::http::Request::builder()
.uri(WSS_URL)
.header("Accept", "* / *")
.header("Host", "streams.easee.com")
.header("Origin", "https://portal.easee.com")
.header("Connection", "keep-alive, Upgrade")
.header("Upgrade", "websocket")
.header("Sec-WebSocket-Version", "13")
.header("Sec-WebSocket-Key", tungstenite::handshake::client::generate_key())
.header("Sec-Fetch-Dest", "websocket")
.header("Sec-Fetch-Mode", "websocket")
.header("Sec-Fetch-Site", "same-site")
.header("Cookie", format!("easee_skaat=\"{}\"", token))
.body(()).unwrap();
*/
let resp = tungstenite::client::connect(&wss_url);
if let Err(tungstenite::Error::Http(he)) = &resp {
eprintln!("Response: {}", std:: str::from_utf8(&he.body().as_ref().unwrap()).unwrap());
}
let mut stream = Stream { sock: resp?.0 };
stream.send(json!({ "protocol": "json", "version": 1 }))?;
Ok(stream)
}
fn send<T: Serialize>(&mut self, msg: T) -> Result<(), tungstenite::Error> {
let mut msg = serde_json::to_string(&msg).unwrap();
msg.push('\x1E');
self.sock.send(Message::Text(msg))
}
pub fn recv(&mut self) -> Result<serde_json::Value, RecvError> {
let msg = self.sock.read()?;
let Message::Text(txt) = msg else { return Err(RecvError::BadMessageType) };
let json: &str = match txt.strip_suffix("\x1E") {
None => return Err(RecvError::NotSignalRMessage(txt)),
Some(json) => json
};
dbg!(&json);
Ok(serde_json::from_str(json)?)
}
pub fn subscribe(&mut self, id: &str) -> Result<(), tungstenite::Error> {
self.send(json!( { "arguments": [id, true],
"invocationId": "0",
"target": "SubscribeWithCurrentState",
"type": 1} ))
}
}