diff --git a/.github/workflows/checks.yaml b/.github/workflows/checks.yaml
index 113c7e6..4365257 100644
--- a/.github/workflows/checks.yaml
+++ b/.github/workflows/checks.yaml
@@ -25,6 +25,8 @@ jobs:
         run: flake8 pro_tes/ setup.py
       - name: Lint with Pylint
         run: pylint pro_tes/ setup.py
+      - name: Type check with mypy
+        run: mypy pro_tes/
   test:
     name: Run tests
     runs-on: ubuntu-latest
diff --git a/pro_tes/ga4gh/tes/models.py b/pro_tes/ga4gh/tes/models.py
index 4d8d3b6..2f0b039 100644
--- a/pro_tes/ga4gh/tes/models.py
+++ b/pro_tes/ga4gh/tes/models.py
@@ -38,7 +38,7 @@ class TesCreateTaskResponse(CustomBaseModel):

 class TesExecutor(CustomBaseModel):
     image: str = Field(
-        [""],
+        default=[""],
         description=(
             "Name of the container image. The string will be passed as "
             " the image\nargument to the containerization run command. "
@@ -46,29 +46,29 @@
             " - `gcr.io/my-org/my-image`\n - "
             " `myregistryhost:5000/fedora/httpd:version1.0`"
         ),
-        example="ubuntu:20.04",
+        examples=["ubuntu:20.04"],
     )
     command: list[str] = Field(
-        [""],
+        default=[""],
         description=(
             "A sequence of program arguments to execute, where the "
             " first argument\nis the program to execute (i.e. argv). "
             ' Example:\n```\n{\n "command" : ["/bin/md5",'
             ' "/data/file1"]\n}\n```'
         ),
-        example=["/bin/md5", "/data/file1"],
+        examples=["/bin/md5", "/data/file1"],
     )
     workdir: Optional[str] = Field(
-        None,
+        default=None,
         description=(
             "The working directory that the command will be executed "
             " in.\nIf not defined, the system will default to the directory"
             " set by\nthe container image."
         ),
-        example="/data/",
+        examples=["/data/"],
     )
     stdin: Optional[str] = Field(
-        None,
+        default=None,
         description=(
             "Path inside the container to a file which will be "
             " piped\nto the executor's stdin. This must be an absolute path. "
@@ -79,53 +79,53 @@ class TesExecutor(CustomBaseModel):
             ' STDIN\n```\n{\n "command" : ["/bin/md5"],\n "stdin" '
             ' : "/data/file1"\n}\n```'
         ),
-        example="/data/file1",
+        examples=["/data/file1"],
     )
     stdout: Optional[str] = Field(
-        None,
+        default=None,
         description=(
             "Path inside the container to a file where the "
             " executor's\nstdout will be written to. Must be an absolute"
             ' path. Example:\n```\n{\n "stdout" :'
             ' "/tmp/stdout.log"\n}\n```'
         ),
-        example="/tmp/stdout.log",
+        examples=["/tmp/stdout.log"],
     )
     stderr: Optional[str] = Field(
-        None,
+        default=None,
         description=(
             "Path inside the container to a file where the "
             " executor's\nstderr will be written to. Must be an absolute"
             ' path. Example:\n```\n{\n "stderr" :'
             ' "/tmp/stderr.log"\n}\n```'
         ),
-        example="/tmp/stderr.log",
+        examples=["/tmp/stderr.log"],
     )
     env: Optional[dict[str, str]] = Field(
-        None,
+        default=None,
         description=(
             "Enviromental variables to set within the container. "
             ' Example:\n```\n{\n "env" : {\n "ENV_CONFIG_PATH"'
             ' : "/data/config.file",\n "BLASTDB" : '
             ' "/data/GRC38",\n "HMMERDB" : "/data/hmmer"\n }\n}\n```'
         ),
-        example={"BLASTDB": "/data/GRC38", "HMMERDB": "/data/hmmer"},
+        examples=[{"BLASTDB": "/data/GRC38", "HMMERDB": "/data/hmmer"}],
     )


 class TesExecutorLog(CustomBaseModel):
     start_time: Optional[str] = Field(
-        None,
+        default=None,
         description="Time the executor started, in RFC 3339 format.",
-        example="2020-10-02T10:00:00-05:00",
+        examples=["2020-10-02T10:00:00-05:00"],
     )
     end_time: Optional[str] = Field(
-        None,
+        default=None,
         description="Time the executor ended, in RFC 3339 format.",
-        example="2020-10-02T11:00:00-05:00",
+        examples=["2020-10-02T11:00:00-05:00"],
     )
     stdout: Optional[str] = Field(
-        None,
+        default=None,
         description=(
             "Stdout content.\n\nThis is meant for convenience. No "
             " guarantees are made about the content.\nImplementations may"
@@ -137,7 +137,7 @@ class TesExecutorLog(CustomBaseModel):
         ),
     )
     stderr: Optional[str] = Field(
-        None,
+        default=None,
         description=(
             "Stderr content.\n\nThis is meant for convenience. No "
             " guarantees are made about the content.\nImplementations may"
@@ -150,7 +150,7 @@ class TesExecutorLog(CustomBaseModel):
         )
     # exit code not optional according to specs, but Funnel may return 'null'
     exit_code: Optional[int] = Field(
-        None,
+        default=None,
         description="Exit code.",
     )

@@ -164,14 +164,14 @@ class TesInput(CustomBaseModel):
     name: Optional[str] = None
     description: Optional[str] = None
     url: Optional[str] = Field(
-        None,
+        default=None,
         description=(
             'REQUIRED, unless "content" is set.\n\nURL in long term '
             " storage, for example:\n - s3://my-object-store/file1\n - "
             " gs://my-bucket/file2\n - file:///path/to/my/file\n - "
             " /path/to/my/file"
         ),
-        example="s3://my-object-store/file1",
+        examples=["s3://my-object-store/file1"],
     )
     path: str = Field(
         ...,
@@ -179,11 +179,11 @@ class TesInput(CustomBaseModel):
             "Path of the file inside the container.\nMust be an "
             " absolute path."
         ),
-        example="/data/file1",
+        examples=["/data/file1"],
     )
     type: TesFileType
     content: Optional[str] = Field(
-        None,
+        default=None,
         description=(
             "File content literal.\n\nImplementations should support a "
             " minimum of 128 KiB in this field\nand may define their own "
@@ -195,10 +195,10 @@

 class TesOutput(CustomBaseModel):
     name: Optional[str] = Field(
-        None, description="User-provided name of output file"
+        default=None, description="User-provided name of output file"
     )
     description: Optional[str] = Field(
-        None,
+        default=None,
         description=(
             "Optional users provided description field, can be used "
             " for documentation."
@@ -244,32 +244,36 @@ class TesOutputFileLog(CustomBaseModel):
             " as a string\nbecause official JSON doesn't support int64"
             " numbers."
         ),
-        example=["1024"],
+        examples=["1024"],
     )


 class TesResources(CustomBaseModel):
     cpu_cores: Optional[int] = Field(
-        None, description="Requested number of CPUs", example=4
+        default=None, description="Requested number of CPUs", examples=[4]
     )
     preemptible: Optional[bool] = Field(
-        None,
+        default=None,
         description=(
             "Define if the task is allowed to run on preemptible "
             " compute instances,\nfor example, AWS Spot. This option may have"
             " no effect when utilized\non some backends that don't"
             " have the concept of preemptible jobs."
         ),
-        example=False,
+        examples=[False],
     )
     ram_gb: Optional[float] = Field(
-        None, description="Requested RAM required in gigabytes (GB)", example=8
+        default=None,
+        description="Requested RAM required in gigabytes (GB)",
+        examples=[8]
     )
     disk_gb: Optional[float] = Field(
-        None, description="Requested disk size in gigabytes (GB)", example=40
+        default=None,
+        description="Requested disk size in gigabytes (GB)",
+        examples=[40]
     )
     zones: Optional[list[str]] = Field(
-        None,
+        default=None,
         description=(
             "Request that the task be run in these compute zones. How "
             " this string\nis utilized will be dependent on the backend"
@@ -278,7 +282,7 @@ class TesResources(CustomBaseModel):
             " define\npriorty queue to which the job is "
             " assigned."
         ),
-        example="us-west-1",
+        examples=["us-west-1"],
     )


@@ -298,16 +302,16 @@ class ServiceType(CustomBaseModel):
             " namespace (e.g. your organization's reverse domain "
             " name)."
         ),
-        example="org.ga4gh",
+        examples=["org.ga4gh"],
     )
-    artifact: str = Field(
+    artifact: Enum = Field(
         ...,
         description=(
             "Name of the API or GA4GH specification implemented. "
             " Official GA4GH types should be assigned as part of standards "
             " approval process. Custom artifacts are supported."
         ),
-        example="beacon",
+        examples=["beacon"],
     )
     version: str = Field(
         ...,
@@ -315,7 +319,7 @@
             "Version of the API or specification. GA4GH specifications "
             " use semantic versioning."
         ),
-        example="1.0.0",
+        examples=["1.0.0"],
     )


@@ -323,12 +327,12 @@ class Organization(CustomBaseModel):
     name: str = Field(
         ...,
         description="Name of the organization responsible for the service",
-        example="My organization",
+        examples=["My organization"],
     )
     url: AnyUrl = Field(
         ...,
         description="URL of the website of the organization (RFC 3986 format)",
-        example="https://example.com",
+        examples=["https://example.com"],
     )


@@ -342,62 +346,62 @@ class Service(CustomBaseModel):
             " downstream aggregator services e.g. Service"
             " Registry."
         ),
-        example="org.ga4gh.myservice",
+        examples=["org.ga4gh.myservice"],
     )
     name: str = Field(
         ...,
         description="Name of this service. Should be human readable.",
-        example="My project",
+        examples=["My project"],
     )
-    type: ServiceType
+    type: Optional[ServiceType]
     description: Optional[str] = Field(
-        None,
+        default=None,
         description=(
             "Description of the service. Should be human readable and "
             " provide information about the service."
         ),
-        example="This service provides...",
+        examples=["This service provides..."],
     )
     organization: Organization = Field(
         ..., description="Organization providing the service"
     )
     contactUrl: Optional[AnyUrl] = Field(
-        None,
+        default=None,
         description=(
             "URL of the contact for the provider of this service, e.g. "
             " a link to a contact form (RFC 3986 format), or an email "
             " (RFC 2368 format)."
         ),
-        example="mailto:support@example.com",
+        examples=["mailto:support@example.com"],
     )
     documentationUrl: Optional[AnyUrl] = Field(
-        None,
+        default=None,
         description=(
             "URL of the documentation of this service (RFC 3986"
             " format).This should help someone learn how to use"
             " your service, including any specifics required to "
             " access data, e.g. authentication."
         ),
-        example="https://docs.myservice.example.com",
+        examples=["https://docs.myservice.example.com"],
     )
     createdAt: Optional[datetime] = Field(
-        None,
+        default=None,
         description=(
             "Timestamp describing when the service was first deployed "
             " and available (RFC 3339 format)"
         ),
-        example="2019-06-04T12:58:19Z",
+        examples=["2019-06-04T12:58:19Z"],
     )
     updatedAt: Optional[datetime] = Field(
-        None,
+        default=None,
         description=(
             "Timestamp describing when the service was last updated "
             " (RFC 3339 format)"
         ),
-        example="2019-06-04T12:58:19Z",
+        examples=["2019-06-04T12:58:19Z"],
     )
     environment: Optional[str] = Field(
-        None,
+        default=None,
         description=(
             "Environment the service is running in. Use this to "
             " distinguish between production, development and testing/staging "
@@ -405,7 +409,7 @@
             " dev, staging. However this is advised and not"
             " enforced."
         ),
-        example="test",
+        examples=["test"],
     )
     version: str = Field(
         ...,
@@ -416,7 +420,7 @@
             " should be changed whenever the service is"
             " updated."
         ),
-        example="1.0.0",
+        examples=["1.0.0"],
     )


@@ -438,7 +442,7 @@ class TesNextTes(CustomBaseModel):
     url: str = Field(
         ...,
         description="TES server to which the task was forwarded.",
-        example="https://my.tes.instance/",
+        examples=["https://my.tes.instance/"],
     )
     id: str = Field(
         ...,
@@ -446,7 +450,7 @@
             "Task identifier assigned by the "
             "TES server to which the task was forwarded."
         ),
-        example="job-0012345",
+        examples=["job-0012345"],
     )
     forwarded_to: Optional[TesNextTes] = None

@@ -455,7 +459,7 @@ class Metadata(CustomBaseModel):
     """Create model instance for metadata."""

     forwarded_to: Optional[TesNextTes] = Field(
-        None,
+        default=None,
         description="TaskLog describes logging information related to a Task",
     )

@@ -465,21 +469,21 @@
         ..., description="Logs for each executor"
     )
     metadata: Optional[Metadata] = Field(
-        None,
+        default=None,
         description=(
             "Arbitrary logging metadata included by the implementation."
         ),
-        example={"host": "worker-001", "slurmm_id": 123456},
+        examples=[{"host": "worker-001", "slurmm_id": 123456}],
     )
     start_time: Optional[str] = Field(
-        None,
+        default=None,
         description="When the task started, in RFC 3339 format.",
-        example="2020-10-02T10:00:00-05:00",
+        examples=["2020-10-02T10:00:00-05:00"],
     )
     end_time: Optional[str] = Field(
-        None,
+        default=None,
         description="When the task ended, in RFC 3339 format.",
-        example="2020-10-02T11:00:00-05:00",
+        examples=["2020-10-02T11:00:00-05:00"],
     )
     outputs: list[TesOutputFileLog] = Field(
         ...,
@@ -489,7 +493,7 @@
         ),
     )
     system_logs: Optional[list[str]] = Field(
-        None,
+        default=None,
         description=(
             "System logs are any logs the system decides are relevant, "
             " \nwhich are not tied directly to an Executor"
@@ -505,17 +509,17 @@ class TesTaskLog(CustomBaseModel):


 class TesServiceType(ServiceType):
-    artifact: Artifact = Field(..., example="tes")
+    artifact: Artifact = Field(..., examples=["tes"])


 class TesServiceInfo(Service):
     storage: Optional[list[str]] = Field(
-        None,
+        default=None,
         description=(
             "Lists some, but not necessarily all, storage locations "
             " supported\nby the service."
         ),
-        example=[
+        examples=[
             "file:///path/to/local/funnel-storage",
             "s3://ohsu-compbio-funnel/storage",
         ],
@@ -525,35 +529,41 @@

 class TesTask(CustomBaseModel):
     id: Optional[str] = Field(
-        None,
+        default=None,
         description="Task identifier assigned by the server.",
-        example="job-0012345",
+        examples=["job-0012345"],
     )
     state: Optional[TesState] = None
-    name: Optional[str] = Field(None, description="User-provided task name.")
+    name: Optional[str] = Field(
+        default=None,
+        description="User-provided task name."
+    )
     description: Optional[str] = Field(
-        None,
+        default=None,
         description=(
             "Optional user-provided description of task for "
             " documentation purposes."
         ),
     )
     inputs: Optional[list[TesInput]] = Field(
-        None,
+        default=None,
         description=(
             "Input files that will be used by the task. Inputs will be "
             " downloaded\nand mounted into the executor container as"
             " defined by the task request\ndocument."
         ),
-        example=[{"url": "s3://my-object-store/file1", "path": "/data/file1"}],
+        examples=[[{
+            "url": "s3://my-object-store/file1",
+            "path": "/data/file1"
+        }]]
     )
     outputs: Optional[list[TesOutput]] = Field(
-        None,
+        default=None,
         description=(
             "Output files.\nOutputs will be uploaded from the executor "
             " container to long-term storage."
         ),
-        example=[
+        examples=[
             {
                 "path": "/data/outfile",
                 "url": "s3://my-object-store/outfile-1",
@@ -563,7 +573,7 @@
     )
     resources: Optional[TesResources] = None
     executors: list[TesExecutor] = Field(
-        [TesExecutor],
+        default=[TesExecutor],
         description=(
             "An array of executors to be run. Each of the executors "
             " will run one\nat a time sequentially. Each executor is a"
@@ -575,7 +585,7 @@
         ),
     )
     volumes: Optional[list[str]] = Field(
-        None,
+        default=None,
         description=(
             "Volumes are directories which may be used to share data "
             " between\nExecutors. Volumes are initialized as empty"
@@ -588,10 +598,10 @@
             " a `docker run -v` flag where\nthe container path is "
             " the same for each executor)."
         ),
-        example=["/vol/A/"],
+        examples=[["/vol/A/"]],
     )
     tags: Optional[dict[str, str]] = Field(
-        None,
+        default=None,
         description=(
             "A key-value map of arbitrary tags. These can be used to "
             " store meta-data\nand annotations about a task."
@@ -599,10 +609,10 @@
             ' "cwl-01234",\n "PROJECT_GROUP" : "alice-lab"\n '
             " }\n}\n```"
         ),
-        example={"WORKFLOW_ID": "cwl-01234", "PROJECT_GROUP": "alice-lab"},
+        examples=[{"WORKFLOW_ID": "cwl-01234", "PROJECT_GROUP": "alice-lab"}],
     )
     logs: Optional[list[TesTaskLog]] = Field(
-        None,
+        default=None,
         description=(
             "Task logging information.\nNormally, this will contain "
             " only one entry, but in the case where\na task fails and is "
@@ -610,12 +620,12 @@
         ),
     )
     creation_time: Optional[str] = Field(
-        None,
+        default=None,
         description=(
             "Date + time the task was created, in RFC 3339 format.\n "
             " This is set by the system, not the client."
         ),
-        example="2020-10-02T10:00:00-05:00",
+        examples=["2020-10-02T10:00:00-05:00"],
     )

     class Config:
@@ -635,7 +645,7 @@ class TesListTasksResponse(CustomBaseModel):
         ),
     )
     next_page_token: Optional[str] = Field(
-        None,
+        default=None,
         description=(
             "Token used to return the next page of results. This value "
             " can be used\nin the `page_token` field of the next ListTasks "
@@ -709,7 +719,7 @@ class DbDocument(CustomBaseModel):
     """

     task: TesTask = TesTask()
-    task_original: TesTask = TesTask(executors=[])
+    task_original: TesTask = TesTask(executors=[])  # type: ignore
     user_id: Optional[str] = None
     worker_id: str = ""
     basic_auth: BasicAuth = BasicAuth()
diff --git a/pro_tes/ga4gh/tes/service_info.py b/pro_tes/ga4gh/tes/service_info.py
index a9c1fc0..f9a24c8 100644
--- a/pro_tes/ga4gh/tes/service_info.py
+++ b/pro_tes/ga4gh/tes/service_info.py
@@ -24,7 +24,7 @@ class ServiceInfo:
     def __init__(self) -> None:
         """Construct class instance."""
         self.db_client: Collection = (
-            current_app.config.foca.db.dbs["taskStore"]
+            current_app.config.foca.db.dbs["taskStore"]  # type: ignore
            .collections["service_info"]
            .client
         )
@@ -65,7 +65,7 @@ def init_service_info_from_config(self) -> None:

         Set service info only if it does not yet exist.
         """
-        service_info_conf = current_app.config.foca.serviceInfo
+        service_info_conf = current_app.config.foca.serviceInfo  # type: ignore
         try:
             service_info_db = self.get_service_info()
         except NotFound:
diff --git a/pro_tes/ga4gh/tes/task_runs.py b/pro_tes/ga4gh/tes/task_runs.py
index 8bfccc3..308aec1 100644
--- a/pro_tes/ga4gh/tes/task_runs.py
+++ b/pro_tes/ga4gh/tes/task_runs.py
@@ -16,6 +16,7 @@
 import requests
 import tes  # type: ignore
 from tes.models import Task  # type: ignore
+from pro_tes.ga4gh.tes.models import Metadata

 from pro_tes.exceptions import (
     BadRequest,
@@ -57,7 +58,7 @@ class TaskRuns:

     def __init__(self) -> None:
         """Construct object instance."""
-        self.foca_config: Config = current_app.config.foca
+        self.foca_config: Config = current_app.config.foca  # type: ignore
         self.db_client: Collection = (
             self.foca_config.db.dbs["taskStore"].collections["tasks"].client
         )
@@ -84,7 +85,9 @@ class TaskRuns:
     def create_task(  # pylint: disable=too-many-statements,too-many-branches

         # apply middlewares
         mw_handler = MiddlewareHandler()
-        mw_handler.set_middlewares(paths=current_app.config.foca.middlewares)
+        mw_handler.set_middlewares(
+            paths=current_app.config.foca.middlewares  # type: ignore
+        )
         logger.debug(f"Middlewares registered: {mw_handler.middlewares}")
         request_modified = mw_handler.apply_middlewares(request=request)
@@ -276,7 +279,7 @@
     def list_tasks(self, **kwargs) -> dict:
         view = kwargs.get("view", "BASIC")
         projection = self._set_projection(view=view)
-        name_prefix: str = kwargs.get("name_prefix")
+        name_prefix: str = str(kwargs.get("name_prefix"))

         if name_prefix is not None:
             filter_dict["task_original.name"] = {"$regex": f"^{name_prefix}"}
@@ -355,7 +358,7 @@ def cancel_task(self, id: str, **kwargs) -> dict:
         if document is None:
             logger.error(f"task '{id}' not found.")
             raise TaskNotFound
-        db_document = DbDocument(**document)
+        db_document: DbDocument = DbDocument(**document)

         if db_document.task.state in States.CANCELABLE:
             db_connector = DbDocumentConnector(
@@ -366,10 +369,27 @@ def cancel_task(self, id: str, **kwargs) -> dict:
                 f"{db_document.tes_endpoint.host.rstrip('/')}/"
                 f"{db_document.tes_endpoint.base_path.strip('/')}"
             )
+
+            _logs = db_document.task.logs
+            assert isinstance(
+                _logs,
+                (list, tuple)
+            ), "task logs is not indexable"
+
+            _metadata = _logs[0].metadata
+            assert isinstance(
+                _metadata,
+                Metadata
+            ), "task metadata is None"
+
             if self.store_logs:
-                task_id = db_document.task.logs[0].metadata.forwarded_to.id
+                assert (
+                    _metadata.forwarded_to is not None
+                ), "link to next TES is None"
+                task_id = _metadata.forwarded_to.id
             else:
-                task_id = db_document.task.logs[0].metadata["remote_task_id"]
+                task_id = _metadata["remote_task_id"]  # type: ignore
+
             logger.info(
                 "Trying to cancel task with task identifier"
                 f" '{task_id}' and worker job"
@@ -421,6 +441,7 @@ def _write_doc_to_db(
             except DuplicateKeyError:
                 continue
         assert document is not None
+        assert document.task.id is not None
         return document.task.id, document.worker_id
         raise DuplicateKeyError("Could not insert document into database.")

@@ -603,10 +624,13 @@ def _update_task_metadata(
         Returns:
             The updated database document.
         """
-        for logs in db_document.task.logs:
+        assert db_document.task.logs is not None
+        logs: list[TesTaskLog] = db_document.task.logs
+        for log in logs:
+            assert log.metadata is not None
             tesNextTes_obj = TesNextTes(id=remote_task_id, url=tes_url)
-            if logs.metadata.forwarded_to is None:
-                logs.metadata.forwarded_to = tesNextTes_obj
+            if log.metadata.forwarded_to is None:
+                log.metadata.forwarded_to = tesNextTes_obj
         return db_document

     @staticmethod
diff --git a/pro_tes/plugins/middlewares/task_distribution/distance.py b/pro_tes/plugins/middlewares/task_distribution/distance.py
index 9f69b28..505a701 100644
--- a/pro_tes/plugins/middlewares/task_distribution/distance.py
+++ b/pro_tes/plugins/middlewares/task_distribution/distance.py
@@ -228,7 +228,7 @@ def _get_ips(*args: AnyUrl) -> dict[AnyUrl, str]:
     ips: dict[AnyUrl, str] = {}
     for uri in args:
         try:
-            ips[uri] = gethostbyname(urlparse(strip_auth(uri)).netloc)
+            ips[uri] = gethostbyname(urlparse(strip_auth(str(uri))).netloc)
         except gaierror as exc:
             raise MiddlewareException(
                 f"Could not determine IP address for URI: {uri}"
diff --git a/pro_tes/tasks/track_task_progress.py b/pro_tes/tasks/track_task_progress.py
index 3173515..09b6fa0 100644
--- a/pro_tes/tasks/track_task_progress.py
+++ b/pro_tes/tasks/track_task_progress.py
@@ -50,7 +50,7 @@ def task__track_task_progress(  # pylint: disable=too-many-arguments
         user: User-name for basic authentication.
         password: Password for basic authentication.
     """
-    foca_config: Config = current_app.config.foca
+    foca_config: Config = current_app.config.foca  # type: ignore
     controller_config: dict = foca_config.controllers["post_task"]

     # create database client
@@ -110,6 +110,8 @@ def task__track_task_progress(  # pylint: disable=too-many-arguments

         # updating task after task is finished
         document.task.state = task_converted.state
+        assert task_converted.logs is not None
+        assert document.task.logs is not None
         for index, logs in enumerate(task_converted.logs):
             document.task.logs[index].logs = logs.logs
             document.task.logs[index].outputs = logs.outputs
diff --git a/requirements_dev.txt b/requirements_dev.txt
index f6f1bfe..6674972 100644
--- a/requirements_dev.txt
+++ b/requirements_dev.txt
@@ -7,3 +7,5 @@ mypy>=0.990
 pylint>=2.15.5
 pytest>=7.2.0
 python-semantic-release>=7.32.2
+mypy>=1.8.0
+types-python-dateutil>=2.8.19.20240106
\ No newline at end of file
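Reviewer note (not part of the patch): the model changes above follow one mechanical pattern — the positional default becomes an explicit `default=` keyword argument, and the deprecated `example=` keyword becomes `examples=[...]`, which takes a list. The sketch below only illustrates that pattern; `DemoExecutor` and its fields are hypothetical stand-ins, assuming Pydantic v2, where `Field()` accepts `default` and `examples`.

```python
from typing import Optional

from pydantic import BaseModel, Field


class DemoExecutor(BaseModel):
    """Hypothetical model showing the Field() migration applied in this PR."""

    # Before: image: str = Field("", example="ubuntu:20.04")
    # After: the default is passed by keyword and the example becomes a list.
    image: str = Field(
        default="",
        description="Name of the container image.",
        examples=["ubuntu:20.04"],
    )
    workdir: Optional[str] = Field(
        default=None,
        description="Working directory inside the container.",
        examples=["/data/"],
    )
```

If useful, the migrated models can be spot-checked locally with the same command the new CI step runs, `mypy pro_tes/`, alongside the existing test suite.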