solaredge_exporter/sunspec.py

183 lines
6.8 KiB
Python

from pymodbus.client import ModbusTcpClient
class ConnectionFailed(Exception):
    """Raised when the Modbus connection cannot be established."""
class InvalidAddress(Exception):
    """Raised when a register offset lies outside the block that was read."""
class Registers:
    """Typed accessors over a block of 16-bit registers.

    ``lower`` is the underlying register source; the only thing required of
    it here is a ``getRegister(index)`` method.  ``count`` is the number of
    registers in the block, and every ``address`` argument is a zero-based
    offset into that block.
    """

    def __init__(self, lower, count=1):
        self.count = count
        self.lower = lower
        self.range = range(count)

    def check_address(self, address):
        """Raise InvalidAddress unless ``address`` lies inside the block."""
        if address in self.range:
            return
        raise InvalidAddress(f"requested {address} for {self.range}")

    def scale(self, value, scale):
        """Multiply ``value`` by the scale factor stored at offset ``scale``.

        ``scale=None`` means "no scaling" and returns ``value`` unchanged.
        """
        return value if scale is None else value * self.read_scale(scale)

    def read_uint16(self, address, scale=None):
        """Return the register at ``address`` as an unsigned 16-bit value."""
        self.check_address(address)
        raw = self.lower.getRegister(address)
        return self.scale(raw, scale)

    def read_int16(self, address, scale=None):
        """Return the register at ``address`` as a two's-complement value."""
        raw = self.read_uint16(address)
        # A set top bit marks a negative number.
        signed = raw - 0x10000 if raw & 0x8000 else raw
        return self.scale(signed, scale)

    def read_uint32(self, address, scale=None):
        """Return two consecutive registers as one value, high word first."""
        high = self.read_uint16(address)
        low = self.read_uint16(address + 1)
        return self.scale((high << 16) + low, scale)

    def read_bytes(self, address, length):
        """Return up to ``length`` bytes starting at ``address``.

        Each register contributes two bytes, high byte first.  The result is
        cut at the first NUL byte.
        """
        buffer = bytearray(length)
        for index in range(length // 2):
            high, low = divmod(self.read_uint16(address + index), 0x100)
            buffer[2 * index] = high
            buffer[2 * index + 1] = low
        text, _, _ = bytes(buffer).partition(b'\x00')
        return text

    def read_scale(self, address):
        """Return 10 raised to the signed 16-bit exponent at ``address``."""
        exponent = self.read_int16(address)
        return 10 ** exponent
class Header:
    """The two-register header (ID and length) found at ``base``.

    Args:
        regs: register accessor exposing ``read_uint16(offset)``; offset 0 is
            read as the block ID and offset 1 as the block length.
        base: absolute address the header was read from.

    Attributes:
        did: value of register 0.
        len: value of register 1.
        base: the ``base`` argument, unchanged.
        end: ``base + len``.
    """

    def __init__(self, regs, base):
        self.did = regs.read_uint16(0)
        self.len = regs.read_uint16(1)
        self.base = base
        self.end = base + self.len

    def __str__(self):
        # The closing parenthesis was missing, leaving the text unbalanced.
        return f"SunSpec(DID={self.did}, [{self.base}:{self.end}])"
class CommonBlock:
    """Identification fields decoded from a register block.

    Args:
        regs: register accessor exposing ``read_uint16(offset)`` and
            ``read_bytes(offset, length)``.

    Attributes:
        did: register 0.
        info_len: register 1.
        manufacturer: bytes read from offset 2 (32 bytes).
        model: bytes read from offset 18 (32 bytes).
        version: ASCII text from offset 42 (16 bytes), with leading zeros
            removed from every dot-separated component.
        serial: bytes read from offset 50 (32 bytes).
        device: register 66.

    Raises:
        UnicodeDecodeError: if the version field is not ASCII.
    """

    def __init__(self, regs):
        self.did = regs.read_uint16(0)
        self.info_len = regs.read_uint16(1)
        self.manufacturer = regs.read_bytes(2, 32)
        self.model = regs.read_bytes(18, 32)
        raw_version = regs.read_bytes(42, 16).decode('ascii')
        # Strip zero padding per component.  An all-zero component such as
        # "0000" must collapse to "0" rather than vanish (which used to yield
        # "4..30"); ``part[:1]`` keeps a genuinely empty component empty.
        self.version = '.'.join(
            part.lstrip('0') or part[:1] for part in raw_version.split('.')
        )
        self.serial = regs.read_bytes(50, 32)
        self.device = regs.read_uint16(66)
# Bit position in the meter events value -> name reported for that bit.
meter_events_bits = {
    2: 'Power Failure',
    3: 'Under Voltage',
    4: 'Low PF',
    5: 'Over Current',
    6: 'Over Voltage',
    7: 'Missing Sensor',
}
class Meter:
    """Readings decoded from one meter register block.

    ``regs`` must expose ``read_int16``, ``read_uint32`` and ``read_scale``.
    Every metric becomes an attribute holding a ``{label: value}`` dict:

    * 16-bit groups (``ac_current``, ``ac_voltage``, ``ac_real_power``,
      ``ac_apparent_power``, ``ac_reactive_power``, ``power_factor``) store
      consecutive signed registers followed by one scale register.
    * 32-bit groups (``energy_real``, ``energy_apparent``,
      ``energy_reactive``) store consecutive register pairs followed by one
      scale register; scaled values that are not greater than 1 are reported
      as ``None``.

    ``frequency`` is the scaled register 14 (scale at 15) and ``events`` maps
    each name in ``meter_events_bits`` to whether its bit is set in the
    32-bit value at offset 103.
    """

    def __init__(self, regs):
        phases = ('total', 'a', 'b', 'c')
        averaged = ('average', 'a', 'b', 'c')
        energy_labels = ('export', 'ex_a', 'ex_b', 'ex_c',
                         'import', 'im_a', 'im_b', 'im_c')
        # import_1_total, import_1_a, ... export_4_c (16 labels).
        quadrants = [
            f'{direction}_{quadrant}_{phase}'
            for direction, numbers in (('import', '12'), ('export', '34'))
            for quadrant in numbers
            for phase in phases
        ]

        # (attribute name, first register, labels of consecutive values)
        int16_groups = (
            ('ac_current', 0, phases),
            ('ac_voltage', 5, averaged + ('ll', 'ab', 'bc', 'ca')),
            ('ac_real_power', 16, phases),
            ('ac_apparent_power', 21, phases),
            ('ac_reactive_power', 26, phases),
            ('power_factor', 31, averaged),
        )
        uint32_groups = (
            ('energy_real', 36, energy_labels),
            ('energy_apparent', 53, energy_labels),
            ('energy_reactive', 70, quadrants),
        )

        for attribute, start, names in int16_groups:
            # The scale register sits right after the last value.
            factor = regs.read_scale(start + len(names))
            setattr(self, attribute, {
                name: regs.read_int16(start + position) * factor
                for position, name in enumerate(names)
            })

        for attribute, start, names in uint32_groups:
            # Values occupy two registers each; the scale follows them.
            factor = regs.read_scale(start + 2 * len(names))
            readings = {}
            for position, name in enumerate(names):
                scaled = regs.read_uint32(start + 2 * position) * factor
                readings[name] = scaled if scaled > 1 else None
            setattr(self, attribute, readings)

        self.frequency = regs.read_int16(14, scale=15)

        flags = regs.read_uint32(103)
        self.events = {
            name: bool(flags & (1 << bit))
            for bit, name in meter_events_bits.items()
        }
# Status names indexed by status code; code 0 has no name.
status_codes = [
    None,
    'off',
    'sleep',
    'start',
    'on',
    'throttled',
    'stop',
    'fault',
    'setup',
]
class InverterBlock:
    """Readings decoded from the inverter register block.

    ``regs`` must expose ``read_uint16``, ``read_int16``, ``read_uint32`` and
    ``read_scale``.  Offsets below are relative to the start of ``regs``.

    ``status`` is ``status_codes[status_code]``, so a code outside that list
    raises ``IndexError``.  ``ac_energy`` is ``None`` when the scaled value
    is not greater than 1.
    """

    def __init__(self, regs):
        # The four current registers (2-5) share the scale at offset 6.
        current_factor = regs.read_scale(6)
        for attribute, address in (
            ('ac_current', 2),
            ('ac_current_a', 3),
            ('ac_current_b', 4),
            ('ac_current_c', 5),
        ):
            setattr(self, attribute, current_factor * regs.read_uint16(address))

        # The three voltage registers (7-9) share the scale at offset 13.
        voltage_factor = regs.read_scale(13)
        for attribute, address in (
            ('ac_voltage_ab', 7),
            ('ac_voltage_bc', 8),
            ('ac_voltage_ca', 9),
        ):
            setattr(self, attribute, voltage_factor * regs.read_uint16(address))

        # Remaining values each name their own scale register:
        # (attribute name, reader, value offset, scale offset)
        signed = regs.read_int16
        unsigned = regs.read_uint16
        for attribute, reader, address, scale_address in (
            ('ac_power', signed, 14, 15),
            ('frequency', signed, 16, 17),
            ('ac_apparent_power', signed, 18, 19),
            ('ac_reactive_power', signed, 20, 21),
            ('power_factor', signed, 22, 23),
            ('dc_current', unsigned, 27, 28),
            ('dc_voltage', unsigned, 29, 30),
            ('dc_power', signed, 31, 32),
            ('temperature', signed, 34, 37),
        ):
            setattr(self, attribute, reader(address, scale=scale_address))

        self.status_code = regs.read_uint16(38)
        self.status = status_codes[self.status_code]
        self.status_vendor = regs.read_uint16(39)

        lifetime_energy = regs.read_uint32(24, scale=26)
        self.ac_energy = lifetime_energy if lifetime_energy > 1 else None
class Inverter:
    """Modbus/TCP access to an inverter's register blocks.

    Args:
        host: address passed to ``ModbusTcpClient``.
        port: TCP port of the Modbus server (default 1502).
        device: Modbus unit id used when a call does not name one.
    """

    def __init__(self, host, port=1502, device=1):
        # ``port`` is passed by keyword: every pymodbus release accepts that,
        # while newer ones no longer accept it positionally.
        self._conn = ModbusTcpClient(host, port=port)
        # Honour the caller's unit id; it used to be hard-coded to 1, which
        # silently ignored the ``device`` argument.
        self._device = device

    def connect(self):
        """Open the connection.

        Raises:
            ConnectionFailed: if the client reports that connecting failed.
        """
        if not self._conn.connect():
            raise ConnectionFailed()

    def registers(self, addr, length, device=None):
        """Read ``length`` holding registers starting at ``addr``.

        ``device`` overrides the unit id given to the constructor for this
        one read.  Returns the response wrapped in a ``Registers`` accessor.

        Raises:
            ConnectionFailed: if the connection cannot be opened.
        """
        if device is None:
            device = self._device
        self.connect()
        rs = self._conn.read_holding_registers(addr, length, slave=device)
        # NOTE(review): ``rs`` is not checked for a Modbus error response
        # before being wrapped — confirm whether callers rely on that.
        return Registers(rs, length)

    def header(self, base):
        """Return the two-register ``Header`` found at ``base``."""
        return Header(self.registers(base, 2), base)

    def common_block(self):
        """Return the ``CommonBlock`` read from 67 registers at 40002."""
        return CommonBlock(self.registers(40002, 67))

    def data(self):
        """Return the ``InverterBlock`` read from 40 registers at 40069."""
        return InverterBlock(self.registers(40069, 40))

    def meter(self, n):
        """Return the ``Meter`` numbered ``n`` (zero-based).

        Reads 105 registers starting at ``40190 + 174 * n``.
        """
        base = 40190 + 174 * n
        return Meter(self.registers(base, 105))