-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathagent_mani_random.py
83 lines (56 loc) · 2.1 KB
/
agent_mani_random.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
import evaluation_pb2
import evaluation_pb2_grpc
import grpc
import os
import pickle
import time
import numpy as np
# NOTE(review): fixed one-minute pause before connecting; presumably it gives
# the environment service time to come up -- confirm before changing it.
time.sleep(60)

# A set, non-empty LOCAL_EVALUATION variable selects the "environment" host;
# otherwise the channel targets localhost. Port 8085 is used in both cases.
LOCAL_EVALUATION = os.environ.get("LOCAL_EVALUATION")
channel = grpc.insecure_channel(
    "environment:8085" if LOCAL_EVALUATION else "localhost:8085"
)
def pack_for_grpc(entity):
    """Serialize ``entity`` with pickle for use as a gRPC package payload.

    Returns the ``bytes`` produced by ``pickle.dumps(entity)``.
    """
    payload = pickle.dumps(entity)
    return payload
def unpack_for_grpc(entity):
    """Rebuild the Python object stored in the pickled bytes ``entity``.

    NOTE(review): ``pickle.loads`` can execute arbitrary code while loading,
    so this should only ever be given payloads from a trusted peer.
    """
    restored = pickle.loads(entity)
    return restored
str_test = 'July 13, 7:30AM'  # <-- to be removed, only for testing

# Client stub for the Environment service, bound to the channel opened above.
stub = evaluation_pb2_grpc.EnvironmentStub(channel)


def _call_without_payload(rpc):
    """Call ``rpc`` with a package wrapping pickled ``None``; unpickle the reply."""
    request = evaluation_pb2.Package(SerializedEntity=pack_for_grpc(None))
    return unpack_for_grpc(rpc(request).SerializedEntity)


# Start-up diagnostics. The order matters because reset() is a call into the
# environment: obs dict first, then the length of the reset result, then the
# obs dict again.
print(_call_without_payload(stub.get_obsdict))
print(f'Len OBS: {len(_call_without_payload(stub.reset))}')
print("Original Ouput Obs keys:" + str(_call_without_payload(stub.get_obsdict)))
# Random-agent driver: keep running trials until the environment reports that
# the whole evaluation is finished.
flag_completed = None  # set from the reply's "eval_completed" entry
repetition = 0

while not flag_completed:
    repetition += 1
    counter = 0
    flag_trial = None  # set from feedback[2]; a truthy value ends the trial

    # Each trial starts with a reset whose result is the first observation.
    print('MANIPULATION : Start Resetting the environment and get 1st obs')
    reset_reply = stub.reset(
        evaluation_pb2.Package(SerializedEntity=pack_for_grpc(None))
    )
    obs = unpack_for_grpc(reset_reply.SerializedEntity)

    while not flag_trial:
        # Random policy: 63 values drawn uniformly from [0, 1).
        action = np.random.rand(63)

        # Send the action to the environment and decode its reply.
        request = evaluation_pb2.Package(SerializedEntity=pack_for_grpc(action))
        base = unpack_for_grpc(
            stub.act_on_environment(request).SerializedEntity
        )

        feedback = base["feedback"]
        obs = feedback[0]
        flag_trial = feedback[2]
        flag_completed = base["eval_completed"]

        print(f"MANIPULATION : Random Agent Feedback iter {counter} -- solved: {flag_trial}")
        print("*" * 100)
        counter += 1