Skip to content

Commit

Permalink
merged in master
Browse files Browse the repository at this point in the history
  • Loading branch information
Christian-B committed Nov 27, 2024
2 parents ba27d77 + aad2544 commit cc6e3d2
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ class LiveEventConnection(DatabaseConnection):
"__is_running",
"__expect_scp_response",
"__expect_scp_response_lock",
"__scp_response_received")
"__scp_response_received",
"__tag_update_thread",
"__send_tag_update_thread_lock")

def __init__(self, live_packet_gather_label: Optional[str],
receive_labels: Optional[Iterable[str]] = None,
Expand Down Expand Up @@ -189,6 +191,8 @@ def __init__(self, live_packet_gather_label: Optional[str],
self.__receiver_connection: Optional[UDPConnection] = None
self.__error_keys: Set[int] = set()
self.__is_running = False
self.__tag_update_thread: Optional[Thread] = None
self.__send_tag_update_thread_lock = Condition()
self.__expect_scp_response = False
self.__expect_scp_response_lock = Condition()
self.__scp_response_received: Optional[bytes] = None
Expand Down Expand Up @@ -492,27 +496,37 @@ def __launch_thread(
thread.start()

def __do_start_resume(self) -> None:
while self.__tag_update_thread is not None:
sleep(0.5)
for label, callbacks in self.__start_resume_callbacks.items():
for callback in callbacks:
self.__launch_thread("start_resume", label, callback)
if not self.__is_running:
self.__is_running = True
thread = Thread(target=self.__send_tag_messages_thread)
thread.start()
with self.__send_tag_update_thread_lock:
if not self.__is_running:
self.__is_running = True
self.__tag_update_thread = Thread(
target=self.__send_tag_messages_thread)
self.__tag_update_thread.start()

def __do_stop_pause(self) -> None:
self.__is_running = False
with self.__send_tag_update_thread_lock:
self.__is_running = False
self.__send_tag_update_thread_lock.notify_all()
for label, callbacks in self.__pause_stop_callbacks.items():
for callback in callbacks:
self.__launch_thread("pause_stop", label, callback)
if self.__tag_update_thread is not None:
self.__tag_update_thread.join()
self.__tag_update_thread = None

def __send_tag_messages_thread(self) -> None:
if self.__receiver_connection is None:
return
while self.__is_running:
sleep(10.0)
if self.__is_running:
self.__send_tag_messages_now()
with self.__send_tag_update_thread_lock:
while self.__is_running:
self.__send_tag_update_thread_lock.wait(timeout=10.0)
if self.__is_running:
self.__send_tag_messages_now()

def __send_tag_messages_now(self) -> None:
if self.__receiver_connection is None:
Expand All @@ -523,15 +537,15 @@ def __send_tag_messages_now(self) -> None:
with self.__expect_scp_response_lock:
self.__scp_response_received = None
self.__expect_scp_response = True
if rc:
rc.update_tag(x, y, tag, do_receive=False)
# No port trigger necessary; proxied already
else:
reprogram_tag_to_listener(
self.__receiver_connection, x, y, board_address,
tag, read_response=False)
while self.__scp_response_received is None:
self.__expect_scp_response_lock.wait(timeout=1.0)
if rc:
rc.update_tag(x, y, tag, do_receive=False)
# No port trigger necessary; proxied already
else:
reprogram_tag_to_listener(
self.__receiver_connection, x, y, board_address,
tag, read_response=False)
self.__expect_scp_response_lock.wait(timeout=2.0)

def __handle_scp_packet(self, data: bytes) -> bool:
with self.__expect_scp_response_lock:
Expand All @@ -544,6 +558,7 @@ def __handle_scp_packet(self, data: bytes) -> bool:
data[_SCP_DEST_CPU_BYTE] == _SCP_RESPONSE_DEST):
self.__scp_response_received = data
self.__expect_scp_response = False
self.__expect_scp_response_lock.notify_all()
return True
return False

Expand Down Expand Up @@ -786,6 +801,8 @@ def _send(self, message: AbstractEIEIOMessage, x: int, y: int, p: int,
(ip_address, SCP_SCAMP_PORT))

def close(self) -> None:
self.__is_running = False
with self.__send_tag_update_thread_lock:
self.__is_running = False
self.__send_tag_update_thread_lock.notify_all()
self.__handle_possible_rerun_state()
super().close()
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,6 @@ def send_start_resume_notification(self) -> None:
"""
logger.info("** Sending start / resume message to external sources "
"to state the simulation has started or resumed. **")
if self.__wait_for_read_confirmation:
self.wait_for_confirmation()
eieio_command_message = NotificationProtocolStartResume()
for c in self.__database_message_connections:
try:
Expand Down

0 comments on commit cc6e3d2

Please sign in to comment.