# drinkbot.py
# Forked from Oliviamiller008/drinkbot.
import asyncio
from inspect import getcallargs
import numpy
from viam.robot.client import RobotClient
from viam.rpc.dial import Credentials, DialOptions
from viam.components.motor import Motor
from viam.services.types import ServiceType
from viam.services.vision import DetectorConfig, DetectorType
from viam.components.camera import Camera
from viam.components.base import Base, Vector3
from PIL import Image
# Shared all-zero power vectors. Pass one as the linear or angular argument
# of the base's set_power() when that axis should stay idle.
linear_zero = Vector3(x=0, y=0, z=0)
angular_zero = Vector3(x=0, y=0, z=0)
async def _pour(motor):
    """Run one pour cycle on *motor*.

    Drives the motor at +0.35 power for 4.5 s, holds it stopped for 2 s,
    then drives it at -0.35 power for 4.5 s and stops it again.
    (Presumably this tips the dispenser and returns it - confirm on hardware.)
    """
    await motor.set_power(0.35)
    await asyncio.sleep(4.5)
    await motor.set_power(0)
    await asyncio.sleep(2)
    await motor.set_power(-0.35)
    await asyncio.sleep(4.5)
    await motor.set_power(0)


async def client():
    """Connect to the robot, find a target, drive to it and pour.

    Dials the robot, looks up the vision service, the 'motor' motor and the
    'base' base, then asks the "detector_color" detector for detections from
    camera "cam". When nothing is visible, find_target() is used to search
    for a target. If a target is available the base is driven to it with
    move_to_target() and one pour cycle is run. If the search comes back
    empty (find_target() returns None when it gives up), the routine ends
    cleanly instead of failing on len(None).
    """
    # NOTE(review): this location secret is committed in source. Rotate it and
    # load it from the environment or a secrets store instead.
    creds = Credentials(
        type='robot-location-secret',
        payload='lauqrt4op4x6ubhji867wue0qsnqdq76x00ljfv7vcoyq7mi')
    opts = RobotClient.Options(
        refresh_interval=0,
        dial_options=DialOptions(credentials=creds)
    )
    # The async context manager closes the connection on every exit path, so
    # no explicit robot.close() call is needed afterwards.
    async with await RobotClient.at_address(
            'drinkbot-main.rdt5n4brox.local.viam.cloud:8080',
            opts) as robot:
        print('Resources:')
        print(robot.resource_names)

        vis = robot.get_service(ServiceType.VISION)
        motor = Motor.from_robot(robot, 'motor')
        wheels = Base.from_robot(robot, 'base')

        names = await vis.get_detector_names()
        print(names)

        while True:
            # Check the camera for detections.
            detections = await vis.get_detections_from_camera(
                "cam", "detector_color")

            # Nothing in view: search for a target.
            if not detections:
                detections = await find_target(vis, wheels, detections)

            # Still nothing (None or empty): no targets left, finish cleanly.
            if not detections:
                break

            await move_to_target(vis, wheels, detections)
            await _pour(motor)
            # NOTE(review): the run ends after a single pour, as in the
            # original. Remove this return to keep serving until
            # find_target() reports that no targets remain.
            return
async def move_to_target(vis, wheels, detections, *, center_x=160,
                         drive_power=0.5, turn_power=0.05):
    """Drive toward the first detection until the detector no longer sees it.

    On every step the horizontal centre of detections[0] is compared with
    center_x to pick a small steering correction, the base is powered
    forward, and the "detector_color" detector is queried again on camera
    "cam". The loop ends when that query returns no detections, which the
    routine treats as "in range for pick up".

    Args:
        vis: Vision service exposing get_detections_from_camera().
        wheels: Base exposing set_power() and stop().
        detections: Non-empty sequence of detections with x_min / x_max.
            An empty sequence raises IndexError, as before.
        center_x: Pixel column the target centre is steered toward.
            (The default 160 is presumably the midpoint of a 320-px-wide
            frame - confirm against the camera configuration.)
        drive_power: Linear y power applied while approaching.
        turn_power: Magnitude of the angular z power used for steering.
    """
    linear = Vector3(x=0, y=drive_power, z=0)
    steer_positive = Vector3(x=0, y=0, z=turn_power)
    steer_negative = Vector3(x=0, y=0, z=-turn_power)

    try:
        while True:
            target = detections[0]
            x_center = (target.x_max + target.x_min) / 2
            print("x center: ")
            print(x_center)

            # Target centre below center_x gets positive z, above gets
            # negative z, exactly on it drives straight.
            if x_center < center_x:
                angular = steer_positive
            elif x_center > center_x:
                angular = steer_negative
            else:
                angular = angular_zero

            await wheels.set_power(linear, angular)
            detections = await vis.get_detections_from_camera(
                "cam", "detector_color")
            if len(detections) == 0:
                break
    finally:
        # Always cut power, even if a camera/vision call raised, so the base
        # is never left driving.
        await wheels.stop()

    print("in range for pick up")
    await asyncio.sleep(1)
async def find_target(vis, wheels, detections, *, max_rotations=52,
                      spin_power=0.5, pulse_seconds=0.2):
    """Spin in place in short pulses until the detector reports something.

    While detections is empty, the base is given a pure angular z power for
    pulse_seconds, stopped, and the "detector_color" detector is queried
    again on camera "cam".

    Args:
        vis: Vision service exposing get_detections_from_camera().
        wheels: Base exposing set_power() and stop().
        detections: Current detections; returned unchanged if already
            non-empty.
        max_rotations: The search gives up once the pulse counter exceeds
            this value, i.e. after max_rotations + 1 pulses. (The original
            comment equates the default with a full 360 degree sweep -
            confirm on hardware.)
        spin_power: Angular z power used for each pulse.
        pulse_seconds: Duration of each pulse in seconds.

    Returns:
        The non-empty detections, or None when the search gives up.
        NOTE(review): None (not an empty list) is kept for compatibility;
        callers must not call len() on the result without checking it.
    """
    rotation_count = 0
    # Power vector that only spins the base.
    angular = Vector3(x=0, y=0, z=spin_power)

    while len(detections) == 0:
        print("rotation count = ", rotation_count)
        # Sweep finished with no detections: task completed.
        if rotation_count > max_rotations:
            print("all objects cleared")
            return None

        # wheels.spin(10, 90) would be the ideal implementation but was
        # reported as not working, so pulse set_power()/stop() instead.
        await wheels.set_power(linear_zero, angular)
        try:
            await asyncio.sleep(pulse_seconds)
        finally:
            # Stop even if the sleep is cancelled so the base never keeps
            # spinning unattended.
            await wheels.stop()
        rotation_count += 1
        detections = await vis.get_detections_from_camera(
            "cam", "detector_color")

    print("found object")
    return detections
# async def get_cup(detections):
# cup = None
# for d in detections:
# if (d.class_name == "Cup"):
# print(d)
# cup = d
# return cup
# return cup
# Script entry point: run the client() coroutine to completion.
if __name__ == "__main__":
    asyncio.run(client())