Commit 3bb6b93

Merge pull request #193 from thegridelectric/dev

Reupload / #190 fix - Ensure that reupload continues even when corrupt events are discovered.

anschweitzer authored May 3, 2024
2 parents b5d1eef + 4bf7d23 commit 3bb6b93
Showing 14 changed files with 865 additions and 184 deletions.
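The substantive change is in src/gwproactor/links/link_manager.py: reupload previously went through a batch `_reupload_events` call that returned an error Result as soon as a persisted event failed to decode, stalling the reupload (#190). The new per-event `_reupload_event` turns a corrupt record into a generated problem event plus a `clear()` of the bad file, and the loop moves on. A minimal sketch of that pattern (with hypothetical `store`, `send`, and `report_problem` stand-ins, not the gwproactor API):

```python
# Sketch only: decode failures become a report plus a deletion instead of
# aborting the whole reupload pass. Names here are hypothetical stand-ins.
import json
from typing import Any, Callable


def reupload_pending(
    store: Any,  # hypothetical persister: pending() / retrieve() / clear()
    send: Callable[[dict], None],  # hypothetical publish function
    report_problem: Callable[[str, Exception], None],
) -> int:
    """Resend every pending event; corrupt records are reported, deleted,
    and skipped rather than halting the pass (the #190 failure mode)."""
    sent = 0
    for event_id in list(store.pending()):
        raw = store.retrieve(event_id)
        try:
            if not raw:
                raise ValueError(f"missing or empty event file: {event_id}")
            event = json.loads(raw.decode("utf-8"))
        except (ValueError, UnicodeDecodeError) as e:
            report_problem(f"Event decoding error - uid:{event_id}", e)
            store.clear(event_id)  # drop the corrupt record and keep going
            continue
        send(event)
        sent += 1
    return sent
```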
2 changes: 1 addition & 1 deletion .github/workflows/constraints.txt
@@ -1,4 +1,4 @@
 pip==24.0
-nox==2024.3.2
+nox==2024.4.15
 nox-poetry==1.0.3
 poetry==1.8.2
2 changes: 1 addition & 1 deletion .github/workflows/tests.yml
@@ -219,4 +219,4 @@ jobs:
           nox --session=coverage -- xml
       - name: Upload coverage report
-        uses: codecov/codecov-action@v4.1.1
+        uses: codecov/codecov-action@v4.3.1
6 changes: 3 additions & 3 deletions docs/requirements.txt
@@ -1,7 +1,7 @@
 furo==2024.1.29
-sphinx==7.2.6
+sphinx==7.3.7
 sphinx-click==5.1.0
-myst_parser==2.0.0
+myst_parser==3.0.1
 sphinxcontrib-mermaid==0.9.2
-pytest==8.1.1
+pytest==8.2.0
 pytest-asyncio==0.23.6
443 changes: 356 additions & 87 deletions poetry.lock

Large diffs are not rendered by default.

3 changes: 2 additions & 1 deletion pyproject.toml
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "gridworks-proactor"
-version = "0.4.3"
+version = "0.4.4"
 description = "Gridworks Proactor"
 authors = ["Jessica Millar <[email protected]>"]
 license = "MIT"
@@ -35,6 +35,7 @@ gridworks-cert = {version = ">=0.4.2", optional = true}
 aiohttp = "^3.8.5"
 yarl = "^1.9.2"
 multidict = "^6.0.4"
+pendulum = "2.1.2"
 
 [tool.poetry.dev-dependencies]
 Pygments = ">=2.10.0"
174 changes: 126 additions & 48 deletions src/gwproactor/links/link_manager.py
@@ -50,6 +50,9 @@
 from gwproactor.message import MQTTDisconnectPayload
 from gwproactor.message import MQTTReceiptPayload
 from gwproactor.message import MQTTSubackPayload
+from gwproactor.persister import ByteDecodingError
+from gwproactor.persister import DecodingError
+from gwproactor.persister import FileEmptyWarning
 from gwproactor.persister import JSONDecodingError
 from gwproactor.persister import PersisterInterface
 from gwproactor.persister import UIDMissingWarning
@@ -92,7 +95,6 @@ def __init__(
         self._stats = stats
         self._event_persister = event_persister
         self._reuploads = Reuploads(
-            self._event_persister,
             self._logger,
             self._settings.num_initial_event_reuploads,
         )
@@ -208,6 +210,9 @@ def log_subscriptions(self, tag=""):
             s += f"\t\t[{subscription}]\n"
         self._logger.lifecycle(s)
 
+    def get_reuploads_str(self, verbose: bool = True, num_events: int = 5) -> str:
+        return self._reuploads.get_str(verbose=verbose, num_events=num_events)
+
     def publish_message(
         self, client, message: Message, qos: int = 0, context: Any = None
     ) -> MQTTMessageInfo:
@@ -253,61 +258,140 @@ def generate_event(self, event: EventT) -> Result[bool, BaseException]:
         )
 
     def _start_reupload(self) -> None:
-        self._logger.path("++_start_reupload reuploading: %s", self.reuploading())
-        path_dbg = 0
         if not self._reuploads.reuploading():
+            self._continue_reupload(
+                self._reuploads.start_reupload(self._event_persister.pending())
+            )
+
+    def _continue_reupload(self, event_ids: list[str]) -> None:
+        self._logger.path("++_continue_reupload %d", len(event_ids))
+        path_dbg = 0
+        tried_count_dbg = 0
+        sent_count_dbg = 0
+        continuation_count_dbg = -1
+
+        if event_ids:
             path_dbg |= 0x00000001
-            events_to_reupload = self._reuploads.start_reupload()
-            self._reupload_events(events_to_reupload)
-            if self._logger.isEnabledFor(logging.INFO):
-                path_dbg |= 0x00000002
-                if self._reuploads.reuploading():
-                    path_dbg |= 0x00000004
-                    state_str = f"{self._reuploads.num_reupload_pending} reupload events pending."
-                else:
-                    path_dbg |= 0x00000008
-                    state_str = "reupload complete."
-                self._logger.info(
-                    f"_start_reupload: reuploaded {len(events_to_reupload)} events. "
-                    f"{state_str} "
-                    f"Total pending events: {self._event_persister.num_pending}."
-                )
+            sent_one = False
+            # Try to send all requested events. At least one send must succeed
+            # to continue the reupload, so if all sends fail, get more until
+            # one is sent or there are no more reuploads.
+            while not sent_one and self._reuploads.reuploading() and event_ids:
+                continuation_path_dbg = 0x00000002
+                continuation_count_dbg += 1
+                next_event_ids = []
+                for event_id in event_ids:
+                    event_path_dbg = 0x00000004
+                    tried_count_dbg += 1
+                    problems = Problems()
+                    ret = self._reupload_event(event_id)
+                    if ret.is_ok():
+                        event_path_dbg |= 0x00000008
+                        if ret.value:
+                            path_dbg |= 0x00000010
+                            sent_count_dbg += 1
+                            sent_one = True
+                        else:
+                            event_path_dbg |= 0x00000020
+                            problems.add_error(DecodingError(uid=event_id))
+                    else:
+                        event_path_dbg |= 0x00000040
+                        problems.add_problems(ret.err())
+                    if problems:
+                        event_path_dbg |= 0x00000080
+                        # There was some error decoding this event.
+                        # We generate a new event with information
+                        # about the decoding failure and delete this event.
+                        self.generate_event(
+                            problems.problem_event(
+                                f"Event decoding error - uid:{event_id}"
+                            )
+                        )
+                        self._event_persister.clear(event_id)
+                        if sent_one:
+                            event_path_dbg |= 0x00000100
+                            self._reuploads.clear_unacked_event(event_id)
+                        else:
+                            event_path_dbg |= 0x00000200
+                            next_event_ids.extend(
+                                self._reuploads.process_ack_for_reupload(event_id)
+                            )
+                    self._logger.path(" 1 event path:0x%08X", event_path_dbg)
+                    continuation_path_dbg |= event_path_dbg
+                self._logger.path(" 1 continuation path:0x%08X", continuation_path_dbg)
+                event_ids = next_event_ids
+                path_dbg |= continuation_path_dbg
         self._logger.path(
-            "--_start_reupload reuploading: %s path:0x%08X",
-            self.reuploading(),
+            "--_continue_reupload path:0x%08X sent:%d tried:%d continuations:%d",
             path_dbg,
+            sent_count_dbg,
+            tried_count_dbg,
+            continuation_count_dbg,
        )
 
-    def _reupload_events(self, event_ids: list[str]) -> Result[bool, BaseException]:
-        errors = []
-        for message_id in event_ids:
-            match self._event_persister.retrieve(message_id):
-                case Ok(event_bytes):
-                    if event_bytes is None:
-                        errors.append(
-                            UIDMissingWarning("reupload_events", uid=message_id)
-                        )
-                    else:
-                        try:
-                            event = json.loads(
-                                event_bytes.decode(encoding=self.PERSISTER_ENCODING)
-                            )
-                        except BaseException as e:
-                            errors.append(e)
-                            errors.append(
-                                JSONDecodingError("reupload_events", uid=message_id)
-                            )
-                        else:
-                            self.publish_upstream(event, AckRequired=True)
-                case Err(error):
-                    errors.append(error)
-        if errors:
-            return Err(Problems(errors=errors))
-        return Ok()
+    def _reupload_event(self, event_id) -> Result[bool, Problems]:
+        """Load the event for event_id from storage, decode it from JSON,
+        and send it. Return either Ok(True) or Err(Problems(list of decoding
+        errors)). Send errors are handled either by exception, which will
+        propagate up, or by ack timeout.
+        """
+        self._logger.path("++_reupload_event %s", event_id)
+        path_dbg = 0
+        problems = Problems()
+        match self._event_persister.retrieve(event_id):
+            case Ok(event_bytes):
+                path_dbg |= 0x00000001
+                if event_bytes is None:
+                    path_dbg |= 0x00000002
+                    problems.add_error(
+                        UIDMissingWarning("reupload_events", uid=event_id)
+                    )
+                elif len(event_bytes) == 0:
+                    path_dbg |= 0x00000004
+                    problems.add_error(
+                        FileEmptyWarning("reupload_events", uid=event_id)
+                    )
+                else:
+                    path_dbg |= 0x00000008
+                    try:
+                        event_str = event_bytes.decode(encoding=self.PERSISTER_ENCODING)
+                    except BaseException as e:
+                        path_dbg |= 0x00000010
+                        problems.add_error(e).add_error(
+                            ByteDecodingError("reupload_events", uid=event_id)
+                        )
+                    else:
+                        path_dbg |= 0x00000020
+                        try:
+                            event = json.loads(event_str)
+                        except BaseException as e:
+                            path_dbg |= 0x00000040
+                            problems.add_error(e).add_error(
+                                JSONDecodingError(
+                                    f"reupload_events - raw json:\n<\n{event_str}\n>",
+                                    uid=event_id,
+                                )
+                            )
+                        else:
+                            path_dbg |= 0x00000080
+                            self.publish_upstream(event, AckRequired=True)
+                            self._logger.path(
+                                "--_reupload_event:1 path:0x%08X", path_dbg
+                            )
+                            return Ok(True)
+            case Err(error):
+                path_dbg |= 0x00000100
+                problems.add_problems(error)
+        self._logger.path("--_reupload_event:0 path:0x%08X", path_dbg)
+        return Err(problems)
 
     def start(
         self, loop: asyncio.AbstractEventLoop, async_queue: asyncio.Queue
     ) -> None:
+        if self.upstream_client:
+            self._reuploads.stats = self._stats.link(self.upstream_client)
         self._mqtt_clients.start(loop, async_queue)
         self.generate_event(StartupEvent())
         self._states.start_all()
@@ -410,17 +494,11 @@ def process_ack(self, link_name: str, message_id: str):
             self._event_persister.clear(message_id)
             if self._reuploads.reuploading() and link_name == self.upstream_client:
                 path_dbg |= 0x00000002
-                reupload_now = self._reuploads.process_ack_for_reupload(message_id)
-                if reupload_now:
-                    path_dbg |= 0x00000004
-                    self._reupload_events(reupload_now)
-                self._logger.path(
-                    "events pending: %d reupload pending: %d",
-                    self._event_persister.num_pending,
-                    self._reuploads.num_reupload_pending,
-                )
+                self._continue_reupload(
+                    self._reuploads.process_ack_for_reupload(message_id)
+                )
                 if not self._reuploads.reuploading():
-                    path_dbg |= 0x00000008
+                    path_dbg |= 0x00000004
                     self._logger.info("reupload complete.")
         self._logger.path("--LinkManager.process_ack path:0x%08X", path_dbg)

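For orientation, the contract the new code relies on: `_start_reupload` seeds a window of in-flight events from the persister's `pending()`; every ack, and now every corrupt-event removal, goes through `process_ack_for_reupload` to release the next candidate ids; and `_continue_reupload` keeps pulling candidates until at least one send succeeds, so a run of corrupt events can no longer stall the reupload. A rough sketch of such an ack-driven window (hypothetical class and names, simplified from the actual `Reuploads` behavior):

```python
# Hypothetical sketch of an ack-driven reupload window; not the actual
# Reuploads class, just the releasing-on-ack behavior the diff depends on.
from collections import deque


class ReuploadWindow:
    """Tracks unacked reupload ids and releases new ones as acks arrive."""

    def __init__(self, pending: list[str], window_size: int = 5) -> None:
        self._queue = deque(pending)
        self._unacked: set[str] = set()
        self._window_size = window_size

    def start(self) -> list[str]:
        """Fill the initial window; returns the first ids to send."""
        return self._release()

    def on_ack(self, event_id: str) -> list[str]:
        """An ack (or corrupt-event removal) frees a slot; returns new ids."""
        self._unacked.discard(event_id)
        return self._release()

    def reuploading(self) -> bool:
        return bool(self._queue or self._unacked)

    def _release(self) -> list[str]:
        released = []
        while self._queue and len(self._unacked) < self._window_size:
            event_id = self._queue.popleft()
            self._unacked.add(event_id)
            released.append(event_id)
        return released
```

With a window like this, `_continue_reupload`'s while loop reduces to: send the released ids; treat each corrupt one as acked (via `on_ack` above, `process_ack_for_reupload` in the real code) to release replacements; repeat until something was actually sent or `reuploading()` goes false.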