Skip to content

Commit

Permalink
improve doc name generation when processing jsonl
Browse files Browse the repository at this point in the history
  • Loading branch information
baixiac committed Jul 1, 2024
1 parent 1fd3e50 commit e7c9d15
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 12 deletions.
4 changes: 4 additions & 0 deletions app/api/api.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import logging
import asyncio
import importlib
import os.path
Expand All @@ -23,6 +24,9 @@
from utils import get_settings


logging.getLogger("asyncio").setLevel(logging.ERROR)


def get_model_server(msd_overwritten: Optional[ModelServiceDep] = None) -> FastAPI:
app = _get_app(msd_overwritten)
config = get_settings()
Expand Down
10 changes: 7 additions & 3 deletions app/api/routers/invocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

PATH_INFO = "/info"
PATH_PROCESS = "/process"
PATH_PROCESS_STREAM = "/process_stream"
PATH_PROCESS_JSON_LINES = "/process_jsonl"
PATH_PROCESS_BULK = "/process_bulk"
PATH_PROCESS_BULK_FILE = "/process_bulk_file"
PATH_REDACT = "/redact"
Expand Down Expand Up @@ -72,7 +72,7 @@ def get_entities_from_text(request: Request,
return TextWithAnnotations(text=text, annotations=annotations)


@router.post(PATH_PROCESS_STREAM,
@router.post(PATH_PROCESS_JSON_LINES,
response_class=StreamingResponse,
tags=[Tags.Annotations.name],
dependencies=[Depends(cms_globals.props.current_active_user)],
Expand Down Expand Up @@ -257,9 +257,13 @@ def _send_bulk_processed_docs_metric(processed_docs: List[Dict], handler: str) -

def _chunk_request_body(json_lines: str, chunk_size: int = 5) -> Iterator[pd.DataFrame]:
chunk = []
doc_idx = 0
for line in json_lines.splitlines():
json_line_obj = json.loads(line)
TextStreamItem(**json_line_obj)
if "name" not in json_line_obj:
json_line_obj["name"] = str(doc_idx)
doc_idx += 1
chunk.append(json_line_obj)

if len(chunk) == chunk_size:
Expand All @@ -277,7 +281,7 @@ def _get_jsonlines_stream(output_stream: Iterator[Dict[str, Any]]) -> Iterator[s
annotation_num = 0
for item in output_stream:
if current_doc_name != "" and current_doc_name != item["doc_name"]:
cms_doc_annotations.labels(handler=PATH_PROCESS_STREAM).observe(annotation_num)
cms_doc_annotations.labels(handler=PATH_PROCESS_JSON_LINES).observe(annotation_num)
current_doc_name = item["doc_name"]
annotation_num += 1
yield json.dumps(item) + "\n"
1 change: 1 addition & 0 deletions app/trainers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from domain import TrainingType

logger = logging.getLogger("cms")
logging.getLogger("asyncio").setLevel(logging.ERROR)


class TrainerCommon(object):
Expand Down
2 changes: 1 addition & 1 deletion tests/app/api/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def test_get_model_server():
assert {"name": "Authentication", "description": "Authenticate registered users"} in tags
assert "/info" in paths
assert "/process" in paths
assert "/process_stream" in paths
assert "/process_jsonl" in paths
assert "/process_bulk" in paths
assert "/process_bulk_file" in paths
assert "/redact" in paths
Expand Down
37 changes: 29 additions & 8 deletions tests/app/api/test_serving_medcat.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def test_process():
}


def test_process_stream():
def test_process_jsonl():
annotations = [{
"label_name": "Spinal stenosis",
"label_id": "76107001",
Expand All @@ -104,7 +104,7 @@ def test_process_stream():
model_manager = ModelManager(None, None)
model_manager.model_service = model_service
cms_globals.model_manager_dep = lambda: model_manager
response = client.post("/process_stream",
response = client.post("/process_jsonl",
data='{"name": "doc1", "text": "Spinal stenosis"}\n{"name": "doc2", "text": "Spinal stenosis"}',
headers={"Content-Type": "application/x-ndjson"})

Expand All @@ -114,7 +114,7 @@ def test_process_stream():
assert json.loads(jsonlines[1]) == {"doc_name": "doc2", **annotations[0]}


def test_process_invalid_stream():
def test_process_invalid_jsonl():
annotations = [{
"label_name": "Spinal stenosis",
"label_id": "76107001",
Expand All @@ -134,13 +134,34 @@ def test_process_invalid_stream():
model_manager.model_service = model_service
cms_globals.model_manager_dep = lambda: model_manager

response = client.post("/process_stream",
data='invalid stream',
response = client.post("/process_jsonl",
data="invalid json lines",
headers={"Content-Type": "application/x-ndjson"})
assert response.status_code == 400
assert response.json() == {"message": "Invalid JSON Lines."}

response = client.post("/process_stream",

def test_process_unknown_jsonl_properties():
annotations = [{
"label_name": "Spinal stenosis",
"label_id": "76107001",
"start": 0,
"end": 15,
"accuracy": 1.0,
"meta_anns": {
"Status": {
"value": "Affirmed",
"confidence": 0.9999833106994629,
"name": "Status"
}
},
}]
model_service.annotate.return_value = annotations
model_manager = ModelManager(None, None)
model_manager.model_service = model_service
cms_globals.model_manager_dep = lambda: model_manager

response = client.post("/process_jsonl",
data='{"unknown": "doc1", "text": "Spinal stenosis"}\n{"unknown": "doc2", "text": "Spinal stenosis"}',
headers={"Content-Type": "application/x-ndjson"})
assert response.status_code == 400
Expand Down Expand Up @@ -193,7 +214,7 @@ async def test_stream_process_empty_stream():


@pytest.mark.asyncio
async def test_stream_process_invalidate_json():
async def test_stream_process_invalidate_jsonl():
async with httpx.AsyncClient(app=app2, base_url="http://test") as ac:
response = await ac.post("/stream/process",
data='{"name": "doc1", "text": Spinal stenosis}\n'.encode("utf-8"),
Expand All @@ -207,7 +228,7 @@ async def test_stream_process_invalidate_json():


@pytest.mark.asyncio
async def test_stream_process_invalidate_json_property():
async def test_stream_process_unknown_jsonl_property():
async with httpx.AsyncClient(app=app2, base_url="http://test") as ac:
response = await ac.post("/stream/process",
data='{"unknown": "doc1", "text": "Spinal stenosis"}\n{"unknown": "doc2", "text": "Spinal stenosis"}',
Expand Down

0 comments on commit e7c9d15

Please sign in to comment.