-
Notifications
You must be signed in to change notification settings - Fork 0
/
example.py
164 lines (128 loc) · 5.02 KB
/
example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import random
import datetime
from engine.entities import Object
from engine.logger import Logger, LogType
from engine.world import Location, World
from engine.agents import Agent, ContextRegistry, MoveToLocation
def create_base_agent(
    name: str,
    world: World,
    home: Location,
    context_manager: ContextRegistry,
) -> Agent:
    """Create an agent, add it to ``world`` at ``home`` and randomize its weights.

    Args:
        name: Name passed to the ``Agent`` constructor.
        world: World the agent is registered with and placed in.
        home: Location the agent is initially placed at.
        context_manager: Registry whose expanded features define the keys of
            the weight vector.

    Returns:
        The newly created agent, with ``MoveToLocation`` weights already set.
    """
    new_agent = Agent(name, world, context_manager)
    world.register_entity(new_agent)
    world.place_entity(new_agent, home)

    # Each expanded feature gets a pair of independent uniform draws in
    # [0.0, 1.0). NOTE(review): the meaning of the two tuple slots is defined
    # by Agent.set_weights, which is not visible here.
    random_weights = {
        feature: (random.random(), random.random())
        for feature in context_manager.get_expanded_features()
    }
    new_agent.set_weights(MoveToLocation, random_weights)
    return new_agent
def show_report(agent: Agent):
    """Print location and practice statistics for ``agent`` from the logger.

    Two sections are written to stdout:

    1. How many location entries were logged for the agent, and how many
       distinct locations those entries refer to.
    2. For every practice label: number of occurrences, total ticks spent and
       the average ticks per occurrence. Rows appear in the order in which
       each label is first seen in the log.

    Args:
        agent: Agent whose logged actions are summarized; its ``name`` is
            used in the section headers.
    """
    logger = Logger.instance()

    entered_actions = logger.get_action(
        subject=agent, actions=[LogType.ENTERED_LOCATION]
    )
    print("")
    print(f"# Stats for {agent.name}:")
    distinct_locations = {
        action.properties["location"]  # type: ignore
        for action in entered_actions
    }
    print(f"## Number of different locations visited: {len(distinct_locations)}")
    print(f"## Number of locations visited: {len(entered_actions)}")  # type: ignore

    # Time per practice
    practices = logger.get_action(
        subject=agent, actions=[LogType.STARTED_PRACTICE, LogType.FINISHED_PRACTICE]
    )

    # Entries are consumed in consecutive pairs (even index, odd index); a
    # trailing unpaired entry is ignored.
    # NOTE(review): this assumes the log strictly alternates STARTED/FINISHED
    # for a single agent — confirm against the logger's ordering guarantees.
    #
    # Aggregate in one pass: label -> (occurrences, total ticks). A dict keeps
    # insertion order, so the report order is stable across runs instead of
    # depending on set iteration order.
    practice_stats = {}
    for start_index in range(0, len(practices) - 1, 2):
        started = practices[start_index]
        finished = practices[start_index + 1]
        label = str(started.properties)  # type: ignore
        occurrences, total_time = practice_stats.get(label, (0, 0))
        practice_stats[label] = (
            occurrences + 1,
            total_time + (finished.tick - started.tick),
        )

    print(f"# Stats for {agent.name} practices:")
    # Every label in practice_stats has at least one occurrence, so the
    # average below never divides by zero.
    for label, (occurrences, total_time) in practice_stats.items():
        print(
            f"## {label:70} / occur: {occurrences:5} / time: {total_time:5} / avg: {total_time/occurrences:.5}"
        )
if __name__ == "__main__":
    # Example simulation: three agents living in a small town of houses,
    # paths, a square and a workplace. Runs a fixed number of ticks, reports
    # wall-clock timing and then prints per-agent statistics.
    logger = Logger.instance()
    w1 = World(logger)

    # Add Locations
    house1 = Location("House1", min_time_inside=10, is_path=False)
    house2 = Location("House2", min_time_inside=10, is_path=False)
    house3 = Location("House3", min_time_inside=10, is_path=False)
    square = Location("Square", min_time_inside=50, is_path=False)
    path1 = Location("Path1", min_time_inside=1, is_path=True)
    path2 = Location("Path2", min_time_inside=2, is_path=True)
    path3 = Location("Path3", min_time_inside=2, is_path=True)
    workplace = Location("Workplace", min_time_inside=10, is_path=False)

    for location in (
        house1,
        house2,
        house3,
        workplace,
        square,
        path1,
        path2,
        path3,
    ):
        w1.register_location(location)

    # House1 reaches the square through Path1, House2 and House3 share Path2,
    # and Path3 links the square to the workplace.
    for location_a, location_b in (
        (house1, path1),
        (house2, path2),
        (house3, path2),
        (path1, square),
        (path2, square),
        (path3, square),
        (path3, workplace),
    ):
        w1.register_location_connection(location_a, location_b)

    # Create Objects: one bed per house
    house1_bed = Object("Bed", labels=["BED"])
    house2_bed = Object("Bed", labels=["BED"])
    house3_bed = Object("Bed", labels=["BED"])
    beds_by_house = (
        (house1_bed, house1),
        (house2_bed, house2),
        (house3_bed, house3),
    )
    # All beds are registered before any is placed.
    for bed, _ in beds_by_house:
        w1.register_entity(bed)
    for bed, house in beds_by_house:
        w1.place_entity(bed, house)

    # Define Context
    cm = ContextRegistry()
    cm.registerScalarFeature("Time")
    cm.registerCategoricalFeature(
        "CurrentLocation", [str(location) for location in w1.locations], False
    )
    cm.registerCategoricalFeature(
        "TargetLocation", [str(location) for location in w1.locations], True
    )

    # Create Agents, each starting in its own house
    agent_1 = create_base_agent(
        name="Agent 1", world=w1, home=house1, context_manager=cm
    )
    agent_2 = create_base_agent(
        name="Agent 2", world=w1, home=house2, context_manager=cm
    )
    agent_3 = create_base_agent(
        name="Agent 3", world=w1, home=house3, context_manager=cm
    )

    # Simulate
    NUM_TICKS = 24000
    NUM_TICKS_TO_LOG_COMMIT = 1000
    print("Starting Simulation...")
    start = datetime.datetime.now()
    for i in range(NUM_TICKS):
        w1.tick()
        logger.set_tick(w1.time)
        # True for i == 0 as well, so the first commit happens right after
        # the first tick and then once every NUM_TICKS_TO_LOG_COMMIT ticks.
        if i % NUM_TICKS_TO_LOG_COMMIT == 0:
            logger.commit()
    # Final commit after the loop (presumably flushes entries logged since
    # the last periodic commit — confirm against Logger.commit).
    logger.commit()
    print("Simulation ended")
    end = datetime.datetime.now()
    delta = end - start
    # timedelta.total_seconds() already includes the microsecond component;
    # adding delta.microseconds again would count the sub-second part twice.
    elapsed_seconds = delta.total_seconds()
    total_miliseconds = elapsed_seconds * 1000
    print(f"Total simulation took {elapsed_seconds} seconds")
    print(f"Average tick took {total_miliseconds/NUM_TICKS} miliseconds")

    show_report(agent_1)
    show_report(agent_2)
    show_report(agent_3)