from pymodbus.client import ModbusTcpClient class ConnectionFailed(Exception): pass class InvalidAddress(Exception): pass class Registers: def __init__(self, lower, count=1): self.count = count self.lower = lower self.range = range(count) def check_address(self, address): if address not in self.range: raise InvalidAddress(f"requested {address} for {self.range}") def scale(self, value, scale): if scale is None: return value return value * self.read_scale(scale) def read_uint16(self, address, scale=None): self.check_address(address) value = self.lower.getRegister(address) return self.scale(value, scale) def read_int16(self, address, scale=None): value = self.read_uint16(address) if value & 0x8000: value = value - 0x10000 return self.scale(value, scale) def read_uint32(self, address, scale=None): value = self.read_uint16(address) * 65536 + self.read_uint16(address + 1) return self.scale(value, scale) def read_bytes(self, address, length): string = bytearray(length) for n in range(length//2): nibble = self.read_uint16(address + n) string[2*n] = nibble >> 8 string[2*n+1] = nibble & 0xFF return bytes(string).split(b'\x00', 1)[0] def read_scale(self, address): return 10 ** self.read_int16(address) class CommonBlock: def __init__(self, regs): self.did = regs.read_uint16(0) self.info_len = regs.read_uint16(1) self.manufacturer = regs.read_bytes(2, 32) self.model = regs.read_bytes(18, 32) self.version = '.'.join(s.lstrip('0') for s in regs.read_bytes(42, 16).decode('ascii').split('.')) self.serial = regs.read_bytes(50, 32) self.device = regs.read_uint16(66) class Meter: def __init__(self, regs): blocks16 = (('ac_current', 0, ('total', 'a', 'b', 'c'), 0.1), ('ac_voltage', 5, ('average', 'a', 'b', 'c', 'll', 'ab', 'bc', 'ca'), 1), ('ac_real_power', 16, ('total', 'a', 'b', 'c'), 0.1), ('ac_apparent_power', 21, ('total', 'a', 'b', 'c'), 0.1), ('ac_reactive_power', 26, ('total', 'a', 'b', 'c'), 0.1), ('power_factor', 31, ('average', 'a', 'b', 'c'), 1)) quadrants = ['_'.join((k,q,p)) for (k, qs) in ( ('import', ('1','2')), ('export', ('3','4')) ) for q in qs for p in ['total', 'a', 'b', 'c']] blocks32 = (('energy_real', 36, ('export', 'ex_a', 'ex_b', 'ex_c', 'import', 'im_a', 'im_b', 'im_c')), ('energy_apparent', 53, ('export', 'ex_a', 'ex_b', 'ex_c', 'import', 'im_a', 'im_b', 'im_c')), ('energy_reactive', 70, quadrants)) for (metric, offset, labels, fix) in blocks16: data = {} scale = regs.read_scale(offset + len(labels)) * fix for (i, key) in enumerate(labels): data[key] = regs.read_int16(offset + i) * scale self.__dict__[metric] = data for (metric, offset, labels) in blocks32: data = {} scale = regs.read_scale(offset + len(labels)*2) for (i, key) in enumerate(labels): data[key] = regs.read_uint32(offset + 2*i) * scale / 10 self.__dict__[metric] = data self.frequency = regs.read_int16(14, scale=15) status_codes = [None, 'off', 'sleep', 'start', 'on', 'throttled', 'stop', 'fault', 'setup'] class InverterBlock: def __init__(self, regs): current_scale = regs.read_scale(6) self.ac_current = current_scale * regs.read_uint16(2) self.ac_current_a = current_scale * regs.read_uint16(3) self.ac_current_b = current_scale * regs.read_uint16(4) self.ac_current_c = current_scale * regs.read_uint16(5) voltage_scale = regs.read_scale(13) self.ac_voltage_ab = voltage_scale * regs.read_uint16(7) self.ac_voltage_bc = voltage_scale * regs.read_uint16(8) self.ac_voltage_ca = voltage_scale * regs.read_uint16(9) self.ac_power = regs.read_int16(14, scale=15) self.frequency = regs.read_int16(16, scale=17) self.ac_apparent_power = regs.read_int16(18, scale=19) self.ac_reactive_power = regs.read_int16(20, scale=21) self.power_factor = regs.read_int16(22, scale=23) self.ac_energy = regs.read_uint32(24, scale=26) self.dc_current = regs.read_uint16(27, scale=28) self.dc_voltage = regs.read_uint16(29, scale=30) self.dc_power = regs.read_int16(31, scale=32) self.temperature = regs.read_int16(34, scale=37) self.status_code = regs.read_uint16(38) self.status = status_codes[self.status_code] self.status_vendor = regs.read_uint16(39) class Inverter: def __init__(self, host, port=1502, device=1): self._conn = ModbusTcpClient(host, port) self._device = 1 def connect(self): if not self._conn.connect(): raise ConnectionFailed() def registers(self, addr, length): self.connect() rs = self._conn.read_holding_registers(addr, length, slave=self._device) return Registers(rs, length) def common_block(self): return CommonBlock(self.registers(40002, 67)) def data(self): return InverterBlock(self.registers(40069, 40)) def meter(self, n): base = 40190 + 174*n return Meter(self.registers(base, 105))