Skip to content

Commit

Permalink
Merge pull request #199 from kbialek/feature/sensor-auto-reset
Browse files Browse the repository at this point in the history
Reset energy sensors daily
  • Loading branch information
kbialek authored Sep 19, 2024
2 parents 8a56a28 + bb261c8 commit 70a37d7
Show file tree
Hide file tree
Showing 6 changed files with 243 additions and 72 deletions.
263 changes: 217 additions & 46 deletions src/deye_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
# under the License.

import math
from abc import abstractmethod
from datetime import datetime
from abc import abstractmethod, abstractproperty


class Sensor:
Expand All @@ -26,13 +27,37 @@ class Sensor:
This is an abstract class. Method 'read_value' must be provided by the extending subclass.
"""

def __init__(self, name: str, mqtt_topic_suffix="", unit="", print_format="{:s}", groups=[]):
self.name = name
self.mqtt_topic_suffix = mqtt_topic_suffix
self.unit = unit
self.print_format = print_format
assert len(groups) > 0, f"Sensor {name} must belong to at least one group"
self.groups = groups
@abstractproperty
def reg_address(self) -> int:
pass

@abstractproperty
def name(self) -> str:
pass

@abstractproperty
def mqtt_topic_suffix(self) -> str:
pass

@abstractproperty
def unit(self) -> str:
pass

@abstractproperty
def print_format(self) -> str:
pass

@abstractproperty
def groups(self) -> [str]:
pass

@abstractproperty
def data_type(self) -> str:
pass

@abstractproperty
def scale_factor(self) -> float:
pass

@abstractmethod
def read_value(self, registers: dict[int, bytearray]):
Expand Down Expand Up @@ -66,7 +91,116 @@ def get_registers(self) -> list[int]:
"""Returns the list of Modbus registers read by this sensor"""


class SingleRegisterSensor(Sensor):
class DailyResetSensor(Sensor):
"""
Wraps other sensor and ensures that the value reported is reset daily.
This is useful to avoid the last value measured yesterday being reported as the first value of today.
Implemented to mitigate microinverter daily energy value "leak".
"""

def __init__(self, delegate: Sensor):
self.__delegate = delegate
self.__last_value: float | None = None
self.__last_value_ts = datetime.now()

@property
def reg_address(self) -> int:
return self.__delegate.reg_address

@property
def name(self) -> str:
return self.__delegate.name

@property
def mqtt_topic_suffix(self) -> str:
return self.__delegate.mqtt_topic_suffix

@property
def unit(self) -> str:
return self.__delegate.unit

@property
def print_format(self) -> str:
return self.__delegate.print_format

@property
def groups(self) -> [str]:
return self.__delegate.groups

@abstractproperty
def data_type(self) -> str:
return self.__delegate.data_type

@abstractproperty
def scale_factor(self) -> float:
return self.__delegate.scale_factor

def read_value(self, registers: dict[int, bytearray]):
now = datetime.now()
value = self.__delegate.read_value(registers)
if (
value is not None
and self.__last_value is not None
and now.day != self.__last_value_ts.day
and value >= self.__last_value
):
return 0
self.__last_value = value
self.__last_value_ts = now
return value

def write_value(self, value: str) -> dict[int, bytearray]:
return self.__delegate.write_value(value)

def format_value(self, value):
return self.__delegate.format_value(value)

def in_any_group(self, active_groups: set[str]) -> bool:
return self.__delegate.in_any_group(active_groups)

def get_registers(self) -> list[int]:
return self.__delegate.get_registers()


class AbstractSensor(Sensor):
def __init__(self, name: str, mqtt_topic_suffix="", unit="", print_format="{:s}", groups=[]):
self.__name = name
self.__mqtt_topic_suffix = mqtt_topic_suffix
self.__unit = unit
self.__print_format = print_format
assert len(groups) > 0, f"Sensor {name} must belong to at least one group"
self.__groups = groups

@property
def name(self) -> str:
return self.__name

@property
def mqtt_topic_suffix(self) -> str:
return self.__mqtt_topic_suffix

@property
def unit(self) -> str:
return self.__unit

@property
def print_format(self) -> str:
return self.__print_format

@property
def groups(self) -> [str]:
return self.__groups

@property
def data_type(self) -> str:
return "n/a"

@property
def scale_factor(self) -> float:
return 1


class SingleRegisterSensor(AbstractSensor):
"""
Solar inverter sensor with value stored as 32-bit integer in a single Modbus register.
"""
Expand All @@ -84,28 +218,42 @@ def __init__(
groups=[],
):
super().__init__(name, mqtt_topic_suffix, unit, print_format, groups)
self.reg_address = reg_address
self.factor = factor
self.offset = offset
self.signed = signed
self.__reg_address = reg_address
self.__factor = factor
self.__offset = offset
self.__signed = signed

def read_value(self, registers: dict[int, bytearray]):
if self.reg_address in registers:
reg_value = registers[self.reg_address]
return int.from_bytes(reg_value, "big", signed=self.signed) * self.factor + self.offset
if self.__reg_address in registers:
reg_value = registers[self.__reg_address]
return int.from_bytes(reg_value, "big", signed=self.__signed) * self.__factor + self.__offset
else:
return None

def write_value(self, value: str) -> dict[int, bytearray]:
v = int((float(value) - self.offset) / self.factor)
return {self.reg_address: v.to_bytes(2, "big", signed=self.signed)}
v = int((float(value) - self.__offset) / self.__factor)
return {self.__reg_address: v.to_bytes(2, "big", signed=self.__signed)}

@abstractmethod
def get_registers(self) -> list[int]:
return [self.reg_address]
return [self.__reg_address]

def reset_daily(self) -> DailyResetSensor:
return DailyResetSensor(self)

@property
def data_type(self) -> str:
return "S_WORD" if self.__signed else "U_WORD"

@property
def scale_factor(self) -> float:
return self.__factor

@property
def offset(self) -> float:
return self.__offset


class DoubleRegisterSensor(Sensor):
class DoubleRegisterSensor(AbstractSensor):
"""
Solar inverter sensor with value stored as 64-bit integer in two Modbus registers.
"""
Expand All @@ -124,25 +272,44 @@ def __init__(
low_word_first=True,
):
super().__init__(name, mqtt_topic_suffix, unit, print_format, groups)
self.reg_address = reg_address
self.factor = factor
self.offset = offset
self.signed = signed
self.low_word_first = low_word_first
self.__reg_address = reg_address
self.__factor = factor
self.__offset = offset
self.__signed = signed
self.__low_word_first = low_word_first

def read_value(self, registers: dict[int, bytearray]):
low_word_reg_address = self.reg_address + (0 if self.low_word_first else 1)
high_word_reg_address = self.reg_address + (1 if self.low_word_first else 0)
low_word_reg_address = self.__reg_address + (0 if self.__low_word_first else 1)
high_word_reg_address = self.__reg_address + (1 if self.__low_word_first else 0)
if low_word_reg_address in registers and high_word_reg_address in registers:
low_word = registers[low_word_reg_address]
high_word = registers[high_word_reg_address]
return int.from_bytes(high_word + low_word, "big", signed=self.signed) * self.factor + self.offset
return int.from_bytes(high_word + low_word, "big", signed=self.__signed) * self.__factor + self.__offset
else:
return None

@abstractmethod
def get_registers(self) -> list[int]:
return [self.reg_address, self.reg_address + 1]
return [self.__reg_address, self.__reg_address + 1]

def reset_daily(self) -> DailyResetSensor:
return DailyResetSensor(self)

@property
def data_type(self) -> str:
data_type = "S_DWORD" if self.__signed else "U_DWORD"
return data_type + (" (LW,HW)" if self.__low_word_first else " (HW,LW)")

@property
def scale_factor(self) -> float:
return self.__factor

@property
def low_word_first(self) -> bool:
return self.__low_word_first

@property
def offset(self) -> float:
return self.__offset


class SignedMagnitudeSingleRegisterSensor(SingleRegisterSensor):
Expand All @@ -152,16 +319,19 @@ class SignedMagnitudeSingleRegisterSensor(SingleRegisterSensor):
"""

def read_value(self, registers: dict[int, bytearray]):
if self.reg_address in registers:
reg_value = int.from_bytes(registers[self.reg_address], "big", signed=False) & 0x7FFF
reg_address = self.get_registers()[0]
if reg_address in registers:
reg_value = int.from_bytes(registers[reg_address], "big", signed=False) & 0x7FFF
result = reg_value * self.scale_factor + self.offset
# If highest bit is set, we've got a negative value
if bool(registers[self.reg_address][0] & 0x80):
return -1 * reg_value * self.factor + self.offset
else:
return reg_value * self.factor + self.offset
return -result if bool(registers[reg_address][0] & 0x80) else result
else:
return None

@property
def data_type(self) -> str:
return "SM_WORD"


class SignedMagnitudeDoubleRegisterSensor(DoubleRegisterSensor):
"""
Expand All @@ -170,22 +340,23 @@ class SignedMagnitudeDoubleRegisterSensor(DoubleRegisterSensor):
"""

def read_value(self, registers: dict[int, bytearray]):
high_word_reg_address = self.reg_address
low_word_reg_address = self.reg_address + 1
high_word_reg_address, low_word_reg_address = self.get_registers()
if low_word_reg_address in registers and high_word_reg_address in registers:
low_word = registers[low_word_reg_address]
high_word = registers[high_word_reg_address]
reg_value = int.from_bytes(high_word + low_word, "big", signed=False) & 0x7FFFFFFF
result = reg_value * self.scale_factor + self.offset
# If highest bit is set, we've got a negative value
if bool(registers[self.reg_address][0] & 0x80):
return -1 * reg_value * self.factor + self.offset
else:
return reg_value * self.factor + self.offset
return -result if bool(high_word[0] & 0x80) else result
else:
return None

@property
def data_type(self) -> str:
return "SM_DWORD " + ("(LW,HW)" if self.low_word_first else "(HW,LW)")


class ComputedPowerSensor(Sensor):
class ComputedPowerSensor(AbstractSensor):
"""
Electric Power sensor with value computed as multiplication of values read by voltage and current sensors.
"""
Expand Down Expand Up @@ -216,7 +387,7 @@ def get_registers(self) -> list[int]:
return []


class ComputedSumSensor(Sensor):
class ComputedSumSensor(AbstractSensor):
"""
Computes a sum of values read by given list of sensors.
"""
Expand All @@ -240,7 +411,7 @@ def get_registers(self) -> list[int]:
return []


class AggregatedValueSensor(Sensor):
class AggregatedValueSensor(AbstractSensor):
"""
Represents value computed as an aggregation in multi-inverter installation
"""
Expand Down
Loading

0 comments on commit 70a37d7

Please sign in to comment.