diff --git a/src/deye_sensor.py b/src/deye_sensor.py index e18e319..f03ae1e 100644 --- a/src/deye_sensor.py +++ b/src/deye_sensor.py @@ -16,7 +16,8 @@ # under the License. import math -from abc import abstractmethod +from datetime import datetime +from abc import abstractmethod, abstractproperty class Sensor: @@ -26,13 +27,37 @@ class Sensor: This is an abstract class. Method 'read_value' must be provided by the extending subclass. """ - def __init__(self, name: str, mqtt_topic_suffix="", unit="", print_format="{:s}", groups=[]): - self.name = name - self.mqtt_topic_suffix = mqtt_topic_suffix - self.unit = unit - self.print_format = print_format - assert len(groups) > 0, f"Sensor {name} must belong to at least one group" - self.groups = groups + @abstractproperty + def reg_address(self) -> int: + pass + + @abstractproperty + def name(self) -> str: + pass + + @abstractproperty + def mqtt_topic_suffix(self) -> str: + pass + + @abstractproperty + def unit(self) -> str: + pass + + @abstractproperty + def print_format(self) -> str: + pass + + @abstractproperty + def groups(self) -> [str]: + pass + + @abstractproperty + def data_type(self) -> str: + pass + + @abstractproperty + def scale_factor(self) -> float: + pass @abstractmethod def read_value(self, registers: dict[int, bytearray]): @@ -66,7 +91,116 @@ def get_registers(self) -> list[int]: """Returns the list of Modbus registers read by this sensor""" -class SingleRegisterSensor(Sensor): +class DailyResetSensor(Sensor): + """ + Wraps other sensor and ensures that the value reported is reset daily. + This is useful to avoid the last value measured yesterday being reported as the first value of today. + Implemented to mitigate microinverter daily energy value "leak". + """ + + def __init__(self, delegate: Sensor): + self.__delegate = delegate + self.__last_value: float | None = None + self.__last_value_ts = datetime.now() + + @property + def reg_address(self) -> int: + return self.__delegate.reg_address + + @property + def name(self) -> str: + return self.__delegate.name + + @property + def mqtt_topic_suffix(self) -> str: + return self.__delegate.mqtt_topic_suffix + + @property + def unit(self) -> str: + return self.__delegate.unit + + @property + def print_format(self) -> str: + return self.__delegate.print_format + + @property + def groups(self) -> [str]: + return self.__delegate.groups + + @abstractproperty + def data_type(self) -> str: + return self.__delegate.data_type + + @abstractproperty + def scale_factor(self) -> float: + return self.__delegate.scale_factor + + def read_value(self, registers: dict[int, bytearray]): + now = datetime.now() + value = self.__delegate.read_value(registers) + if ( + value is not None + and self.__last_value is not None + and now.day != self.__last_value_ts.day + and value >= self.__last_value + ): + return 0 + self.__last_value = value + self.__last_value_ts = now + return value + + def write_value(self, value: str) -> dict[int, bytearray]: + return self.__delegate.write_value(value) + + def format_value(self, value): + return self.__delegate.format_value(value) + + def in_any_group(self, active_groups: set[str]) -> bool: + return self.__delegate.in_any_group(active_groups) + + def get_registers(self) -> list[int]: + return self.__delegate.get_registers() + + +class AbstractSensor(Sensor): + def __init__(self, name: str, mqtt_topic_suffix="", unit="", print_format="{:s}", groups=[]): + self.__name = name + self.__mqtt_topic_suffix = mqtt_topic_suffix + self.__unit = unit + self.__print_format = print_format + assert len(groups) > 0, f"Sensor {name} must belong to at least one group" + self.__groups = groups + + @property + def name(self) -> str: + return self.__name + + @property + def mqtt_topic_suffix(self) -> str: + return self.__mqtt_topic_suffix + + @property + def unit(self) -> str: + return self.__unit + + @property + def print_format(self) -> str: + return self.__print_format + + @property + def groups(self) -> [str]: + return self.__groups + + @property + def data_type(self) -> str: + return "n/a" + + @property + def scale_factor(self) -> float: + return 1 + + +class SingleRegisterSensor(AbstractSensor): """ Solar inverter sensor with value stored as 32-bit integer in a single Modbus register. """ @@ -84,28 +218,42 @@ def __init__( groups=[], ): super().__init__(name, mqtt_topic_suffix, unit, print_format, groups) - self.reg_address = reg_address - self.factor = factor - self.offset = offset - self.signed = signed + self.__reg_address = reg_address + self.__factor = factor + self.__offset = offset + self.__signed = signed def read_value(self, registers: dict[int, bytearray]): - if self.reg_address in registers: - reg_value = registers[self.reg_address] - return int.from_bytes(reg_value, "big", signed=self.signed) * self.factor + self.offset + if self.__reg_address in registers: + reg_value = registers[self.__reg_address] + return int.from_bytes(reg_value, "big", signed=self.__signed) * self.__factor + self.__offset else: return None def write_value(self, value: str) -> dict[int, bytearray]: - v = int((float(value) - self.offset) / self.factor) - return {self.reg_address: v.to_bytes(2, "big", signed=self.signed)} + v = int((float(value) - self.__offset) / self.__factor) + return {self.__reg_address: v.to_bytes(2, "big", signed=self.__signed)} - @abstractmethod def get_registers(self) -> list[int]: - return [self.reg_address] + return [self.__reg_address] + + def reset_daily(self) -> DailyResetSensor: + return DailyResetSensor(self) + + @property + def data_type(self) -> str: + return "S_WORD" if self.__signed else "U_WORD" + + @property + def scale_factor(self) -> float: + return self.__factor + + @property + def offset(self) -> float: + return self.__offset -class DoubleRegisterSensor(Sensor): +class DoubleRegisterSensor(AbstractSensor): """ Solar inverter sensor with value stored as 64-bit integer in two Modbus registers. """ @@ -124,25 +272,44 @@ def __init__( low_word_first=True, ): super().__init__(name, mqtt_topic_suffix, unit, print_format, groups) - self.reg_address = reg_address - self.factor = factor - self.offset = offset - self.signed = signed - self.low_word_first = low_word_first + self.__reg_address = reg_address + self.__factor = factor + self.__offset = offset + self.__signed = signed + self.__low_word_first = low_word_first def read_value(self, registers: dict[int, bytearray]): - low_word_reg_address = self.reg_address + (0 if self.low_word_first else 1) - high_word_reg_address = self.reg_address + (1 if self.low_word_first else 0) + low_word_reg_address = self.__reg_address + (0 if self.__low_word_first else 1) + high_word_reg_address = self.__reg_address + (1 if self.__low_word_first else 0) if low_word_reg_address in registers and high_word_reg_address in registers: low_word = registers[low_word_reg_address] high_word = registers[high_word_reg_address] - return int.from_bytes(high_word + low_word, "big", signed=self.signed) * self.factor + self.offset + return int.from_bytes(high_word + low_word, "big", signed=self.__signed) * self.__factor + self.__offset else: return None - @abstractmethod def get_registers(self) -> list[int]: - return [self.reg_address, self.reg_address + 1] + return [self.__reg_address, self.__reg_address + 1] + + def reset_daily(self) -> DailyResetSensor: + return DailyResetSensor(self) + + @property + def data_type(self) -> str: + data_type = "S_DWORD" if self.__signed else "U_DWORD" + return data_type + (" (LW,HW)" if self.__low_word_first else " (HW,LW)") + + @property + def scale_factor(self) -> float: + return self.__factor + + @property + def low_word_first(self) -> bool: + return self.__low_word_first + + @property + def offset(self) -> float: + return self.__offset class SignedMagnitudeSingleRegisterSensor(SingleRegisterSensor): @@ -152,16 +319,19 @@ class SignedMagnitudeSingleRegisterSensor(SingleRegisterSensor): """ def read_value(self, registers: dict[int, bytearray]): - if self.reg_address in registers: - reg_value = int.from_bytes(registers[self.reg_address], "big", signed=False) & 0x7FFF + reg_address = self.get_registers()[0] + if reg_address in registers: + reg_value = int.from_bytes(registers[reg_address], "big", signed=False) & 0x7FFF + result = reg_value * self.scale_factor + self.offset # If highest bit is set, we've got a negative value - if bool(registers[self.reg_address][0] & 0x80): - return -1 * reg_value * self.factor + self.offset - else: - return reg_value * self.factor + self.offset + return -result if bool(registers[reg_address][0] & 0x80) else result else: return None + @property + def data_type(self) -> str: + return "SM_WORD" + class SignedMagnitudeDoubleRegisterSensor(DoubleRegisterSensor): """ @@ -170,22 +340,23 @@ class SignedMagnitudeDoubleRegisterSensor(DoubleRegisterSensor): """ def read_value(self, registers: dict[int, bytearray]): - high_word_reg_address = self.reg_address - low_word_reg_address = self.reg_address + 1 + high_word_reg_address, low_word_reg_address = self.get_registers() if low_word_reg_address in registers and high_word_reg_address in registers: low_word = registers[low_word_reg_address] high_word = registers[high_word_reg_address] reg_value = int.from_bytes(high_word + low_word, "big", signed=False) & 0x7FFFFFFF + result = reg_value * self.scale_factor + self.offset # If highest bit is set, we've got a negative value - if bool(registers[self.reg_address][0] & 0x80): - return -1 * reg_value * self.factor + self.offset - else: - return reg_value * self.factor + self.offset + return -result if bool(high_word[0] & 0x80) else result else: return None + @property + def data_type(self) -> str: + return "SM_DWORD " + ("(LW,HW)" if self.low_word_first else "(HW,LW)") + -class ComputedPowerSensor(Sensor): +class ComputedPowerSensor(AbstractSensor): """ Electric Power sensor with value computed as multiplication of values read by voltage and current sensors. """ @@ -216,7 +387,7 @@ def get_registers(self) -> list[int]: return [] -class ComputedSumSensor(Sensor): +class ComputedSumSensor(AbstractSensor): """ Computes a sum of values read by given list of sensors. """ @@ -240,7 +411,7 @@ def get_registers(self) -> list[int]: return [] -class AggregatedValueSensor(Sensor): +class AggregatedValueSensor(AbstractSensor): """ Represents value computed as an aggregation in multi-inverter installation """ diff --git a/src/deye_sensors.py b/src/deye_sensors.py index 8ce6b51..7d6eb09 100644 --- a/src/deye_sensors.py +++ b/src/deye_sensors.py @@ -86,7 +86,7 @@ # Production today production_today_sensor = SingleRegisterSensor( "Production today", 0x3C, 0.1, mqtt_topic_suffix="day_energy", unit="kWh", groups=["string", "micro"] -) +).reset_daily() uptime_sensor = SingleRegisterSensor( "Uptime", 0x3E, 1, mqtt_topic_suffix="uptime", unit="minutes", groups=["string", "micro"] ) @@ -108,7 +108,7 @@ ) pv1_daily_sensor = SingleRegisterSensor( "PV1 Production today", 0x41, 0.1, mqtt_topic_suffix="dc/pv1/day_energy", unit="kWh", groups=["micro"] -) +).reset_daily() pv1_total_sensor = DoubleRegisterSensor( "PV1 Total", 0x45, 0.1, mqtt_topic_suffix="dc/pv1/total_energy", unit="kWh", groups=["micro"] ) @@ -130,7 +130,7 @@ ) pv2_daily_sensor = SingleRegisterSensor( "PV2 Production today", 0x42, 0.1, mqtt_topic_suffix="dc/pv2/day_energy", unit="kWh", groups=["micro"] -) +).reset_daily() pv2_total_sensor = DoubleRegisterSensor( "PV2 Total", 0x47, 0.1, mqtt_topic_suffix="dc/pv2/total_energy", unit="kWh", groups=["micro"] ) @@ -152,7 +152,7 @@ ) pv3_daily_sensor = SingleRegisterSensor( "PV3 Production today", 0x43, 0.1, mqtt_topic_suffix="dc/pv3/day_energy", unit="kWh", groups=["micro"] -) +).reset_daily() pv3_total_sensor = DoubleRegisterSensor( "PV3 Total", 0x4A, 0.1, mqtt_topic_suffix="dc/pv3/total_energy", unit="kWh", groups=["micro"] ) @@ -174,7 +174,7 @@ ) pv4_daily_sensor = SingleRegisterSensor( "PV4 Production today", 0x44, 0.1, mqtt_topic_suffix="dc/pv4/day_energy", unit="kWh", groups=["micro"] -) +).reset_daily() pv4_total_sensor = DoubleRegisterSensor( "PV4 Total", 0x4D, 0.1, mqtt_topic_suffix="dc/pv4/total_energy", unit="kWh", groups=["micro"] ) diff --git a/tests/deye_events_test.py b/tests/deye_events_test.py index f65fa0f..8e298b2 100644 --- a/tests/deye_events_test.py +++ b/tests/deye_events_test.py @@ -20,10 +20,10 @@ from deye_events import DeyeEventList, DeyeLoggerStatusEvent, DeyeObservationEvent from deye_observation import Observation -from deye_sensor import Sensor +from deye_sensor import AbstractSensor -class FakeSensor(Sensor): +class FakeSensor(AbstractSensor): def __init__(self, name: str, value: float): super().__init__(name, groups=["float"], print_format="{:0.1f}") self.value = value diff --git a/tests/deye_inverter_state_test.py b/tests/deye_inverter_state_test.py index d75f7d5..9b0b890 100644 --- a/tests/deye_inverter_state_test.py +++ b/tests/deye_inverter_state_test.py @@ -21,10 +21,10 @@ from deye_inverter_state import DeyeInverterState from deye_events import DeyeEventList, DeyeLoggerStatusEvent, DeyeObservationEvent, Observation -from deye_sensor import Sensor, SensorRegisterRanges +from deye_sensor import AbstractSensor, SensorRegisterRanges -class FakeSensor(Sensor): +class FakeSensor(AbstractSensor): def __init__(self, name: str, value: float): super().__init__(name, groups=["float"], print_format="{:0.1f}") self.value = value diff --git a/tests/deye_sensor_test.py b/tests/deye_sensor_test.py index 138438c..035e204 100644 --- a/tests/deye_sensor_test.py +++ b/tests/deye_sensor_test.py @@ -18,6 +18,7 @@ import unittest from deye_sensor import ( Sensor, + AbstractSensor, ComputedSumSensor, DoubleRegisterSensor, SingleRegisterSensor, @@ -28,7 +29,7 @@ ) -class FakeSensor(Sensor): +class FakeSensor(AbstractSensor): def __init__(self, name, value): super().__init__(name, groups=["string"]) self.value = value @@ -277,6 +278,19 @@ def test_registry_range_multiple_groups_names(self): self.assertTrue(sut.in_any_group({"b"})) self.assertFalse(sut.in_any_group({"c"})) + def test_reset_sensor_passthrough(self): + # given + sut = SingleRegisterSensor("test", 0x00, 1, signed=False, groups=["string"]).reset_daily() + + # and + registers = {0: bytearray.fromhex("0102")} + + # when + result = sut.read_value(registers) + + # then + self.assertEqual(result, 0x0102) + if __name__ == "__main__": unittest.main() diff --git a/tools/metric_group_doc_gen.py b/tools/metric_group_doc_gen.py index f429a1d..47faa57 100644 --- a/tools/metric_group_doc_gen.py +++ b/tools/metric_group_doc_gen.py @@ -9,22 +9,8 @@ def render_table(sensors: list[Sensor]): print('|Metric|MQTT topic suffix|Unit|Modbus address (dec)|Modbus address (hex)|Modbus data type|Scale factor|') print('|---|---|:-:|:-:|:-:|:-:|:-:|') for s in sensors: - data_type = 'n/a' - scale_factor = '1' - if isinstance(s, SignedMagnitudeSingleRegisterSensor): - data_type = 'SM_WORD' - scale_factor = s.factor - elif isinstance(s, SingleRegisterSensor): - data_type = 'S_WORD' if s.signed else 'U_WORD' - scale_factor = s.factor - elif isinstance(s, SignedMagnitudeDoubleRegisterSensor): - data_type = 'SM_DWORD' - data_type += ' (LW,HW)' if s.low_word_first else ' (HW,LW)' - scale_factor = s.factor - elif isinstance(s, DoubleRegisterSensor): - data_type = 'S_DWORD' if s.signed else 'U_DWORD' - data_type += ' (LW,HW)' if s.low_word_first else ' (HW,LW)' - scale_factor = s.factor + data_type = s.data_type + scale_factor = s.scale_factor regs_dec = ','.join(['{:d}'.format(r) for r in s.get_registers()]) regs_hex = ','.join(['{:x}'.format(r) for r in s.get_registers()])