diff --git a/src/fmu/tools/rms/generate_petro_jobs_for_field_update.py b/src/fmu/tools/rms/generate_petro_jobs_for_field_update.py index e1eec0c4..aee3a4ae 100644 --- a/src/fmu/tools/rms/generate_petro_jobs_for_field_update.py +++ b/src/fmu/tools/rms/generate_petro_jobs_for_field_update.py @@ -13,19 +13,21 @@ import copy import json import pprint -import warnings import yaml -try: - import _roxar # type: ignore -except ModuleNotFoundError: +from fmu.tools import ROXAR + +if ROXAR: try: - import _rmsapi as _roxar # type: ignore - import roxar.jobs # type: ignore - import roxar.jobs.Job # type: ignore + import rmsapi + import rmsapi.jobs as jobs except ModuleNotFoundError: - warnings.warn("This script only supports interactive RMS usage", UserWarning) + import roxar as rmsapi + import roxar.jobs as jobs +else: + pass + from typing import Dict, List, Tuple, no_type_check # Fixed global variables @@ -140,7 +142,7 @@ def main(config_file: str, debug: bool = False, report_unused: bool = False) -> @no_type_check def check_rms_project(project): - if not isinstance(project, _roxar.Project): # type: ignore + if not isinstance(project, rmsapi.Project): # type: ignore raise RuntimeError("This run must be ran in an RoxAPI environment!") @@ -250,7 +252,7 @@ def sort_new_var_names( def get_original_job_settings( owner_string_list: List[str], job_type: str, job_name: str ) -> dict: - original_job = roxar.jobs.Job.get_job(owner_string_list, job_type, job_name) + original_job = jobs.Job.get_job(owner_string_list, job_type, job_name) return original_job.get_arguments(skip_defaults=False) @@ -260,7 +262,7 @@ def create_copy_of_job( original_job_arguments: Dict, new_job_name: str, ): - new_job = roxar.jobs.Job.create(owner_string_list, job_type, new_job_name) + new_job = jobs.Job.create(owner_string_list, job_type, new_job_name) new_job.set_arguments(original_job_arguments) return new_job @@ -499,7 +501,7 @@ def create_new_petro_job_per_facies( new_job = create_copy_of_job( owner_string_list, JOB_TYPE, new_job_arguments_current, new_job_name ) - ok, err_msg_list, warn_msg_list = roxar.jobs.Job.check(new_job) + ok, err_msg_list, warn_msg_list = jobs.Job.check(new_job) if not ok: print("Error messages from created job object:") for err_msg in err_msg_list: @@ -518,7 +520,7 @@ def create_new_petro_job_per_facies( def write_petro_job_to_file( owner_string_list: List[str], job_type: str, job_name: str, filename: str ) -> None: - job_instance = roxar.jobs.Job.get_job( + job_instance = jobs.Job.get_job( owner=owner_string_list, type=job_type, name=job_name ) arguments = job_instance.get_arguments(True) diff --git a/tests/rms/test_generate_petro_jobs_for_field_update.py b/tests/rms/test_generate_petro_jobs_for_field_update.py index 6bcf76c3..90e0a99b 100644 --- a/tests/rms/test_generate_petro_jobs_for_field_update.py +++ b/tests/rms/test_generate_petro_jobs_for_field_update.py @@ -219,7 +219,6 @@ def define_setting_for_original_job(spec_dict: Dict) -> Dict: { "Active": True, "AppliedTo": "INPUT_AND_OUTPUT", - "Automated": False, "Mode": "MIN", "Min": 0, "SequenceNumber": 0,