Implement SignalR layer decoding, run cargo fmt

This commit is contained in:
Maxime Augier 2024-08-07 19:50:37 +02:00
parent b4dedb3728
commit d1702d69a9
5 changed files with 278 additions and 76 deletions

View File

@ -1,4 +1,7 @@
use std::{io, time::{Duration, Instant}}; use std::{
io,
time::{Duration, Instant},
};
use serde::{de::DeserializeOwned, Deserialize, Deserializer, Serialize}; use serde::{de::DeserializeOwned, Deserialize, Deserializer, Serialize};
use serde_repr::Deserialize_repr; use serde_repr::Deserialize_repr;
@ -16,12 +19,11 @@ pub struct Context {
const API_BASE: &str = "https://api.easee.com/api/"; const API_BASE: &str = "https://api.easee.com/api/";
const REFRESH_TOKEN_DELAY: Duration = Duration::from_secs(600); const REFRESH_TOKEN_DELAY: Duration = Duration::from_secs(600);
#[derive(Clone,Copy,Debug,Eq,Ord,PartialEq,PartialOrd)] #[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub struct NaiveDateTime(pub chrono::NaiveDateTime); pub struct NaiveDateTime(pub chrono::NaiveDateTime);
impl<'de> Deserialize<'de> for NaiveDateTime { impl<'de> Deserialize<'de> for NaiveDateTime {
fn deserialize<D: Deserializer<'de>>(d: D) -> Result<Self, D::Error> fn deserialize<D: Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
{
use serde::de::Error; use serde::de::Error;
let s = <&str as Deserialize>::deserialize(d)?; let s = <&str as Deserialize>::deserialize(d)?;
let dt = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f") let dt = chrono::NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f")
@ -30,12 +32,11 @@ impl<'de> Deserialize<'de> for NaiveDateTime {
} }
} }
#[derive(Clone,Copy,Debug,Eq,Ord,PartialEq,PartialOrd)] #[derive(Clone, Copy, Debug, Eq, Ord, PartialEq, PartialOrd)]
pub struct UtcDateTime(pub chrono::DateTime<chrono::Utc>); pub struct UtcDateTime(pub chrono::DateTime<chrono::Utc>);
impl<'de> Deserialize<'de> for UtcDateTime { impl<'de> Deserialize<'de> for UtcDateTime {
fn deserialize<D: Deserializer<'de>>(d: D) -> Result<Self, D::Error> fn deserialize<D: Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
{
use serde::de::Error; use serde::de::Error;
let s = <&str as Deserialize>::deserialize(d)?; let s = <&str as Deserialize>::deserialize(d)?;
let dt = chrono::DateTime::parse_from_str(s, "%+") let dt = chrono::DateTime::parse_from_str(s, "%+")
@ -45,8 +46,8 @@ impl<'de> Deserialize<'de> for UtcDateTime {
} }
} }
#[derive(Clone,Debug,Deserialize,Eq,Ord,PartialEq,PartialOrd)] #[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd)]
#[serde(rename_all="camelCase")] #[serde(rename_all = "camelCase")]
pub struct Charger { pub struct Charger {
pub id: String, pub id: String,
pub name: String, pub name: String,
@ -57,7 +58,7 @@ pub struct Charger {
pub level_of_access: u32, pub level_of_access: u32,
} }
#[derive(Clone,Copy,Debug,Deserialize_repr,Eq,Ord,PartialEq,PartialOrd)] #[derive(Clone, Copy, Debug, Deserialize_repr, Eq, Ord, PartialEq, PartialOrd)]
#[repr(u8)] #[repr(u8)]
pub enum ChargerOpMode { pub enum ChargerOpMode {
Disconnected = 1, Disconnected = 1,
@ -68,7 +69,7 @@ pub enum ChargerOpMode {
Ready = 6, Ready = 6,
} }
#[derive(Clone,Copy,Debug,Deserialize_repr,Eq,Ord,PartialEq,PartialOrd)] #[derive(Clone, Copy, Debug, Deserialize_repr, Eq, Ord, PartialEq, PartialOrd)]
#[repr(u8)] #[repr(u8)]
pub enum OutputPhase { pub enum OutputPhase {
Unknown = 0, Unknown = 0,
@ -84,8 +85,8 @@ pub enum OutputPhase {
L1L2L3ToN = 30, L1L2L3ToN = 30,
} }
#[derive(Clone,Debug,Deserialize,PartialEq,PartialOrd)] #[derive(Clone, Debug, Deserialize, PartialEq, PartialOrd)]
#[serde(rename_all="camelCase")] #[serde(rename_all = "camelCase")]
pub struct ChargerState { pub struct ChargerState {
pub smart_charging: bool, pub smart_charging: bool,
pub cable_locked: bool, pub cable_locked: bool,
@ -94,13 +95,13 @@ pub struct ChargerState {
pub session_energy: f64, pub session_energy: f64,
pub energy_per_hour: f64, pub energy_per_hour: f64,
#[serde(rename="wiFiRSSI")] #[serde(rename = "wiFiRSSI")]
pub wifi_rssi: Option<i32>, pub wifi_rssi: Option<i32>,
#[serde(rename="cellRSSI")] #[serde(rename = "cellRSSI")]
pub cell_rssi: Option<i32>, pub cell_rssi: Option<i32>,
#[serde(rename="localRSSI")] #[serde(rename = "localRSSI")]
pub local_rssi: Option<i32>, pub local_rssi: Option<i32>,
pub output_phase: OutputPhase, pub output_phase: OutputPhase,
pub dynamic_circuit_current_p1: u32, pub dynamic_circuit_current_p1: u32,
@ -111,7 +112,7 @@ pub struct ChargerState {
pub charger_firmware: u32, pub charger_firmware: u32,
pub voltage: f64, pub voltage: f64,
#[serde(rename="chargerRAT")] #[serde(rename = "chargerRAT")]
pub charger_rat: u32, pub charger_rat: u32,
pub lock_cable_permanently: bool, pub lock_cable_permanently: bool,
pub in_current_t2: Option<f64>, pub in_current_t2: Option<f64>,
@ -141,7 +142,7 @@ pub struct ChargerState {
pub circuit_total_phase_conductor_current_l3: f64, pub circuit_total_phase_conductor_current_l3: f64,
pub reason_for_no_current: u32, pub reason_for_no_current: u32,
#[serde(rename="wiFiAPEnabled")] #[serde(rename = "wiFiAPEnabled")]
pub wifi_ap_enabled: bool, pub wifi_ap_enabled: bool,
pub lifetime_energy: f64, pub lifetime_energy: f64,
pub offline_max_circuit_current_p1: u32, pub offline_max_circuit_current_p1: u32,
@ -155,10 +156,9 @@ pub struct ChargerState {
pub derated_current: Option<f64>, pub derated_current: Option<f64>,
pub derating_active: bool, pub derating_active: bool,
pub connected_to_cloud: bool, pub connected_to_cloud: bool,
} }
#[derive(Clone,Debug,Deserialize,PartialEq,PartialOrd)] #[derive(Clone, Debug, Deserialize, PartialEq, PartialOrd)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct ChargingSession { pub struct ChargingSession {
pub charger_id: Option<String>, pub charger_id: Option<String>,
@ -176,16 +176,13 @@ pub struct ChargingSession {
pub currency_id: Option<String>, pub currency_id: Option<String>,
pub cost_including_vat: Option<f64>, pub cost_including_vat: Option<f64>,
pub cost_excluding_vat: Option<f64>, pub cost_excluding_vat: Option<f64>,
} }
#[derive(Debug,Deserialize)] #[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct Address { pub struct Address {}
} #[derive(Clone, Debug, Deserialize, Eq, Ord, PartialEq, PartialOrd)]
#[derive(Clone,Debug,Deserialize,Eq,Ord,PartialEq,PartialOrd)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct Site { pub struct Site {
pub uuid: Option<String>, pub uuid: Option<String>,
@ -194,20 +191,28 @@ pub struct Site {
pub name: Option<String>, pub name: Option<String>,
pub level_of_access: u32, pub level_of_access: u32,
//pub address: Address, //pub address: Address,
pub installer_alias: Option<String> pub installer_alias: Option<String>,
} }
#[derive(Clone,Debug,Deserialize)] #[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct LoginResponse { pub struct LoginResponse {
pub access_token: String, pub access_token: String,
pub expires_in: u32, pub expires_in: u32,
pub access_claims: Vec<Option<String>>, pub access_claims: Vec<Option<String>>,
pub token_type: Option<String>, pub token_type: Option<String>,
pub refresh_token: String pub refresh_token: String,
} }
#[derive(Debug,Error)] #[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct CommandReply {
command_id: u64,
device: String,
ticks: u64,
}
#[derive(Debug, Error)]
pub enum ApiError { pub enum ApiError {
/// HTTP call caused an IO error /// HTTP call caused an IO error
#[error("io: {0}")] #[error("io: {0}")]
@ -227,7 +232,7 @@ pub enum ApiError {
/// A JSON datetime field could not be parsed /// A JSON datetime field could not be parsed
#[error("format error: {0}")] #[error("format error: {0}")]
FormatError(#[from] chrono::ParseError) FormatError(#[from] chrono::ParseError),
} }
impl From<ureq::Error> for ApiError { impl From<ureq::Error> for ApiError {
@ -250,12 +255,14 @@ impl JsonExplicitError for ureq::Response {
} }
impl Context { impl Context {
/// Build a context from provided acess tokens /// Build a context from provided acess tokens
pub fn from_tokens(access_token: &str, refresh_token: String, expires_in: u32) -> Self { pub fn from_tokens(access_token: &str, refresh_token: String, expires_in: u32) -> Self {
Self { auth_header: format!("Bearer {}", access_token), Self {
refresh_token, auth_header: format!("Bearer {}", access_token),
token_expiration: Instant::now() + Duration::from_secs(expires_in as u64) - REFRESH_TOKEN_DELAY } refresh_token,
token_expiration: Instant::now() + Duration::from_secs(expires_in as u64)
- REFRESH_TOKEN_DELAY,
}
} }
fn from_login_response(resp: LoginResponse) -> Self { fn from_login_response(resp: LoginResponse) -> Self {
@ -265,13 +272,19 @@ impl Context {
/// Retrieve access tokens online, by logging in with the provided credentials /// Retrieve access tokens online, by logging in with the provided credentials
pub fn from_login(user: &str, password: &str) -> Result<Self, ApiError> { pub fn from_login(user: &str, password: &str) -> Result<Self, ApiError> {
#[derive(Serialize)] #[derive(Serialize)]
#[serde(rename_all="camelCase")] #[serde(rename_all = "camelCase")]
struct Params<'t> { user_name: &'t str, password: &'t str } struct Params<'t> {
user_name: &'t str,
password: &'t str,
}
info!("Logging into API"); info!("Logging into API");
let url: String = format!("{}accounts/login", API_BASE); let url: String = format!("{}accounts/login", API_BASE);
let resp: LoginResponse = ureq::post(&url) let resp: LoginResponse = ureq::post(&url)
.send_json(Params { user_name: user, password } )? .send_json(Params {
user_name: user,
password,
})?
.into_json_with_error()?; .into_json_with_error()?;
Ok(Self::from_login_response(resp)) Ok(Self::from_login_response(resp))
@ -293,11 +306,15 @@ impl Context {
/// Use the refresh token to refresh credentials /// Use the refresh token to refresh credentials
pub fn refresh_token(&mut self) -> Result<(), ApiError> { pub fn refresh_token(&mut self) -> Result<(), ApiError> {
#[derive(Serialize)] #[derive(Serialize)]
#[serde(rename_all="camelCase")] #[serde(rename_all = "camelCase")]
struct Params<'t> { refresh_token: &'t str } struct Params<'t> {
refresh_token: &'t str,
}
info!("Refreshing access token"); info!("Refreshing access token");
let params = Params { refresh_token: &self.refresh_token }; let params = Params {
refresh_token: &self.refresh_token,
};
let url = format!("{}accounts/refresh_token", API_BASE); let url = format!("{}accounts/refresh_token", API_BASE);
let resp: LoginResponse = ureq::post(&url) let resp: LoginResponse = ureq::post(&url)
.set("Content-type", "application/json") .set("Content-type", "application/json")
@ -306,7 +323,6 @@ impl Context {
*self = Self::from_login_response(resp); *self = Self::from_login_response(resp);
Ok(()) Ok(())
} }
/// List all sites available to the user /// List all sites available to the user
@ -341,19 +357,27 @@ impl Context {
match self.get(path) { match self.get(path) {
Ok(r) => Ok(Some(r)), Ok(r) => Ok(Some(r)),
Err(ApiError::Ureq(e)) => match &*e { Err(ApiError::Ureq(e)) => match &*e {
ureq::Error::Status(404, _ ) => Ok(None), ureq::Error::Status(404, _) => Ok(None),
_ => Err(ApiError::Ureq(e)) _ => Err(ApiError::Ureq(e)),
}, },
Err(other) => Err(other) Err(other) => Err(other),
} }
} }
pub(crate) fn post<T: DeserializeOwned, P: Serialize>(&mut self, path: &str, params: &P) -> Result<T, ApiError> { pub(crate) fn post<T: DeserializeOwned, P: Serialize>(
&mut self,
path: &str,
params: &P,
) -> Result<T, ApiError> {
let url: String = format!("{}{}", API_BASE, path); let url: String = format!("{}{}", API_BASE, path);
self.post_raw(&url, params) self.post_raw(&url, params)
} }
pub(crate) fn post_raw<T: DeserializeOwned, P: Serialize>(&mut self, url: &str, params: &P) -> Result<T, ApiError> { pub(crate) fn post_raw<T: DeserializeOwned, P: Serialize>(
&mut self,
url: &str,
params: &P,
) -> Result<T, ApiError> {
self.check_expired()?; self.check_expired()?;
let req = ureq::post(url) let req = ureq::post(url)
.set("Accept", "application/json") .set("Accept", "application/json")
@ -368,12 +392,11 @@ impl Context {
resp.into_json_with_error() resp.into_json_with_error()
} }
} }
/// Energy meter reading /// Energy meter reading
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
#[serde(rename_all="camelCase")] #[serde(rename_all = "camelCase")]
pub struct MeterReading { pub struct MeterReading {
/// ID of the charger /// ID of the charger
pub charger_id: String, pub charger_id: String,
@ -412,26 +435,27 @@ impl Charger {
ctx.maybe_get(&format!("chargers/{}/sessions/latest", &self.id)) ctx.maybe_get(&format!("chargers/{}/sessions/latest", &self.id))
} }
fn command(&self, ctx: &mut Context, command: &str) -> Result<(), ApiError> { fn command(&self, ctx: &mut Context, command: &str) -> Result<CommandReply, ApiError> {
ctx.post(&format!("chargers/{}/commands/{}", self.id, command), &()) ctx.post(&format!("chargers/{}/commands/{}", self.id, command), &())
} }
pub fn start(&self, ctx: &mut Context) -> Result<(), ApiError> { pub fn start(&self, ctx: &mut Context) -> Result<(), ApiError> {
self.command(ctx, "start_charging") self.command(ctx, "start_charging")?;
Ok(())
} }
pub fn pause(&self, ctx: &mut Context) -> Result<(), ApiError> { pub fn pause(&self, ctx: &mut Context) -> Result<(), ApiError> {
self.command(ctx, "pause_charging") self.command(ctx, "pause_charging")?;
Ok(())
} }
pub fn resume(&self, ctx: &mut Context) -> Result<(), ApiError> { pub fn resume(&self, ctx: &mut Context) -> Result<(), ApiError> {
self.command(ctx, "resume_charging") self.command(ctx, "resume_charging")?;
Ok(())
} }
pub fn stop(&self, ctx: &mut Context) -> Result<(), ApiError> { pub fn stop(&self, ctx: &mut Context) -> Result<(), ApiError> {
self.command(ctx, "stop_charging") self.command(ctx, "stop_charging")?;
Ok(())
} }
} }

View File

@ -2,3 +2,8 @@ pub mod api;
#[cfg(feature = "tungstenite")] #[cfg(feature = "tungstenite")]
pub mod stream; pub mod stream;
#[cfg(feature = "tungstenite")]
pub mod signalr;
pub mod observation;

40
src/observation.rs Normal file
View File

@ -0,0 +1,40 @@
use crate::api::ChargerOpMode;
#[repr(u8)]
pub enum PilotMode {
Disconnected = b'A',
Connected = b'B',
Charging = b'C',
NeedsVentilation = b'D',
FaultDetected = b'F',
}
#[repr(u8)]
pub enum PhaseMode {
Ignore = 0,
Phase1 = 1,
Auto = 2,
Phase2 = 3,
}
pub enum ReasonForNoCurrent {}
pub enum Observation {
SelfTestResult(String),
SelfTestDetails(serde_json::Value),
WifiEvent(u64),
ChargerOfflineReason(u64),
CircuitMaxCurrent { phase: u8, amperes: u64 },
SiteID(String),
IsEnabled(bool),
Temperature(u64),
TriplePhase(bool),
DynamicChargerCurrent(f64),
ReasonForNoCurrent(ReasonForNoCurrent),
PilotMode(PilotMode),
SmartCharging(bool),
CableLocked(bool),
CableRating(f64),
UserId(String),
ChargerOpMode(ChargerOpMode),
}

125
src/signalr.rs Normal file
View File

@ -0,0 +1,125 @@
use serde_json::Value;
use thiserror::Error;
use crate::stream::RecvError;
/* This entire module can be rewritten in two lines when
https://github.com/serde-rs/serde/issues/745
is merged */
#[derive(Debug)]
pub enum Message {
Empty,
Invocation {
target: String,
arguments: Vec<Value>,
},
InvocationResult {
id: String,
result: serde_json::Value,
},
Ping,
Other(serde_json::Value),
}
#[derive(Debug, Error)]
pub enum ParseError {
#[error("Expecting object, received {0}")]
ExpectingObject(Value),
#[error("Missing `type` key")]
MissingTypeKey,
#[error("`type` is not a number")]
TypeNotANumber,
#[error("Unknown type {0}")]
UnknownType(u64),
#[error("Missing expected key {0}")]
MissingKey(&'static str),
#[error("Expecting string")]
ExpectingString,
#[error("Expecting array")]
ExpectingArray,
}
impl Message {
pub fn from_json(msg: Value) -> Result<Self, ParseError> {
let Some(obj) = msg.as_object() else {
return Err(ParseError::ExpectingObject(msg));
};
if obj.is_empty() {
return Ok(Message::Empty);
}
let typ = obj
.get("type")
.ok_or(ParseError::MissingTypeKey)?
.as_number()
.and_then(|n| n.as_u64())
.ok_or(ParseError::TypeNotANumber)?;
match typ {
1 => Ok(Message::Invocation {
target: obj
.get("target")
.ok_or(ParseError::MissingKey("target"))?
.as_str()
.ok_or(ParseError::ExpectingString)?
.to_owned(),
arguments: obj
.get("arguments")
.ok_or(ParseError::MissingKey("arguments"))?
.as_array()
.ok_or(ParseError::ExpectingArray)?
.to_owned(),
}),
3 => Ok(Message::InvocationResult {
id: obj
.get("invocationId")
.ok_or(ParseError::MissingKey("invocationId"))?
.as_str()
.ok_or(ParseError::ExpectingString)?
.to_owned(),
result: obj
.get("result")
.ok_or(ParseError::MissingKey("result"))?
.to_owned(),
}),
6 => Ok(Message::Ping),
_ => Ok(Message::Other(msg)),
}
}
}
#[derive(Debug, Error)]
pub enum StreamError {
#[error("Parse error: {0}")]
ParseError(#[from] ParseError),
#[error("Recv error: {0}")]
StreamError(#[from] RecvError),
}
pub struct Stream {
buffer: Vec<serde_json::Value>,
ws: super::stream::Stream,
}
impl Stream {
pub fn from_ws(ws: super::stream::Stream) -> Self {
Self { ws, buffer: vec![] }
}
pub fn recv(&mut self) -> Result<Message, StreamError> {
while self.buffer.is_empty() {
self.buffer = self.ws.recv()?;
self.buffer.reverse();
}
let json = self.buffer.pop().unwrap();
Ok(Message::from_json(json)?)
}
}

View File

@ -1,23 +1,24 @@
use std::net::TcpStream; use std::net::TcpStream;
use tungstenite::{stream::MaybeTlsStream, WebSocket, Message}; use super::api::{ApiError, Context};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use thiserror::Error;
use super::api::{Context, ApiError};
use serde_json::json; use serde_json::json;
use thiserror::Error;
use tungstenite::{stream::MaybeTlsStream, Message, WebSocket};
const STREAM_API_NEGOTIATION_URL: &str = "https://streams.easee.com/hubs/products/negotiate?negotiateVersion=1"; const STREAM_API_NEGOTIATION_URL: &str =
"https://streams.easee.com/hubs/products/negotiate?negotiateVersion=1";
const WSS_URL: &str = "wss://streams.easee.com/hubs/products"; const WSS_URL: &str = "wss://streams.easee.com/hubs/products";
#[derive(Clone,Debug,Deserialize,PartialEq,PartialOrd)] #[derive(Clone, Debug, Deserialize, PartialEq, PartialOrd)]
#[serde(rename_all="camelCase")] #[serde(rename_all = "camelCase")]
struct NegotiateResponse { struct NegotiateResponse {
negotiate_version: u16, negotiate_version: u16,
connection_id: String, connection_id: String,
connection_token: String, connection_token: String,
} }
#[derive(Debug,Error)] #[derive(Debug, Error)]
pub enum NegotiateError { pub enum NegotiateError {
#[error("API error: {0}")] #[error("API error: {0}")]
ApiError(#[from] ApiError), ApiError(#[from] ApiError),
@ -26,7 +27,7 @@ pub enum NegotiateError {
TungsteniteError(#[from] tungstenite::Error), TungsteniteError(#[from] tungstenite::Error),
} }
#[derive(Debug,Error)] #[derive(Debug, Error)]
pub enum RecvError { pub enum RecvError {
#[error("Bad message type")] #[error("Bad message type")]
BadMessageType, BadMessageType,
@ -48,8 +49,10 @@ impl Stream {
dbg!(&r); dbg!(&r);
let token = ctx.auth_token(); let token = ctx.auth_token();
let wss_url = format!("{}?id={}&access_token={}", WSS_URL, let wss_url = format!(
r.connection_token, token); "{}?id={}&access_token={}",
WSS_URL, r.connection_token, token
);
dbg!(&wss_url); dbg!(&wss_url);
/* /*
@ -72,7 +75,10 @@ impl Stream {
let resp = tungstenite::client::connect(&wss_url); let resp = tungstenite::client::connect(&wss_url);
if let Err(tungstenite::Error::Http(he)) = &resp { if let Err(tungstenite::Error::Http(he)) = &resp {
eprintln!("Response: {}", std:: str::from_utf8(&he.body().as_ref().unwrap()).unwrap()); eprintln!(
"Response: {}",
std::str::from_utf8(&he.body().as_ref().unwrap()).unwrap()
);
} }
let mut stream = Stream { sock: resp?.0 }; let mut stream = Stream { sock: resp?.0 };
@ -89,11 +95,14 @@ impl Stream {
pub fn recv(&mut self) -> Result<Vec<serde_json::Value>, RecvError> { pub fn recv(&mut self) -> Result<Vec<serde_json::Value>, RecvError> {
let msg = self.sock.read()?; let msg = self.sock.read()?;
let Message::Text(txt) = msg else { return Err(RecvError::BadMessageType) }; let Message::Text(txt) = msg else {
return Err(RecvError::BadMessageType);
};
let msgs = txt.split_terminator('\x1E') let msgs = txt
.filter_map(|s| serde_json::from_str(s).ok()) .split_terminator('\x1E')
.collect(); .filter_map(|s| serde_json::from_str(s).ok())
.collect();
Ok(msgs) Ok(msgs)
} }
@ -104,5 +113,4 @@ impl Stream {
"target": "SubscribeWithCurrentState", "target": "SubscribeWithCurrentState",
"type": 1} )) "type": 1} ))
} }
} }