Skip to content

Commit

Permalink
Adding AC & DC HLC tests to core_tests/startup_test
Browse files Browse the repository at this point in the history
Signed-off-by: Sebastian Lukas <[email protected]>
  • Loading branch information
SebaLukas committed Aug 13, 2024
1 parent f4e273b commit 31f88cc
Showing 1 changed file with 68 additions and 5 deletions.
73 changes: 68 additions & 5 deletions tests/core_tests/startup_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import threading
import queue

from enum import Enum

from everest.testing.core_utils.fixtures import *
from everest.testing.core_utils.everest_core import EverestCore, Requirement

Expand All @@ -20,6 +22,10 @@ async def test_000_startup_check(everest_core: EverestCore):
logging.info(">>>>>>>>> test_000_startup_check <<<<<<<<<")
everest_core.start()

class Mode(Enum):
Basic = 0
HlcAc = 1
HlcDc = 2

class ProbeModule:
def __init__(self, session: RuntimeSession):
Expand All @@ -32,6 +38,7 @@ def __init__(self, session: RuntimeSession):
self._handle_evse_manager_event)

self._msg_queue = queue.Queue()
self._energy_wh_import = 0

self._ready_event = threading.Event()
self._mod = m
Expand All @@ -42,8 +49,10 @@ def _ready(self):

def _handle_evse_manager_event(self, args):
self._msg_queue.put(args['event'])
if 'transaction_finished' in args:
self._energy_wh_import = args['transaction_finished']['meter_value']['energy_Wh_import']['total']

def test(self, timeout: float) -> bool:
def test(self, timeout: float, mode: Mode) -> bool:
end_of_time = time.time() + timeout

logging.info("Wating for ready event...")
Expand All @@ -57,9 +66,15 @@ def test(self, timeout: float) -> bool:
# enable simulator
self._mod.call_command(car_sim_ff, 'enable', {'value': True})

cmd = {'value': 'sleep 1;iec_wait_pwr_ready;sleep 1;draw_power_regulated 16,3;sleep 10;unplug'}

if mode == Mode.HlcAc:
cmd = {'value': 'sleep 1;iso_wait_slac_matched;iso_start_v2g_session AC;iso_wait_pwr_ready;iso_draw_power_regulated 16,3;iso_wait_for_stop 10;iso_wait_v2g_session_stopped;unplug'}
elif mode == Mode.HlcDc:
cmd = {'value': 'sleep 1;iso_wait_slac_matched;iso_start_v2g_session DC;iso_wait_pwr_ready;iso_wait_for_stop 10;iso_wait_v2g_session_stopped;unplug'}

# start charging simulation
self._mod.call_command(car_sim_ff, 'execute_charging_session', {
'value': 'sleep 1;iec_wait_pwr_ready;sleep 1;draw_power_regulated 16,3;sleep 5;unplug'})
self._mod.call_command(car_sim_ff, 'execute_charging_session', cmd)

expected_events = ['TransactionStarted', 'TransactionFinished']

Expand All @@ -80,7 +95,9 @@ def test(self, timeout: float) -> bool:
expected_events.pop(0)
except queue.Empty:
return False

logging.info("Total energy import: %f Wh", self._energy_wh_import)
if self._energy_wh_import <= 0:
return False
return True


Expand All @@ -105,4 +122,50 @@ async def test_001_start_test_module(everest_core: EverestCore):
everest_core.all_modules_started_event.set()
logging.info("set all modules started event...")

assert probe.test(20)
assert probe.test(20, Mode.Basic)

@pytest.mark.everest_core_config('config-sil.yaml')
@pytest.mark.asyncio
async def test_002_start_test_module_ac_hlc(everest_core: EverestCore):
logging.info(">>>>>>>>> test_002_start_test_module_ac_hlc <<<<<<<<<")

test_connections = {
'test_control': [Requirement('ev_manager', 'main')],
'connector_1': [Requirement('connector_1', 'evse')]
}

everest_core.start(standalone_module='probe', test_connections=test_connections)
logging.info("everest-core ready, waiting for probe module")

session = RuntimeSession(str(everest_core.prefix_path), str(everest_core.everest_config_path))

probe = ProbeModule(session)

if everest_core.status_listener.wait_for_status(10, ["ALL_MODULES_STARTED"]):
everest_core.all_modules_started_event.set()
logging.info("set all modules started event...")

assert probe.test(40, Mode.HlcAc)

@pytest.mark.everest_core_config('config-sil-dc.yaml')
@pytest.mark.asyncio
async def test_003_start_test_module_dc(everest_core: EverestCore):
logging.info(">>>>>>>>> test_003_start_test_module_dc <<<<<<<<<")

test_connections = {
'test_control': [Requirement('ev_manager', 'main')],
'connector_1': [Requirement('evse_manager', 'evse')]
}

everest_core.start(standalone_module='probe', test_connections=test_connections)
logging.info("everest-core ready, waiting for probe module")

session = RuntimeSession(str(everest_core.prefix_path), str(everest_core.everest_config_path))

probe = ProbeModule(session)

if everest_core.status_listener.wait_for_status(10, ["ALL_MODULES_STARTED"]):
everest_core.all_modules_started_event.set()
logging.info("set all modules started event...")

assert probe.test(40, Mode.HlcDc)

0 comments on commit 31f88cc

Please sign in to comment.