Skip to content

Commit

Permalink
Refactor scenario run process into own module
Browse files Browse the repository at this point in the history
  • Loading branch information
anamileva committed Aug 5, 2019
1 parent d5ba379 commit 2da1e31
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 82 deletions.
113 changes: 31 additions & 82 deletions ui/api/flask_local_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,17 @@

import atexit
from flask import Flask
from flask_socketio import SocketIO, emit
from flask_socketio import SocketIO
import os
import psutil
import signal
import subprocess
import sys

from flask_restful import Api

# Gridpath modules

# API resources
from ui.api.common_functions import connect_to_database
# Database operations functions (Socket IO)
from ui.api.db_ops.add_scenario import add_or_update_scenario

# RESTful API resources
from ui.api.resources.home import ServerStatus
from ui.api.resources.scenario_detail import ScenarioDetailName, \
ScenarioDetailAll, ScenarioDetailFeatures, ScenarioDetailTemporal, \
Expand Down Expand Up @@ -77,6 +74,10 @@
ViewDataLocalCapacityBAs, ViewDataProjectLocalCapacityBAs, \
ViewDataLocalCapacityReq, ViewDataProjectLocalCapacityChars

# Scenario process functions (Socket IO)
from ui.api.scenario_process import launch_scenario_process, \
check_scenario_process_status


# Define custom signal handlers
def sigterm_handler(signal, frame):
Expand Down Expand Up @@ -906,93 +907,41 @@ def socket_add_or_edit_new_scenario(msg):
# TODO: incomplete functionality

@socketio.on('launch_scenario_process')
def launch_scenario_process(client_message):
def socket_launch_scenario_process(client_message):
"""
Launch a process to run the scenario.
:param client_message:
:return:
Launch and manage a scenario run process.
"""
scenario_id = str(client_message['scenario'])

# Get the scenario name for this scenario ID
# TODO: pass both from the client and do a check here that they exist
io, c = connect_to_database(db_path=DATABASE_PATH)
scenario_name = c.execute(
"SELECT scenario_name FROM scenarios WHERE scenario_id = {}".format(
scenario_id
)
).fetchone()[0]

# First, check if the scenario is already running
process_status = check_scenario_process_status(
db_path=DATABASE_PATH,
client_message=client_message
# Launch the process, get back the process object, scenario_id,
# and scenario_name
p, scenario_id, scenario_name = launch_scenario_process(
db_path=DATABASE_PATH,
gridpath_directory=GRIDPATH_DIRECTORY,
scenario_status=SCENARIO_STATUS,
client_message=client_message
)
if process_status:
# TODO: what should happen if the scenario is already running? At a
# minimum, it should be a warning and perhaps a way to stop the
# process and re-start the scenario run.
print("Scenario already running.")
emit(
'scenario_already_running',
'scenario already running'
)
# If the scenario is not found among the running processes, launch a
# multiprocessing process
else:
print("Starting process for scenario_id " + scenario_id)
# p = multiprocessing.Process(
# target=run_scenario,
# name=scenario_id,
# args=(scenario_name,),
# )
# p.start()
os.chdir(os.path.join(GRIDPATH_DIRECTORY, 'gridpath'))
p = subprocess.Popen(
[sys.executable, '-u',
os.path.join(GRIDPATH_DIRECTORY, 'gridpath',
'run_start_to_end.py'),
'--log', '--scenario', scenario_name])

# Needed to ensure child processes are terminated when server exits
atexit.register(p.terminate)

# Save the scenario's process ID
# TODO: we should save to Electron instead, as closing the UI will
# delete the global data for the server
global SCENARIO_STATUS
SCENARIO_STATUS[(scenario_id, scenario_name)] = dict()
SCENARIO_STATUS[(scenario_id, scenario_name)]['process_id'] = p.pid
# Needed to ensure child processes are terminated when server exits
atexit.register(p.terminate)

# Save the scenario's process ID
# TODO: we should save to Electron instead, as closing the UI will
# delete the global data for the server
SCENARIO_STATUS[(scenario_id, scenario_name)] = dict()
SCENARIO_STATUS[(scenario_id, scenario_name)]['process_id'] = p.pid


# TODO: implement functionality to check on the process from the UI (
# @socketio is not linked to anything yet)
@socketio.on('check_scenario_process_status')
def check_scenario_process_status(db_path, client_message):
def socket_check_scenario_process_status(client_message):
"""
Check if there is any running process that contains the given scenario
:param client_message:
:return:
"""
scenario_id = str(client_message['scenario'])
io, c = connect_to_database(db_path=db_path)
scenario_name = c.execute(
"SELECT scenario_name FROM scenarios WHERE scenario_id = {}".format(
scenario_id
)
).fetchone()[0]

global SCENARIO_STATUS

if (scenario_id, scenario_name) in SCENARIO_STATUS.keys():
pid = SCENARIO_STATUS[(scenario_id, scenario_name)]['process_id']
# Process ID saved in global and process is still running
if pid in [p.pid for p in psutil.process_iter()] \
and psutil.Process(pid).status() == 'running':
return True
else:
# Process ID saved in global but process is not running
return False
else:
return False
check_scenario_process_status(db_path=DATABASE_PATH,
scenario_status=SCENARIO_STATUS,
client_message=client_message)


def main():
Expand Down
92 changes: 92 additions & 0 deletions ui/api/scenario_process.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Copyright 2019 Blue Marble Analytics LLC. All rights reserved.
import os
import subprocess
import sys

import psutil
from flask_socketio import emit

from ui.api.common_functions import connect_to_database


def launch_scenario_process(
db_path, gridpath_directory, scenario_status, client_message
):
"""
:param db_path:
:param gridpath_directory:
:param scenario_status:
:param client_message:
:return:
Launch a process to run the scenario.
"""
scenario_id = str(client_message['scenario'])

# Get the scenario name for this scenario ID
# TODO: pass both from the client and do a check here that they exist
io, c = connect_to_database(db_path=db_path)
scenario_name = c.execute(
"SELECT scenario_name FROM scenarios WHERE scenario_id = {}".format(
scenario_id
)
).fetchone()[0]

# First, check if the scenario is already running
process_status = check_scenario_process_status(
db_path=db_path,
scenario_status=scenario_status,
client_message=client_message
)
if process_status:
# TODO: what should happen if the scenario is already running? At a
# minimum, it should be a warning and perhaps a way to stop the
# process and re-start the scenario run.
print("Scenario already running.")
emit(
'scenario_already_running',
'scenario already running'
)
# If the scenario is not found among the running processes, launch a
# multiprocessing process
else:
print("Starting process for scenario_id " + scenario_id)
# p = multiprocessing.Process(
# target=run_scenario,
# name=scenario_id,
# args=(scenario_name,),
# )
# p.start()
os.chdir(os.path.join(gridpath_directory, 'gridpath'))
p = subprocess.Popen(
[sys.executable, '-u',
os.path.join(gridpath_directory, 'gridpath',
'run_start_to_end.py'),
'--log', '--scenario', scenario_name])

return p, scenario_id, scenario_name


def check_scenario_process_status(db_path, scenario_status, client_message):
"""
Check if there is any running process that contains the given scenario
"""
scenario_id = str(client_message['scenario'])
io, c = connect_to_database(db_path=db_path)
scenario_name = c.execute(
"SELECT scenario_name FROM scenarios WHERE scenario_id = {}".format(
scenario_id
)
).fetchone()[0]

if (scenario_id, scenario_name) in scenario_status.keys():
pid = scenario_status[(scenario_id, scenario_name)]['process_id']
# Process ID saved in global and process is still running
if pid in [p.pid for p in psutil.process_iter()] \
and psutil.Process(pid).status() == 'running':
return True
else:
# Process ID saved in global but process is not running
return False
else:
return False

0 comments on commit 2da1e31

Please sign in to comment.