Skip to content

Commit

Permalink
Merge pull request #19 from airboxlab/fix_18
Browse files Browse the repository at this point in the history
only skip action sending on system timestep
  • Loading branch information
antoine-galataud authored Dec 18, 2023
2 parents ce3be28 + 362fc38 commit f2aa23f
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 21 deletions.
107 changes: 87 additions & 20 deletions rllibenergyplus/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,19 @@ def parse_args() -> argparse.Namespace:
default=False,
action="store_true"
)
parser.add_argument(
"--verbose",
help="In verbose mode, EnergyPlus will print to stdout",
required=False,
default=False,
action="store_true"
)
parser.add_argument(
    "--eplus-timestep-duration",
    help="EnergyPlus timestep duration, in fractional hour. Default is 0.25 (15 minutes)",
    required=False,
    default=0.25,
    # Convert command-line values to float: argparse keeps them as str otherwise,
    # and this setting is compared numerically against the system timestep duration.
    type=float,
)
parser.add_argument(
"--output",
help="EnergyPlus output directory. Default is a generated one in /tmp/",
Expand Down Expand Up @@ -87,8 +100,12 @@ class EnergyPlusRunner:
def __init__(self, episode: int, env_config: Dict[str, Any], obs_queue: Queue, act_queue: Queue) -> None:
self.episode = episode
self.env_config = env_config
self.verbose = self.env_config["verbose"]

self.obs_queue = obs_queue
self.act_queue = act_queue
# protect act_queue from concurrent access that can happen at end of simulation
self.act_queue_mutex = threading.Lock()

self.energyplus_api = EnergyPlusAPI()
self.x: DataExchange = self.energyplus_api.exchange
Expand All @@ -98,6 +115,9 @@ def __init__(self, episode: int, env_config: Dict[str, Any], obs_queue: Queue, a
self.initialized = False
self.progress_value: int = 0
self.simulation_complete = False
# Zone timestep duration, in fractional hour. Default is 15 minutes
# Make sure to set this value to reflect your simulation timestep (ie 4 steps per hour in IDF = 0.25)
self.zone_timestep_duration = self.env_config["eplus_timestep_duration"]

# below is declaration of variables, meters and actuators
# this simulation will interact with
Expand Down Expand Up @@ -132,6 +152,7 @@ def __init__(self, episode: int, env_config: Dict[str, Any], obs_queue: Queue, a
)
}
self.actuator_handles: Dict[str, int] = {}
self.last_action = 0.0

def start(self) -> None:
self.energyplus_state = self.energyplus_api.state_manager.new_state()
Expand All @@ -140,10 +161,13 @@ def start(self) -> None:
# register callback used to track simulation progress
def _report_progress(progress: int) -> None:
self.progress_value = progress
print(f"Simulation progress: {self.progress_value}%")
if self.verbose:
print(f"Simulation progress: {self.progress_value}%")

runtime.callback_progress(self.energyplus_state, _report_progress)

runtime.set_console_output_status(self.energyplus_state, self.verbose)

# register callback used to collect observations
runtime.callback_end_zone_timestep_after_zone_reporting(self.energyplus_state, self._collect_obs)

Expand Down Expand Up @@ -200,6 +224,11 @@ def make_eplus_args(self) -> List[str]:
]
return eplus_args

def init_exchange(self, default_action: float) -> Dict[str, float]:
    """Seed the action exchange and return the first observation.

    ``default_action`` is remembered as the last action, pushed on the actions
    queue, and the next item taken from the observations queue is returned.
    The observation is fetched with ``get()`` called without a timeout.
    """
    seed_action = default_action
    # remember the seed before enqueueing it, so it is available for resending
    self.last_action = seed_action
    self.act_queue.put(seed_action)

    first_obs: Dict[str, float] = self.obs_queue.get()
    return first_obs

def _collect_obs(self, state_argument) -> None:
"""
EnergyPlus callback that collects output variables/meters
Expand Down Expand Up @@ -229,12 +258,31 @@ def _send_actions(self, state_argument):
if self.simulation_complete or not self._init_callback(state_argument):
return

if self.act_queue.empty():
# E+ has zone and system timesteps, a zone timestep can be made of several system timesteps
# (number varies on each iteration). We should send actions at least once per zone timestep, so we can
# resend the last action if we are iterating over system timesteps, but we need to wait for a new action
# when moving from one zone timestep to another.
sys_timestep_duration = self.x.system_time_step(state_argument)
if (
sys_timestep_duration < self.zone_timestep_duration
and self.act_queue.empty()
):
self.act_queue.put(self.last_action)

# wait for next action
with self.act_queue_mutex:
next_action = self.act_queue.get()

# end of simulation
if next_action is None:
self.simulation_complete = True
return

next_action = self.act_queue.get()
assert isinstance(next_action, float)

# keep last action to resend it if needed (see above)
self.last_action = next_action

self.x.set_actuator_value(
state=state_argument,
actuator_handle=self.actuator_handles["sat_spt"],
Expand All @@ -243,8 +291,10 @@ def _send_actions(self, state_argument):

def _init_callback(self, state_argument) -> bool:
    """Initialize EnergyPlus handles and check if simulation runtime is ready.

    The result is stored in ``self.initialized`` and returned: it is truthy only
    when ``_init_handles`` succeeds and ``warmup_flag`` is not set for
    ``state_argument``.
    """
    # Evaluate readiness exactly once per callback, so that _init_handles and
    # warmup_flag are not invoked twice for the same state.
    self.initialized = (
        self._init_handles(state_argument)
        and not self.x.warmup_flag(state_argument)
    )
    return self.initialized

def _init_handles(self, state_argument):
Expand Down Expand Up @@ -288,9 +338,17 @@ def _init_handles(self, state_argument):
return True

def _flush_queues(self):
    """Release any thread waiting for an action, then empty both queues.

    The actions queue is only drained while holding ``act_queue_mutex``, the
    same lock the action callback holds around its ``act_queue.get()``.
    """
    # release waiting threads (if any): None is read as end of simulation
    # by the consumer of the actions queue
    if self.act_queue.empty():
        self.act_queue.put(None)

    while not self.obs_queue.empty():
        self.obs_queue.get()

    # flush actions queue after last callback was called
    with self.act_queue_mutex:
        while not self.act_queue.empty():
            self.act_queue.get()


class EnergyPlusEnv(gym.Env):
Expand All @@ -315,6 +373,8 @@ def __init__(self, env_config: Dict[str, Any]):

# action space: supply air temperature (100 possible values)
self.action_space: Discrete = Discrete(100)
# actual range of action space (°C)
self.actual_range = (15.0, 30.0)

self.energyplus_runner: Optional[EnergyPlusRunner] = None
self.obs_queue: Optional[Queue] = None
Expand Down Expand Up @@ -346,8 +406,7 @@ def reset(
self.energyplus_runner.start()

# wait until E+ is ready.
obs = self.obs_queue.get()
self.last_obs = obs
self.last_obs = obs = self.energyplus_runner.init_exchange(default_action=self.actual_range[0])
return np.array(list(obs.values())), {}

def step(self, action):
Expand All @@ -367,23 +426,31 @@ def step(self, action):
# rescale agent decision to actuator range
sat_spt_value = self._rescale(
n=int(action), # noqa
range1=(0, self.action_space.n),
range2=(15, 30)
range1=(self.action_space.start, self.action_space.n),
range2=self.actual_range
)

# enqueue action (received by EnergyPlus through dedicated callback)
# Enqueue action (sent to EnergyPlus through dedicated callback)
# then wait to get next observation.
# timeout is set to 2s to handle end of simulation cases, which happens async
# Timeout is set to 2s to handle end of simulation cases, which happens async
# and materializes by worker thread waiting on this queue (EnergyPlus callback
# not consuming yet/anymore)
# timeout value can be increased if E+ timestep takes longer
# not consuming anymore).
# Timeout value can be increased if E+ timestep takes longer
timeout = 2
try:
self.act_queue.put(sat_spt_value, timeout=timeout)
self.last_obs = obs = self.obs_queue.get(timeout=timeout)
obs = self.obs_queue.get(timeout=timeout)
except (Full, Empty):
obs = None
pass

# obs can be None if E+ simulation is complete
# this materializes by either an empty queue or a None value received from queue
if obs is None:
done = True
obs = self.last_obs
else:
self.last_obs = obs

# compute reward
reward = self._compute_reward(obs)
Expand All @@ -402,10 +469,10 @@ def render(self, mode="human"):
def _compute_reward(obs: Dict[str, float]) -> float:
"""compute reward scalar"""
if obs["htg_spt"] > 0 and obs["clg_spt"] > 0:
tmp_rew = np.diff([
tmp_rew = np.diff(np.array([
[obs["htg_spt"], obs["iat"]],
[obs["iat"], obs["clg_spt"]]
])
]))
tmp_rew = tmp_rew[tmp_rew < 0]
tmp_rew = np.max(np.abs(tmp_rew)) if tmp_rew.size > 0 else 0
else:
Expand Down Expand Up @@ -438,7 +505,7 @@ def _rescale(
PPOConfig()
.environment(
env=EnergyPlusEnv,
env_config=vars(args)
env_config=vars(args),
)
.training(
gamma=0.95,
Expand Down
4 changes: 3 additions & 1 deletion tests/tests_basics.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@ def test_env_reset_close(self):
env = EnergyPlusEnv({
"idf": f"{root_dir}/model.idf",
"epw": f"{root_dir}/LUX_LU_Luxembourg.AP.065900_TMYx.2004-2018.epw",
"output": "/tmp/tests_output"
"output": "/tmp/tests_output",
"verbose": True,
"eplus_timestep_duration": 0.25,
})

obs, _ = env.reset()
Expand Down

0 comments on commit f2aa23f

Please sign in to comment.