-
Notifications
You must be signed in to change notification settings - Fork 28
/
Copy pathsimulation.py
233 lines (183 loc) · 7.57 KB
/
simulation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
"""`Simulation` represents an airport simulation of a day on the given
parameters. `ClonedSimulation` is designed to be an immutable delegate of a
simulation that can be used for other objects to observe or predict the
simulation states.
"""
import time
import logging
import traceback
import importlib
from copy import deepcopy, copy
from clock import Clock, ClockException
from airport import Airport
from scenario import Scenario
from routing_expert import RoutingExpert
from analyst import Analyst
from utils import get_seconds_after
from uncertainty import Uncertainty
from config import Config
from state_logger import StateLogger
class Simulation:
"""Simulation, representing a simulation day, holds both static and dynamic
states of the current airport, and implements `tick()` functions for the
caller to simulation to the next state.
"""
def __init__(self):
params = Config.params
# Setups the logger
self.logger = logging.getLogger(__name__)
# Setups the clock
self.clock = Clock()
# Sets up the airport
airport_name = params["airport"]
self.airport = Airport.create(airport_name)
# Sets up the scenario
self.scenario = Scenario.create(
airport_name, self.airport.surface)
# Sets up the routing expert monitoring the airport surface
self.routing_expert = RoutingExpert(self.airport.surface.links,
self.airport.surface.nodes,
params["simulation"]["cache"])
# Sets up the uncertainty module
self.uncertainty = (Uncertainty(params["uncertainty"]["prob_hold"])
if params["uncertainty"]["enabled"] else (None))
# Loads the requested scheduler
self.scheduler = get_scheduler()
if not params["simulator"]["test_mode"]:
# Sets up the analyst
self.analyst = Analyst(self)
# Sets up the state logger
self.state_logger = StateLogger()
# Initializes the previous schedule time
self.last_schedule_time = None
# Initializes the last execution time for rescheduling to None
self.last_schedule_exec_time = None
self.__print_stats()
def tick(self):
"""Moves the states of this simulation to the next state."""
self.logger.debug("\nCurrent Time: %s", self.now)
try:
# Reschedule happens before the tick
if self.__is_time_to_reschedule():
self.logger.info("Time to reschedule")
start = time.time()
self.__reschedule()
self.last_schedule_exec_time = time.time() - start # seconds
self.last_schedule_time = self.now
self.logger.info("Last schedule time is updated to %s",
self.last_schedule_time)
# Add aircraft
self.airport.add_aircrafts(self.scenario, self.now,
self.clock.sim_time)
# Inject uncertainties
if self.uncertainty:
self.uncertainty.inject(self)
# Tick
self.airport.tick()
state = None
if not Config.params["simulator"]["test_mode"]:
state = self.state_logger.log_on_tick(self)
self.clock.tick()
# Remove aircraft
self.airport.remove_aircrafts(self.scenario)
# Abort on conflict
conflicts = self.airport.conflicts
if conflicts:
for conflict in conflicts:
self.logger.error("Found %s", conflict)
raise SimulationException("Conflict found")
# Observe
if not Config.params["simulator"]["test_mode"]:
self.analyst.observe_on_tick(self)
# return current state for streaming visualization
return state
except ClockException as error:
# Finishes
if not Config.params["simulator"]["test_mode"]:
self.analyst.save()
raise error
except SimulationException as error:
raise error
except Exception as error:
self.logger.error(traceback.format_exc())
raise error
def __is_time_to_reschedule(self):
reschedule_cycle = Config.params["simulation"]["reschedule_cycle"]
last_time = self.last_schedule_time
next_time = (get_seconds_after(last_time, reschedule_cycle)
if last_time is not None else None)
return last_time is None or next_time <= self.now
def __reschedule(self):
schedule = self.scheduler.schedule(self)
self.airport.apply_schedule(schedule)
if not Config.params["simulator"]["test_mode"]:
self.analyst.observe_on_reschedule(self)
@property
def now(self):
"""Return the current time of the simulation."""
return self.clock.now
def __print_stats(self):
self.scenario.print_stats()
self.airport.print_stats()
def __getstate__(self):
__dict = dict(self.__dict__)
del __dict["logger"]
__dict["uncertainty"] = None
__dict["routing_expert"] = None
return __dict
def __setstate__(self, new_dict):
self.__dict__.update(new_dict)
def set_quiet(self, logger):
"""Sets the simulation and its subclass to quiet mode where the logger
doesn't print that many stuff.
"""
self.logger = logger
self.airport.set_quiet(logger)
self.scenario.set_quiet(logger)
self.routing_expert.set_quiet(logger)
@property
def copy(self):
"""Obtains a immutable copy of this simulation."""
# NOTE: If uncertainty is not None, call inject() in tick().
return ClonedSimulation(self)
class ClonedSimulation:
"""ClonedSimulation is a copy of a `Simulation` object; however, it shares
objects with the source `Simulation` object on immutable data objects in
order to avoid the overhead in copying.
The `tick()` function is divided into `pre_tick`, `tick`, and `post_tick`
to allow the called (mainly the scheduler) to inject operations in between.
"""
def __init__(self, simulation):
self.clock = deepcopy(simulation.clock)
self.airport = deepcopy(simulation.airport)
self.scenario = copy(simulation.scenario)
# Sets up the logger in quiet mode
self.logger = logging.getLogger("QUIET_MODE")
self.airport.set_quiet(self.logger)
self.scenario.set_quiet(self.logger)
def pre_tick(self):
"""Adds aircraft before a tick."""
self.airport.add_aircrafts(self.scenario, self.now,
self.clock.sim_time)
def tick(self):
"""Turn off the logger, reschedule, and analyst."""
self.logger.debug("\nPredicted Time: %s", self.now)
self.airport.tick()
try:
self.clock.tick()
except ClockException as error:
raise error
def post_tick(self):
"""Removes aircraft after a tick."""
self.airport.remove_aircrafts(self.scenario)
@property
def now(self):
"""Returns simulated time of now."""
return self.clock.now
def get_scheduler():
"""Loads the requested scheduler."""
scheduler_name = Config.params["scheduler"]["name"]
return importlib.import_module("scheduler." + scheduler_name).Scheduler()
class SimulationException(Exception):
"""Extends `Exception` for the simulation errors."""
pass