-
Notifications
You must be signed in to change notification settings - Fork 11
/
main.py
175 lines (148 loc) · 6.15 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
"""
This script connects a Meshtastic mesh network to Matrix chat rooms by relaying messages between them.
It uses Meshtastic-python and Matrix nio client library to interface with the radio and the Matrix server respectively.
"""
import asyncio
import logging
import signal
import sys
from typing import List
from nio import ReactionEvent, RoomMessageEmote, RoomMessageNotice, RoomMessageText
# Import meshtastic_utils as a module to set event_loop
import meshtastic_utils
from config import relay_config
from db_utils import (
initialize_database,
update_longnames,
update_shortnames,
wipe_message_map,
)
from log_utils import get_logger
from matrix_utils import connect_matrix, join_matrix_room
from matrix_utils import logger as matrix_logger
from matrix_utils import on_room_message
from meshtastic_utils import connect_meshtastic
from meshtastic_utils import logger as meshtastic_logger
from plugin_loader import load_plugins
# Initialize logger
logger = get_logger(name="M<>M Relay")
# Extract Matrix configuration
matrix_rooms: List[dict] = relay_config["matrix_rooms"]
# Set the logging level for 'nio' to ERROR to suppress warnings
logging.getLogger("nio").setLevel(logging.ERROR)
async def main():
"""
Main asynchronous function to set up and run the relay.
Includes logic for wiping the message_map if configured in db.msg_map.wipe_on_restart.
Also updates longnames and shortnames periodically as before.
"""
# Set the event loop in meshtastic_utils
meshtastic_utils.event_loop = asyncio.get_event_loop()
# Initialize the SQLite database
initialize_database()
# Check db config for wipe_on_restart
db_config = relay_config.get("db", {})
msg_map_config = db_config.get("msg_map", {})
wipe_on_restart = msg_map_config.get("wipe_on_restart", False)
if wipe_on_restart:
logger.debug("wipe_on_restart enabled. Wiping message_map now (startup).")
wipe_message_map()
# Load plugins early
load_plugins()
# Connect to Meshtastic
meshtastic_utils.meshtastic_client = connect_meshtastic()
# Connect to Matrix
matrix_client = await connect_matrix()
if matrix_client is None:
logger.error("Failed to connect to Matrix. Exiting.")
return
# Join the rooms specified in the config.yaml
for room in matrix_rooms:
await join_matrix_room(matrix_client, room["id"])
# Register the message callback for Matrix
matrix_logger.info("Listening for inbound Matrix messages...")
matrix_client.add_event_callback(
on_room_message, (RoomMessageText, RoomMessageNotice, RoomMessageEmote)
)
# Add ReactionEvent callback so we can handle matrix reactions
matrix_client.add_event_callback(on_room_message, ReactionEvent)
# Set up shutdown event
shutdown_event = asyncio.Event()
async def shutdown():
matrix_logger.info("Shutdown signal received. Closing down...")
meshtastic_utils.shutting_down = True # Set the shutting_down flag
shutdown_event.set()
loop = asyncio.get_running_loop()
# Handle signals differently based on the platform
if sys.platform != "win32":
for sig in (signal.SIGINT, signal.SIGTERM):
loop.add_signal_handler(sig, lambda: asyncio.create_task(shutdown()))
else:
# On Windows, we can't use add_signal_handler, so we'll handle KeyboardInterrupt
pass
# Start the Matrix client sync loop
try:
while not shutdown_event.is_set():
try:
if meshtastic_utils.meshtastic_client:
# Update longnames & shortnames
update_longnames(meshtastic_utils.meshtastic_client.nodes)
update_shortnames(meshtastic_utils.meshtastic_client.nodes)
else:
meshtastic_logger.warning("Meshtastic client is not connected.")
matrix_logger.info("Starting Matrix sync loop...")
sync_task = asyncio.create_task(
matrix_client.sync_forever(timeout=30000)
)
shutdown_task = asyncio.create_task(shutdown_event.wait())
done, pending = await asyncio.wait(
[sync_task, shutdown_task],
return_when=asyncio.FIRST_COMPLETED,
)
if shutdown_event.is_set():
matrix_logger.info("Shutdown event detected. Stopping sync loop...")
sync_task.cancel()
try:
await sync_task
except asyncio.CancelledError:
pass
break
except Exception as e:
if shutdown_event.is_set():
break
matrix_logger.error(f"Error syncing with Matrix server: {e}")
await asyncio.sleep(5) # Wait before retrying
except KeyboardInterrupt:
await shutdown()
finally:
# Cleanup
matrix_logger.info("Closing Matrix client...")
await matrix_client.close()
if meshtastic_utils.meshtastic_client:
meshtastic_logger.info("Closing Meshtastic client...")
try:
meshtastic_utils.meshtastic_client.close()
except Exception as e:
meshtastic_logger.warning(f"Error closing Meshtastic client: {e}")
# Attempt to wipe message_map on shutdown if enabled
if wipe_on_restart:
logger.debug("wipe_on_restart enabled. Wiping message_map now (shutdown).")
wipe_message_map()
# Cancel the reconnect task if it exists
if meshtastic_utils.reconnect_task:
meshtastic_utils.reconnect_task.cancel()
meshtastic_logger.info("Cancelled Meshtastic reconnect task.")
# Cancel any remaining tasks
tasks = [t for t in asyncio.all_tasks(loop) if not t.done()]
for task in tasks:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
matrix_logger.info("Shutdown complete.")
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
pass