-
Notifications
You must be signed in to change notification settings - Fork 1
/
ipc.py
119 lines (93 loc) · 3.2 KB
/
ipc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#!/usr/bin/env python3
import numpy as np
import cv2
import queue, threading, time
from datetime import datetime
from time import sleep
import itertools, enum
class Areas(enum.Enum):
    """Named regions of the camera image.

    The members are iterated in definition order, so the numeric values
    also document the order in which the regions are visited.
    """

    Pavement = 1
    Car_A = 2
    Car_B = 3

    def __str__(self):
        """Return just the member name (e.g. ``"Car_A"``), without the class prefix."""
        label = self.name
        return label
class IPC:
    """Background frame reader for the main camera stream.

    The constructor opens a ``cv2.VideoCapture`` on ``name`` and starts a
    daemon thread running :meth:`loop`. Each successfully read frame is
    cropped and handed to the collaborators passed to the constructor:

    * ``fr.process_frame(crop, Areas.Pavement)`` while ``cam.zoom`` is truthy,
    * ``od.process_frame(crop, area)`` otherwise, with ``area`` cycling
      through ``Areas``,
    * ``hac.set_frame(copy_of_sub_frame)`` in both cases.

    Args:
        name: Stream source passed straight to ``cv2.VideoCapture``.
        cam: Camera controller; must expose ``zoom`` and ``lock(arg)``.
        fr: Processor used for the zoomed view.
        od: Processor used for the per-area crops.
        hac: Consumer of the extra sub-frame via ``set_frame``.
    """

    # File that read errors are appended to.
    LOG_PATH = "dahua.log"
    # Argument passed to ``cam.lock`` after a failed read.
    # NOTE(review): presumably a duration; confirm against the camera class.
    CAMERA_LOCK_ARG = 60
    # Pause (seconds) between releasing a broken capture and reopening it.
    RECONNECT_DELAY_S = 60
    # Pause (seconds) after reopening before reading resumes.
    SETTLE_DELAY_S = 5
    # Minimum length (seconds) of one FPS measurement window.
    FPS_WINDOW_S = 10

    def __init__(self, name, cam, fr, od, hac):
        self._name = name
        self._cap = self._open_capture()
        self._fr = fr
        self._od = od
        self._hac = hac
        # Not read anywhere in this class (loop() consults ``cam.zoom``);
        # kept so the instance still carries the attribute.
        self._zoom = False
        self._camera = cam
        self._fps = 0
        # Every attribute loop() reads is assigned before the thread starts.
        self._thread = threading.Thread(target=self.loop)
        self._thread.daemon = True
        self._thread.start()

    def _open_capture(self):
        """Open a new capture on the configured stream source."""
        return cv2.VideoCapture(self._name)

    def _recover_from_read_error(self):
        """Log a failed read, lock the camera, and reopen the stream."""
        # NOTE(review): the text says "5 minutes" although the default delays
        # below add up to 65 seconds; the wording is left unchanged.
        message = datetime.now().strftime("%H:%M:%S")+": IPC read() error. Waiting 5 minutes."
        # Context manager so the log file handle is closed after each write.
        with open(self.LOG_PATH, "a") as log_file:
            print(message, file=log_file)
        print(message)
        self._camera.lock(self.CAMERA_LOCK_ARG)
        self._cap.release()
        sleep(self.RECONNECT_DELAY_S)
        self._cap = self._open_capture()
        sleep(self.SETTLE_DELAY_S)

    def loop(self):
        """Read frames forever, dispatch crops, and keep ``fps`` up to date.

        Runs on the daemon thread started by the constructor and never
        returns. A failed read triggers a reconnect and consumes the current
        ``Areas`` member, so the next frame is processed for the next area.
        """
        frames = 0
        window_start = time.monotonic()
        for area in itertools.cycle(Areas):
            ret, new_frame = self._cap.read()
            if not ret:
                self._recover_from_read_error()
                continue
            # Only frames that were actually delivered count towards FPS.
            frames += 1
            if self._camera.zoom:
                crop = new_frame[0:600, 600:1600]
                self._fr.process_frame(crop, Areas.Pavement)
                ha_frame = new_frame[0:700, 400:1400]
            else:
                if area == Areas.Pavement:
                    crop = new_frame[580:900, 1208:1528]
                elif area == Areas.Car_A:
                    crop = new_frame[585:1001, 1492:1908]
                else:
                    crop = new_frame[572:1100, 1834:2362]
                self._od.process_frame(crop, area)
                ha_frame = new_frame[683:931, 1143:1517]
            # The consumer gets its own copy rather than a view into new_frame.
            self._hac.set_frame(ha_frame.copy())
            elapsed = time.monotonic() - window_start
            if elapsed >= self.FPS_WINDOW_S and elapsed > 0:
                # Divide by the real window length: it can be far longer than
                # FPS_WINDOW_S after a reconnect pause or slow processing.
                self._fps = int(frames / elapsed)
                frames = 0
                window_start = time.monotonic()

    @property
    def fps(self):
        """Whole frames per second measured over the last completed window."""
        return self._fps
class SimpleIPC:
    """Background frame reader that forwards whole frames to one processor.

    The constructor opens a ``cv2.VideoCapture`` on ``name`` and starts a
    daemon thread running :meth:`loop`, which calls
    ``fd.process_frame(frame, None)`` for every frame read.

    Args:
        name: Stream source passed straight to ``cv2.VideoCapture``.
        fd: Object exposing ``process_frame(frame, area)``.
    """

    # Pause (seconds) between releasing a broken capture and reopening it.
    RECONNECT_DELAY_S = 60
    # Pause (seconds) after reopening before reading resumes.
    SETTLE_DELAY_S = 60
    # Minimum length (seconds) of one FPS measurement window.
    FPS_WINDOW_S = 60

    def __init__(self, name, fd):
        self._name = name
        self._cap = self._open_capture()
        self._fd = fd
        self._fps = 0
        # Every attribute loop() reads is assigned before the thread starts.
        self._thread = threading.Thread(target=self.loop)
        self._thread.daemon = True
        self._thread.start()

    def _open_capture(self):
        """Open a new capture on the configured stream source."""
        return cv2.VideoCapture(self._name)

    def _reconnect(self):
        """Release the current capture and replace it with a fresh one."""
        # Must operate on ``_cap``: there is no ``cap`` attribute, so using
        # that name raised AttributeError and ended the reader thread on the
        # first failed read.
        self._cap.release()
        sleep(self.RECONNECT_DELAY_S)
        self._cap = self._open_capture()
        sleep(self.SETTLE_DELAY_S)

    def loop(self):
        """Read frames forever, forward them, and keep ``fps`` up to date.

        Runs on the daemon thread started by the constructor and never
        returns. A failed read is reported on stdout and followed by a
        reconnect.
        """
        frames = 0
        window_start = time.monotonic()
        while True:
            ret, new_frame = self._cap.read()
            if not ret:
                print(datetime.now().strftime("%H:%M:%S")+": Furtka cv2.read() error")
                self._reconnect()
                continue
            # Only frames that were actually delivered count towards FPS.
            frames += 1
            self._fd.process_frame(new_frame, None)
            elapsed = time.monotonic() - window_start
            if elapsed >= self.FPS_WINDOW_S and elapsed > 0:
                # Divide by the real window length: it can be far longer than
                # FPS_WINDOW_S after a reconnect pause.
                self._fps = int(frames / elapsed)
                frames = 0
                window_start = time.monotonic()

    @property
    def fps(self):
        """Whole frames per second measured over the last completed window."""
        return self._fps