diff --git a/app/api/api.py b/app/api/api.py index a07f5b1..68357a3 100644 --- a/app/api/api.py +++ b/app/api/api.py @@ -1,3 +1,4 @@ +import logging import asyncio import importlib import os.path @@ -23,6 +24,9 @@ from utils import get_settings +logging.getLogger("asyncio").setLevel(logging.ERROR) + + def get_model_server(msd_overwritten: Optional[ModelServiceDep] = None) -> FastAPI: app = _get_app(msd_overwritten) config = get_settings() diff --git a/app/api/routers/invocation.py b/app/api/routers/invocation.py index e32ecf2..c1905c5 100644 --- a/app/api/routers/invocation.py +++ b/app/api/routers/invocation.py @@ -31,7 +31,7 @@ PATH_INFO = "/info" PATH_PROCESS = "/process" -PATH_PROCESS_STREAM = "/process_stream" +PATH_PROCESS_JSON_LINES = "/process_jsonl" PATH_PROCESS_BULK = "/process_bulk" PATH_PROCESS_BULK_FILE = "/process_bulk_file" PATH_REDACT = "/redact" @@ -72,7 +72,7 @@ def get_entities_from_text(request: Request, return TextWithAnnotations(text=text, annotations=annotations) -@router.post(PATH_PROCESS_STREAM, +@router.post(PATH_PROCESS_JSON_LINES, response_class=StreamingResponse, tags=[Tags.Annotations.name], dependencies=[Depends(cms_globals.props.current_active_user)], @@ -257,9 +257,13 @@ def _send_bulk_processed_docs_metric(processed_docs: List[Dict], handler: str) - def _chunk_request_body(json_lines: str, chunk_size: int = 5) -> Iterator[pd.DataFrame]: chunk = [] + doc_idx = 0 for line in json_lines.splitlines(): json_line_obj = json.loads(line) TextStreamItem(**json_line_obj) + if "name" not in json_line_obj: + json_line_obj["name"] = str(doc_idx) + doc_idx += 1 chunk.append(json_line_obj) if len(chunk) == chunk_size: @@ -277,7 +281,7 @@ def _get_jsonlines_stream(output_stream: Iterator[Dict[str, Any]]) -> Iterator[s annotation_num = 0 for item in output_stream: if current_doc_name != "" and current_doc_name != item["doc_name"]: - cms_doc_annotations.labels(handler=PATH_PROCESS_STREAM).observe(annotation_num) + cms_doc_annotations.labels(handler=PATH_PROCESS_JSON_LINES).observe(annotation_num) current_doc_name = item["doc_name"] annotation_num += 1 yield json.dumps(item) + "\n" diff --git a/app/trainers/base.py b/app/trainers/base.py index 48b1d69..b1b3053 100644 --- a/app/trainers/base.py +++ b/app/trainers/base.py @@ -15,6 +15,7 @@ from domain import TrainingType logger = logging.getLogger("cms") +logging.getLogger("asyncio").setLevel(logging.ERROR) class TrainerCommon(object): diff --git a/tests/app/api/test_api.py b/tests/app/api/test_api.py index 9bca8d9..57ac603 100644 --- a/tests/app/api/test_api.py +++ b/tests/app/api/test_api.py @@ -29,7 +29,7 @@ def test_get_model_server(): assert {"name": "Authentication", "description": "Authenticate registered users"} in tags assert "/info" in paths assert "/process" in paths - assert "/process_stream" in paths + assert "/process_jsonl" in paths assert "/process_bulk" in paths assert "/process_bulk_file" in paths assert "/redact" in paths diff --git a/tests/app/api/test_serving_medcat.py b/tests/app/api/test_serving_medcat.py index 06413c5..f5ddea9 100644 --- a/tests/app/api/test_serving_medcat.py +++ b/tests/app/api/test_serving_medcat.py @@ -85,7 +85,7 @@ def test_process(): } -def test_process_stream(): +def test_process_jsonl(): annotations = [{ "label_name": "Spinal stenosis", "label_id": "76107001", @@ -104,7 +104,7 @@ def test_process_stream(): model_manager = ModelManager(None, None) model_manager.model_service = model_service cms_globals.model_manager_dep = lambda: model_manager - response = client.post("/process_stream", + response = client.post("/process_jsonl", data='{"name": "doc1", "text": "Spinal stenosis"}\n{"name": "doc2", "text": "Spinal stenosis"}', headers={"Content-Type": "application/x-ndjson"}) @@ -114,7 +114,7 @@ def test_process_stream(): assert json.loads(jsonlines[1]) == {"doc_name": "doc2", **annotations[0]} -def test_process_invalid_stream(): +def test_process_invalid_jsonl(): annotations = [{ "label_name": "Spinal stenosis", "label_id": "76107001", @@ -134,13 +134,34 @@ def test_process_invalid_stream(): model_manager.model_service = model_service cms_globals.model_manager_dep = lambda: model_manager - response = client.post("/process_stream", - data='invalid stream', + response = client.post("/process_jsonl", + data="invalid json lines", headers={"Content-Type": "application/x-ndjson"}) assert response.status_code == 400 assert response.json() == {"message": "Invalid JSON Lines."} - response = client.post("/process_stream", + +def test_process_unknown_jsonl_properties(): + annotations = [{ + "label_name": "Spinal stenosis", + "label_id": "76107001", + "start": 0, + "end": 15, + "accuracy": 1.0, + "meta_anns": { + "Status": { + "value": "Affirmed", + "confidence": 0.9999833106994629, + "name": "Status" + } + }, + }] + model_service.annotate.return_value = annotations + model_manager = ModelManager(None, None) + model_manager.model_service = model_service + cms_globals.model_manager_dep = lambda: model_manager + + response = client.post("/process_jsonl", data='{"unknown": "doc1", "text": "Spinal stenosis"}\n{"unknown": "doc2", "text": "Spinal stenosis"}', headers={"Content-Type": "application/x-ndjson"}) assert response.status_code == 400 @@ -193,7 +214,7 @@ async def test_stream_process_empty_stream(): @pytest.mark.asyncio -async def test_stream_process_invalidate_json(): +async def test_stream_process_invalidate_jsonl(): async with httpx.AsyncClient(app=app2, base_url="http://test") as ac: response = await ac.post("/stream/process", data='{"name": "doc1", "text": Spinal stenosis}\n'.encode("utf-8"), @@ -207,7 +228,7 @@ async def test_stream_process_invalidate_json(): @pytest.mark.asyncio -async def test_stream_process_invalidate_json_property(): +async def test_stream_process_unknown_jsonl_property(): async with httpx.AsyncClient(app=app2, base_url="http://test") as ac: response = await ac.post("/stream/process", data='{"unknown": "doc1", "text": "Spinal stenosis"}\n{"unknown": "doc2", "text": "Spinal stenosis"}',