# stream_server.py
# gRPC server that streams samples read from an OpenBCI Cyton board.
import signal
import threading
import time
from concurrent import futures
import serial
import grpc
import streamint_pb2 as pb2
import streamint_pb2_grpc as pb2_grpc
from openbci import OpenBCICyton
# Global variables
# Most recent sample.channels_data stored by update_data(); None until the
# first sample arrives.
LATEST_DATA = None
# Held while LATEST_DATA is written (update_data) or read (StreamInt).
DATA_LOCK = threading.Lock()
# Set by signal_handler(); polled by run_bluetooth(), serve() and StreamInt
# to end their loops.
STOP_EVENT = threading.Event()
# gRPC server object assigned in serve(); None before serve() runs.
SERVER = None
# OpenBCICyton object assigned in run_bluetooth(); None until it is created.
BOARD = None
def signal_handler(signum=None, frame=None):
    """Handle SIGINT by stopping the gRPC server and the board stream.

    Python calls signal handlers with two positional arguments (the signal
    number and the interrupted stack frame). A handler that accepts no
    arguments raises TypeError as soon as Ctrl+C is pressed, so both are
    accepted here. They default to None so the function can still be called
    directly with no arguments.

    Args:
        signum: Number of the signal being handled (unused).
        frame: Stack frame that was interrupted (unused).
    """
    print("\nCtrl+C pressed. Stopping the server and Bluetooth connection...")
    # Set the event first so every loop polling it sees the shutdown request.
    STOP_EVENT.set()
    if SERVER:
        SERVER.stop(0)
    if BOARD:
        BOARD.stop_stream()
# Install signal_handler for SIGINT (Ctrl+C). This executes when the module
# is loaded, i.e. it is a module-level side effect.
signal.signal(signal.SIGINT, signal_handler)
def run_bluetooth(port="COM8", daisy=False):
    """Connect to the OpenBCI board and stream samples until shutdown.

    Stores the board object in the module-level BOARD, starts streaming with
    update_data as the per-sample callback, then idles until STOP_EVENT is
    set. Exceptions raised while connecting or streaming are printed, not
    re-raised.

    Args:
        port: Serial port handed to OpenBCICyton. Defaults to "COM8", the
            value that used to be hard-coded.
        daisy: Forwarded to OpenBCICyton's ``daisy`` argument.
    """
    global BOARD
    try:
        print("Attempting to connect to OpenBCI board...")
        BOARD = OpenBCICyton(port=port, daisy=daisy)
        print("Connected to OpenBCI board. Starting stream...")
        BOARD.start_stream(update_data)
        while not STOP_EVENT.is_set():
            time.sleep(0.1)
    except serial.serialutil.SerialException as e:
        print(f"Error connecting to OpenBCI board: {e}")
    except Exception as e:
        print(f"Unexpected error in run_bluetooth: {e}")
    finally:
        if BOARD:
            try:
                # signal_handler may already have stopped the stream, so this
                # call can fail; the connection must still be released.
                BOARD.stop_stream()
            finally:
                BOARD.disconnect()
        print("Bluetooth connection closed.")
class StreamService(pb2_grpc.StreamIntServiceServicer):
    """gRPC servicer that streams the first channel of the latest sample."""

    def StreamInt(self, request, context):
        """Yield StreamIntResponse messages until shutdown or RPC end.

        Roughly every 0.1 s the newest sample is read under DATA_LOCK. If a
        sample exists, its first channel is sent as an integer together with
        ``received=True``; otherwise a waiting message is printed.

        Args:
            request: Incoming request message (not inspected).
            context: gRPC servicer context; ``is_active()`` is polled so the
                loop ends once the RPC has terminated.

        Yields:
            pb2.StreamIntResponse carrying the first channel value.
        """
        print("StreamInt method called")
        # Checking the context keeps a worker thread from spinning forever
        # when the client has gone away and no data is arriving.
        while not STOP_EVENT.is_set() and context.is_active():
            # Read and convert while holding the lock, but release it before
            # yielding: a generator suspended inside `with DATA_LOCK` keeps
            # the lock held until gRPC resumes it, which would block
            # update_data() and every other active stream.
            with DATA_LOCK:
                result = None if LATEST_DATA is None else int(LATEST_DATA[0])
            if result is None:
                print("Waiting for data...")
            else:
                received = True
                print(f"Sending: result={result}, received={received}")
                yield pb2.StreamIntResponse(result=result, received=received)
            time.sleep(0.1)
def update_data(sample):
    """Stream callback: publish the channel data of *sample* as the latest reading.

    Args:
        sample: Object delivered by the board stream; only its
            ``channels_data`` attribute is used.
    """
    global LATEST_DATA
    channels = sample.channels_data
    with DATA_LOCK:
        LATEST_DATA = channels
    print("Received data from OpenBCI:", channels)
def serve():
    """Run the gRPC server on port 50051 until STOP_EVENT is set.

    Creates the server with a 10-worker thread pool, stores it in the
    module-level SERVER, registers StreamService, starts listening on
    ``[::]:50051`` and then blocks, polling STOP_EVENT every 0.1 s.

    When the wait loop ends for any reason, including KeyboardInterrupt,
    STOP_EVENT is set before the server is stopped. run_bluetooth() and the
    StreamInt generators only terminate on that event, so without it they
    would keep running after the server is gone.
    """
    global SERVER
    SERVER = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    pb2_grpc.add_StreamIntServiceServicer_to_server(StreamService(), SERVER)
    SERVER.add_insecure_port("[::]:50051")
    SERVER.start()
    print("Server started at port 50051")
    try:
        while not STOP_EVENT.is_set():
            time.sleep(0.1)
    except KeyboardInterrupt:
        pass
    finally:
        # Tell every other loop in the process that we are shutting down.
        STOP_EVENT.set()
        SERVER.stop(0)
        print("gRPC server stopped.")
def main():
    """Start the Bluetooth reader thread, run the gRPC server, then clean up.

    run_bluetooth() runs on a non-daemon thread that only exits once
    STOP_EVENT is set. The event is therefore set and the thread joined in a
    ``finally`` clause, so the process cannot hang if serve() returns without
    setting the event or raises (for example when the server fails to start).
    """
    print("Starting Bluetooth thread...")
    bluetooth_thread = threading.Thread(target=run_bluetooth)
    bluetooth_thread.start()
    try:
        print("Starting gRPC server...")
        serve()
    finally:
        # Guarantee the reader thread is told to stop before waiting on it.
        STOP_EVENT.set()
        bluetooth_thread.join()
    print("Main thread exiting.")
# Call main() only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()