solaredge_exporter/prom.py

58 lines
1.8 KiB
Python
Executable File

#!/usr/bin/python3
import bottle
import datetime
import logging
from collections import defaultdict
from sys import stderr
def format_metrics(metric_data, entries, timestamps=False):
""" metric_data = { 'metric_name' : ('gauge'|'counter', 'description')
entries = [ (datetime, 'metric_name', { 'attr': 'key' }, value) ]
"""
collected = defaultdict(list)
for entry in entries:
collected[entry[1]].append(entry)
for metric in collected:
md = metric_data.get(metric)
if md is not None:
yield "# TYPE {} {}\n".format(metric, md[0])
yield "# HELP {} {}\n".format(metric, md[1])
for (time, metric, attrs, value) in collected[metric]:
time_s = int(time.timestamp() * 1000)
attr_s = ""
if attrs:
attr_s = "{" + ','.join('{}="{}"'.format(k, attrs[k]) for k in attrs ) + "}"
if timestamps:
yield f"{metric} {attr_s} {value} {time_s}\n"
else:
yield f"{metric} {attr_s} {value}\n"
def run(collect, metric_data, description='Mini Prometheus Exporter',
host='localhost', port=9150, timestamps=False, debug=False):
@bottle.route('/')
def root():
return description
@bottle.route('/metrics')
def metrics():
return format_metrics(metric_data, collect(), timestamps)
bottle.run(host=host, port=port, debug=debug)
if __name__ == "__main__":
def collect_dummy():
now = datetime.datetime.now()
yield (now, 'pi', {'one': '1', 'two': '2' }, 3.14)
yield (now, 'e', {'three': '3'}, 2.72)
metric_data = { 'pi': ['gauge', 'Pi value'],
'e': ['counter', 'E counter'] }
run(collect_dummy, metric_data, port=9159)