Skip to content

Commit

Permalink
Merge pull request #157 from kbialek/feature/aggregate-multi-inverter…
Browse files Browse the repository at this point in the history
…-data

Publish data aggregated from multiple inverters
  • Loading branch information
kbialek authored Mar 25, 2024
2 parents e536322 + a534cbc commit 0b42403
Show file tree
Hide file tree
Showing 12 changed files with 306 additions and 9 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ METRIC_GROUPS = \
deye_sg01hp3 \
deye_sg01hp3_battery \
deye_sg01hp3_ups \
deye_sg01hp3_bms
deye_sg01hp3_bms \
aggregated
GENERATE_DOCS_TARGETS = $(addprefix generate-docs-, $(METRIC_GROUPS))
$(GENERATE_DOCS_TARGETS): generate-docs-%:
@mkdir -p docs
Expand Down
4 changes: 4 additions & 0 deletions docs/metric_group_aggregated.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
|Metric|Modbus address|MQTT topic suffix|Unit|
|---|:-:|---|:-:|
|Aggregated daily energy|computed|`day_energy`|kWh|
|Aggregated AC active power|computed|`ac/active_power`|W|
6 changes: 6 additions & 0 deletions src/deye_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ def __init__(
def logger_adapter(self, logger: logging.Logger):
return ParameterizedLogger(logger, self.index)

@staticmethod
def for_aggregator():
return DeyeLoggerConfig(0, "127.0.0.1", 0, index=0)

@staticmethod
def from_env():
return DeyeLoggerConfig(
Expand Down Expand Up @@ -286,4 +290,6 @@ def __read_active_processors() -> [str]:
active_processors.append("time_of_use")
if DeyeEnv.boolean("DEYE_FEATURE_ACTIVE_POWER_REGULATION", False):
active_processors.append("active_power_regulation")
if DeyeEnv.boolean("DEYE_FEATURE_MULTI_INVERTER_DATA_AGGREGATOR", False):
active_processors.append("multi_inverter_data_aggregator")
return active_processors
28 changes: 25 additions & 3 deletions src/deye_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from deye_sensors import sensor_list, sensor_register_ranges
from deye_processor_factory import DeyeProcessorFactory
from deye_inverter_state import DeyeInverterState
from deye_events import DeyeObservationEvent, DeyeEventList


class IntervalRunner:
Expand All @@ -40,22 +41,25 @@ def __init__(self, logger_config: DeyeLoggerConfig, interval: int, action):
self.__thread = threading.Thread(target=self.__handler)

def __handler(self):
self.__log.debug("Start to execute the daemon at intervals of %s seconds", self.__interval)
nextTime = time.time() + random.randint(0, self.__interval - 1)
while not self.__stopEvent.wait(nextTime - time.time()):
nextTime = time.time() + self.__interval
self.__log.debug("Invoking action")
self.__invoke_action()
self.__log.debug("Invocation loop stopped")

def __invoke_action(self):
try:
self.__action()
except Exception:
self.__log.exception("Unexpected error during daemon execution")
self.__log.exception("Unexpected error during runner execution")

def start(self):
self.__log.debug("Starting executing the runner at intervals of %s seconds", self.__interval)
self.__thread.start()

def stop(self):
self.__log.debug("Stopping the runner")
self.__stopEvent.set()


Expand All @@ -70,9 +74,13 @@ def __init__(self, config: DeyeConfig):

self.__mqtt_client = DeyeMqttClient(self.__config)
self.__processor_factory = DeyeProcessorFactory(self.__config, self.__mqtt_client)
self.__multi_inverter_data_aggregator = self.__processor_factory.create_multi_inverter_data_aggregator()
self.__interval_runners = [
self.__create_interval_runner_for_logger(logger_config) for logger_config in config.logger_configs
]
if len(self.__config.logger_configs) > 1:
self.__aggregating_processors = self.__processor_factory.create_aggregating_processors(self.__config.logger)
self.__interval_runners += [self.__create_interval_runner_for_aggregators()]

def __create_interval_runner_for_logger(self, logger_config: DeyeLoggerConfig) -> IntervalRunner:
modbus = DeyeModbus(DeyeConnectorFactory().create_connector(logger_config))
Expand All @@ -83,10 +91,24 @@ def __create_interval_runner_for_logger(self, logger_config: DeyeLoggerConfig) -
max_range_length=logger_config.max_register_range_length,
)

processors = self.__processor_factory.create_processors(logger_config, modbus, sensors)
processors = self.__processor_factory.create_processors(logger_config, modbus, sensors) + [
self.__multi_inverter_data_aggregator
]
inverter_state = DeyeInverterState(self.__config, logger_config, reg_ranges, modbus, sensors, processors)
return IntervalRunner(logger_config, self.__config.data_read_inverval, inverter_state.read_from_logger)

def __create_interval_runner_for_aggregators(self) -> IntervalRunner:
return IntervalRunner(
DeyeLoggerConfig.for_aggregator(), self.__config.data_read_inverval, self.__run_aggregating_processors
)

def __run_aggregating_processors(self) -> None:
self.__log.debug("Running aggregating processors")
observations = self.__multi_inverter_data_aggregator.aggregate()
events = DeyeEventList([DeyeObservationEvent(observation) for observation in observations], logger_index=0)
for processor in self.__aggregating_processors:
processor.process(events)

def start(self):
for interval_runner in self.__interval_runners:
interval_runner.start()
Expand Down
3 changes: 3 additions & 0 deletions src/deye_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,4 +158,7 @@ def get_description(self) -> str:

@abstractmethod
def process(self, events: DeyeEventList):
"""
Processes events representing changes of metric values
"""
pass
86 changes: 86 additions & 0 deletions src/deye_multi_inverter_data_aggregator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import logging
import deye_sensors_aggregated

from datetime import datetime
from deye_events import DeyeEventProcessor, DeyeEventList, DeyeObservationEvent
from deye_observation import Observation


class DeyeMultiInverterDataAggregator(DeyeEventProcessor):
def __init__(self):
self.__log = logging.getLogger(DeyeMultiInverterDataAggregator.__name__)
self.__ac_active_power = dict[int, float]()
self.__day_energy = dict[int, float]()
self.__last_aggregation_ts = datetime.now()

def get_id(self) -> str:
return "multi_inverter_data_aggregator"

def get_description(self) -> str:
return "Aggregate metrics from multiple inverters"

def process(self, events: DeyeEventList) -> None:
self.__update_ac_active_power_value(events)
self.__update_day_energy_value(events)

def aggregate(self) -> list[Observation]:
now = datetime.now()
if now.day != self.__last_aggregation_ts.day:
self.__reset_state()
aggregated_observations = [
Observation(
deye_sensors_aggregated.aggregated_ac_active_power_sensor,
now,
sum(self.__ac_active_power.values()),
),
Observation(
deye_sensors_aggregated.aggregated_day_energy_sensor,
now,
sum(self.__day_energy.values()),
),
]
self.__last_aggregation_ts = now
self.__log.debug("Aggregated observations: %s", aggregated_observations)
return aggregated_observations

def __get_metric(self, topic_suffix: str, events: DeyeEventList) -> float | None:
for event in events:
if not isinstance(event, DeyeObservationEvent):
continue
observation: Observation = event.observation
if observation.sensor.mqtt_topic_suffix == topic_suffix:
return observation.value
return None

def __update_ac_active_power_value(self, events: DeyeEventList) -> None:
new_value = self.__get_metric(
deye_sensors_aggregated.aggregated_ac_active_power_sensor.mqtt_topic_suffix, events
)
if new_value is not None:
self.__ac_active_power[events.logger_index] = new_value

def __update_day_energy_value(self, events: DeyeEventList) -> None:
new_value = self.__get_metric(deye_sensors_aggregated.aggregated_day_energy_sensor.mqtt_topic_suffix, events)
if new_value is not None:
self.__day_energy[events.logger_index] = new_value

def __reset_state(self):
self.__ac_active_power.clear()
self.__day_energy.clear()
3 changes: 3 additions & 0 deletions src/deye_observation.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,6 @@ def __init__(self, sensor: Sensor, timestamp: datetime, value):

def value_as_str(self):
return self.sensor.format_value(self.value)

def __repr__(self) -> str:
return f"{self.sensor.mqtt_topic_suffix}@{self.timestamp}:{self.value}"
24 changes: 21 additions & 3 deletions src/deye_processor_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,15 @@
from deye_active_power_regulation import DeyeActivePowerRegulationEventProcessor
from deye_sensor import Sensor
from deye_plugin_loader import DeyePluginContext, DeyePluginLoader
from deye_multi_inverter_data_aggregator import DeyeMultiInverterDataAggregator


class DeyeProcessorFactory:
def __init__(self, config: DeyeConfig, mqtt_client: DeyeMqttClient):
self.__log = logging.getLogger(DeyeProcessorFactory.__name__)
self.__config = config
self.__mqtt_client = mqtt_client
self.__first_run = True
plugin_context = DeyePluginContext(config, mqtt_client)
self.plugin_loader = DeyePluginLoader(config)
self.plugin_loader.load_plugins(plugin_context)
Expand All @@ -46,6 +48,7 @@ def create_processors(
)
for p in processors:
p.initialize()
self.__first_run = False
return processors

def __create_builtin_processors(
Expand All @@ -60,10 +63,25 @@ def __create_builtin_processors(
)
return processors

def create_multi_inverter_data_aggregator(self) -> DeyeMultiInverterDataAggregator:
return DeyeMultiInverterDataAggregator()

def create_aggregating_processors(self, logger_config: DeyeLoggerConfig) -> list[DeyeEventProcessor]:
processors = self.__create_builtin_aggregating_processors(logger_config)
for p in processors:
p.initialize()
return processors

def __create_builtin_aggregating_processors(self, logger_config: DeyeLoggerConfig) -> list[DeyeEventProcessor]:
processors = []
self.__append_processor(processors, DeyeMqttPublisher(self.__mqtt_client))
return processors

def __append_processor(self, processors: list[DeyeEventProcessor], processor: DeyeEventProcessor):
is_processor_active = processor.get_id() in self.__config.active_processors
self.__log.info(
'Feature "{}": {}'.format(processor.get_description(), "enabled" if is_processor_active else "disabled")
)
if self.__first_run:
self.__log.info(
'Feature "{}": {}'.format(processor.get_description(), "enabled" if is_processor_active else "disabled")
)
if is_processor_active:
processors.append(processor)
20 changes: 18 additions & 2 deletions src/deye_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ def read_value(self, registers: dict[int, bytearray]):
else:
return None

@abstractmethod
def get_registers(self) -> list[int]:
return []

Expand All @@ -235,7 +234,24 @@ def read_value(self, registers: dict[int, bytearray]):
result += value
return result

@abstractmethod
def get_registers(self) -> list[int]:
return []


class AggregatedValueSensor(Sensor):
"""
Represents value computed as an aggregation in multi-inverter installation
"""

def __init__(self, name: str, mqtt_topic_suffix="", unit="", print_format="{:0.1f}", groups=[]):
super().__init__(name, mqtt_topic_suffix, unit, print_format, groups)

def read_value(self, registers: dict[int, bytearray]):
raise RuntimeError("Cannot read registers of aggregated sensor")

def write_value(self, value: str) -> dict[int, bytearray]:
raise RuntimeError("Cannot write registers of aggregated sensor")

def get_registers(self) -> list[int]:
return []

Expand Down
2 changes: 2 additions & 0 deletions src/deye_sensors.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from deye_sensors_igen_dtsd422 import igen_dtsd422_sensors, igen_dtsd422_register_ranges
from deye_sensors_deye_hybrid import deye_hybrid_sensors, deye_hybrid_register_ranges
from deye_sensors_settings import deye_settings_sensors, deye_settings_register_ranges
from deye_sensors_aggregated import aggregated_sensor_list

# AC Phase 1
phase1_voltage_sensor = SingleRegisterSensor(
Expand Down Expand Up @@ -266,6 +267,7 @@
+ deye_hybrid_sensors
+ deye_settings_sensors
+ deye_sg01hp3_sensors
+ aggregated_sensor_list
)

sensor_register_ranges = (
Expand Down
27 changes: 27 additions & 0 deletions src/deye_sensors_aggregated.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from deye_sensor import AggregatedValueSensor

aggregated_day_energy_sensor = AggregatedValueSensor(
"Aggregated daily energy", mqtt_topic_suffix="day_energy", unit="kWh", groups=["aggregated"]
)
aggregated_ac_active_power_sensor = AggregatedValueSensor(
"Aggregated AC active power", mqtt_topic_suffix="ac/active_power", unit="W", groups=["aggregated"]
)

aggregated_sensor_list = [aggregated_day_energy_sensor, aggregated_ac_active_power_sensor]
Loading

0 comments on commit 0b42403

Please sign in to comment.