sensirion_exporter/src/main.rs

146 lines
4.5 KiB
Rust

use std::{collections::HashMap, convert::Infallible, sync::Arc, time::{SystemTime, UNIX_EPOCH}};
use bluer::{self, Adapter, AdapterEvent, Address, DiscoveryFilter, DiscoveryTransport};
use anyhow::{Result, anyhow};
use clap::Parser;
use futures::{Stream, StreamExt};
use tokio;
use warp::Filter;
use tokio::sync::Mutex;
const VENDOR_ID_SENSIRION: u16 = 0x06d5;
#[derive(Debug)]
struct Data {
/// ID
addr: Address,
/// Temperature in °C
temperature: f32,
/// Humidity in %
humidity: f32,
/// CO2 in parts per million
co2: u16,
}
impl Data {
fn from_pkt(addr: Address, b: &[u8]) -> Data {
let round = |v: f32| (v*100.0).round() / 100.0;
Data {
addr,
temperature: round((u16::from_le_bytes([b[2], b[3]]) as f32) * 175.0 / 65535.0 - 45.0),
humidity: round((u16::from_le_bytes([b[4], b[5]]) as f32) * 100.0 / 65535.0),
co2: u16::from_le_bytes([b[6], b[7]]),
}
}
}
#[derive(Debug,Default)]
struct DataStore {
devices: HashMap<Address, (Data, SystemTime)>,
}
impl DataStore {
pub fn insert(&mut self, data: Data, timestamp: SystemTime) {
self.devices.insert(data.addr, (data, timestamp));
}
pub fn scrape(&mut self) -> String {
let mut scrape = format!("# HELP sensirion_temperature_celsius Temperature reported by a Sensirion sensor, in °C\n");
scrape += "# TYPE sensirion_temperature_celsius gauge\n";
scrape += "# HELP sensirion_humidity_percent Humidity reported by a Sensirion sensor, in %H\n";
scrape += "# TYPE sensirion_humidity_percent gauge\n";
scrape += "# HELP sensirion_co2_ppm CO2 detected by a Sensirion sensor, in parts per million\n";
scrape += "# TYPE sensirion_co2_ppm gauge\n";
for (_, (data, point)) in self.devices.iter() {
let Data { addr, temperature, humidity, co2 } = data;
let time = point.duration_since(UNIX_EPOCH).expect("Time went backwards").as_millis();
scrape += &format!("sensirion_temperature_celsius{{id=\"{addr}\"}} {temperature} {time}\n");
scrape += &format!("sensirion_humidity_percent{{id=\"{addr}\"}} {humidity} {time}\n");
scrape += &format!("sensirion_co2_ppm{{id=\"{addr}\"}} {co2} {time}\n");
}
scrape
}
}
#[derive(Debug,clap::Parser)]
struct CLI {
/// Address to bind
#[arg(short, long, default_value = "127.0.0.1:9174")]
bind: String,
/// BLE HCI adapter to use
#[arg(short, long)]
interface: Option<String>,
}
#[tokio::main(flavor="current_thread")]
async fn main() -> Result<()> {
let args = CLI::parse();
let session = bluer::Session::new().await?;
let adapter = match &args.interface {
Some(hci) => session.adapter(&*hci),
None => session.default_adapter().await,
}?;
let adapter = Arc::new(adapter);
let store = Arc::new(Mutex::new(DataStore::default()));
let mut stream = Box::pin(sensor_reports(adapter).await?);
let sniffer_store = Arc::clone(&store);
tokio::spawn(async move {
while let Some(data) = stream.next().await {
eprintln!("Data point: {:?}", data);
sniffer_store.lock().await.insert(data, SystemTime::now());
}
});
let filter =
warp::path!("metrics")
.map(move || Arc::clone(&store))
.and_then(|store: Arc<Mutex<DataStore>>| async move { Ok::<_,Infallible>(store.lock().await.scrape()) })
.or(warp::path!().map(|| { warp::reply::html("<h1>Sensirion BLE exporter</h1>") }));
let host = tokio::net::lookup_host(&args.bind).await?
.next()
.ok_or(anyhow!("Cannot resolve host to bind {}", &args.bind))?;
Ok(warp::serve(filter).run(host).await)
}
async fn extract(adapter: Arc<Adapter>, evt: AdapterEvent) -> Option<Data> {
let AdapterEvent::DeviceAdded(addr) = evt else { return None };
let dev = adapter.device(addr).ok()?;
let data = dev.manufacturer_data().await.ok()??;
let payload = data.get(&VENDOR_ID_SENSIRION)?;
Some(Data::from_pkt(addr, &payload[2..]))
}
async fn sensor_reports<'a>(adapter: Arc<Adapter>) -> Result<impl Stream<Item=Data> + 'a> {
let filter = DiscoveryFilter {
transport: DiscoveryTransport::Le,
duplicate_data: true,
..Default::default() };
adapter.set_discovery_filter(filter).await?;
Ok(adapter.discover_devices_with_changes().await?
.filter_map(move |evt| { extract(Arc::clone(&adapter), evt) }))
}