From b6823a176ec6ea552af26cfa6f39e44357c1ff30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20P=C4=99czek?= Date: Tue, 13 Feb 2024 13:17:52 +0100 Subject: [PATCH 01/18] Removed results unwrapping from workflows --- .../complier/runtime_input_validator.py | 8 +- .../complier/steps_executors/auxiliary.py | 96 +++++------------- .../complier/steps_executors/models.py | 99 ++++--------------- .../complier/steps_executors/utils.py | 4 +- .../steps_executors/test_auxiliary.py | 86 ---------------- .../compiler/test_runtime_input_validator.py | 19 ++-- .../unit_tests/http/test_client.py | 48 ++++++--- .../http/utils/test_postprocessing.py | 5 +- 8 files changed, 100 insertions(+), 265 deletions(-) diff --git a/inference/enterprise/workflows/complier/runtime_input_validator.py b/inference/enterprise/workflows/complier/runtime_input_validator.py index 5e89dfa0b..ed79024ce 100644 --- a/inference/enterprise/workflows/complier/runtime_input_validator.py +++ b/inference/enterprise/workflows/complier/runtime_input_validator.py @@ -127,9 +127,11 @@ def assembly_input_images( for i, image in enumerate(runtime_parameters[definition.name]) ] else: - runtime_parameters[definition.name] = assembly_input_image( - parameter=input_node, image=runtime_parameters[definition.name] - ) + runtime_parameters[definition.name] = [ + assembly_input_image( + parameter=input_node, image=runtime_parameters[definition.name] + ) + ] return runtime_parameters diff --git a/inference/enterprise/workflows/complier/steps_executors/auxiliary.py b/inference/enterprise/workflows/complier/steps_executors/auxiliary.py index 9592269d7..de64c7845 100644 --- a/inference/enterprise/workflows/complier/steps_executors/auxiliary.py +++ b/inference/enterprise/workflows/complier/steps_executors/auxiliary.py @@ -91,9 +91,6 @@ async def run_crop_step( runtime_parameters=runtime_parameters, outputs_lookup=outputs_lookup, ) - if not issubclass(type(image), list): - image = [image] - detections = [detections] decoded_images = [load_image(e) for e in image] decoded_images = [ i[0] if i[1] is True else i[0][:, :, ::-1] for i in decoded_images @@ -187,30 +184,15 @@ async def run_detection_filter( ) filter_callable = build_filter_callable(definition=step.filter_definition) result_detections, result_parent_id = [], [] - nested = False for prediction in predictions: - if issubclass(type(prediction), list): - nested = True # assuming that we either have all nested or none - filtered_predictions = [ - deepcopy(p) for p in prediction if filter_callable(p) - ] - result_detections.append(filtered_predictions) - result_parent_id.append([p[PARENT_ID_KEY] for p in filtered_predictions]) - elif filter_callable(prediction): - result_detections.append(deepcopy(prediction)) - result_parent_id.append(prediction[PARENT_ID_KEY]) + filtered_predictions = [deepcopy(p) for p in prediction if filter_callable(p)] + result_detections.append(filtered_predictions) + result_parent_id.append([p[PARENT_ID_KEY] for p in filtered_predictions]) step_selector = construct_step_selector(step_name=step.name) - if nested: - outputs_lookup[step_selector] = [ - {"predictions": d, PARENT_ID_KEY: p, "image": i} - for d, p, i in zip(result_detections, result_parent_id, images_meta) - ] - else: - outputs_lookup[step_selector] = { - "predictions": result_detections, - PARENT_ID_KEY: result_parent_id, - "image": images_meta, - } + outputs_lookup[step_selector] = [ + {"predictions": d, PARENT_ID_KEY: p, "image": i} + for d, p, i in zip(result_detections, result_parent_id, images_meta) + ] return None, outputs_lookup @@ -263,35 +245,18 @@ async def run_detection_offset_step( outputs_lookup=outputs_lookup, ) result_detections, result_parent_id = [], [] - nested = False for detection in detections: - if issubclass(type(detection), list): - nested = True # assuming that we either have all nested or none - offset_detections = [ - offset_detection(detection=d, offset_x=offset_x, offset_y=offset_y) - for d in detection - ] - result_detections.append(offset_detections) - result_parent_id.append([d[PARENT_ID_KEY] for d in offset_detections]) - else: - result_detections.append( - offset_detection( - detection=detection, offset_x=offset_x, offset_y=offset_y - ) - ) - result_parent_id.append(detection[PARENT_ID_KEY]) - step_selector = construct_step_selector(step_name=step.name) - if nested: - outputs_lookup[step_selector] = [ - {"predictions": d, PARENT_ID_KEY: p, "image": i} - for d, p, i in zip(result_detections, result_parent_id, images_meta) + offset_detections = [ + offset_detection(detection=d, offset_x=offset_x, offset_y=offset_y) + for d in detection ] - else: - outputs_lookup[step_selector] = { - "predictions": result_detections, - PARENT_ID_KEY: result_parent_id, - "image": images_meta, - } + result_detections.append(offset_detections) + result_parent_id.append([d[PARENT_ID_KEY] for d in offset_detections]) + step_selector = construct_step_selector(step_name=step.name) + outputs_lookup[step_selector] = [ + {"predictions": d, PARENT_ID_KEY: p, "image": i} + for d, p, i in zip(result_detections, result_parent_id, images_meta) + ] return None, outputs_lookup @@ -319,9 +284,6 @@ async def run_static_crop_step( runtime_parameters=runtime_parameters, outputs_lookup=outputs_lookup, ) - - if not issubclass(type(image), list): - image = [image] decoded_images = [load_image(e) for e in image] decoded_images = [ i[0] if i[1] is True else i[0][:, :, ::-1] for i in decoded_images @@ -418,6 +380,7 @@ async def run_detections_consensus_step( outputs_lookup=outputs_lookup, ) all_predictions = [resolve_parameter_closure(p) for p in step.predictions] + # all_predictions has shape (n_consensus_input, bs, img_predictions) if len(all_predictions) < 1: raise ExecutionGraphError( f"Consensus step requires at least one source of predictions." @@ -432,19 +395,16 @@ async def run_detections_consensus_step( ) images_meta = resolve_parameter_closure(images_meta_selector) batch_size = batch_sizes[0] - if batch_size == 1: - all_predictions = [[e] for e in all_predictions] - images_meta = [images_meta] results = [] for batch_index in range(batch_size): - batch_predictions = [e[batch_index] for e in all_predictions] + batch_element_predictions = [e[batch_index] for e in all_predictions] ( parent_id, object_present, presence_confidence, consensus_detections, ) = resolve_batch_consensus( - predictions=batch_predictions, + predictions=batch_element_predictions, required_votes=resolve_parameter_closure(step.required_votes), class_aware=resolve_parameter_closure(step.class_aware), iou_threshold=resolve_parameter_closure(step.iou_threshold), @@ -464,14 +424,12 @@ async def run_detections_consensus_step( "image": images_meta[batch_index], } ) - if batch_size == 1: - results = results[0] outputs_lookup[construct_step_selector(step_name=step.name)] = results return None, outputs_lookup def get_and_validate_batch_sizes( - all_predictions: List[Union[List[dict], List[List[dict]]]], + all_predictions: List[List[List[dict]]], step_name: str, ) -> List[int]: batch_sizes = get_predictions_batch_sizes(all_predictions=all_predictions) @@ -482,16 +440,8 @@ def get_and_validate_batch_sizes( return batch_sizes -def get_predictions_batch_sizes( - all_predictions: List[Union[List[dict], List[List[dict]]]] -) -> List[int]: - return [get_batch_size(predictions=predictions) for predictions in all_predictions] - - -def get_batch_size(predictions: Union[List[dict], List[List[dict]]]) -> int: - if len(predictions) == 0 or issubclass(type(predictions[0]), dict): - return 1 - return len(predictions) +def get_predictions_batch_sizes(all_predictions: List[List[List[dict]]]) -> List[int]: + return [len(predictions) for predictions in all_predictions] def all_batch_sizes_equal(batch_sizes: List[int]) -> bool: diff --git a/inference/enterprise/workflows/complier/steps_executors/models.py b/inference/enterprise/workflows/complier/steps_executors/models.py index d1594ba14..1d056d70b 100644 --- a/inference/enterprise/workflows/complier/steps_executors/models.py +++ b/inference/enterprise/workflows/complier/steps_executors/models.py @@ -93,8 +93,6 @@ async def run_roboflow_model_step( outputs_lookup=outputs_lookup, api_key=api_key, ) - if issubclass(type(image), list) and len(image) == 1: - image = image[0] if step.type in {"ClassificationModel", "MultiLabelClassificationModel"}: serialised_result = attach_parent_info( image=image, results=serialised_result, nested_key=None @@ -110,14 +108,14 @@ async def run_roboflow_model_step( async def get_roboflow_model_predictions_locally( - image: Union[dict, List[dict]], + image: List[dict], model_id: str, step: RoboflowModel, runtime_parameters: Dict[str, Any], outputs_lookup: OutputsLookup, model_manager: ModelManager, api_key: Optional[str], -) -> Union[dict, List[dict]]: +) -> List[dict]: request_constructor = MODEL_TYPE2REQUEST_CONSTRUCTOR[step.type] request = request_constructor( step=step, @@ -134,9 +132,7 @@ async def get_roboflow_model_predictions_locally( if issubclass(type(result), list): serialised_result = [e.dict(by_alias=True, exclude_none=True) for e in result] else: - serialised_result = result.dict(by_alias=True, exclude_none=True) - if issubclass(type(serialised_result), list) and len(serialised_result) == 1: - serialised_result = serialised_result[0] + serialised_result = [result.dict(by_alias=True, exclude_none=True)] return serialised_result @@ -252,13 +248,13 @@ def construct_keypoints_detection_request( async def get_roboflow_model_predictions_from_remote_api( - image: Union[dict, List[dict]], + image: List[dict], model_id: str, step: RoboflowModel, runtime_parameters: Dict[str, Any], outputs_lookup: OutputsLookup, api_key: Optional[str], -) -> Union[dict, List[dict]]: +) -> List[dict]: api_url = resolve_model_api_url(step=step) client = InferenceHTTPClient( api_url=api_url, @@ -272,50 +268,16 @@ async def get_roboflow_model_predictions_from_remote_api( outputs_lookup=outputs_lookup, ) client.configure(inference_configuration=configuration) - if issubclass(type(image), dict): - inference_input = image["value"] - else: - inference_input = [i["value"] for i in image] + inference_input = [i["value"] for i in image] results = await client.infer_async( inference_input=inference_input, model_id=model_id, ) - # just for now, until we have hosted inference deployed with new version - return _inject_detection_id_if_remote_api_does_not_provide_one( - results=results, - step_type=step.type, - ) - - -def _inject_detection_id_if_remote_api_does_not_provide_one( - results: Union[List[dict], dict], - step_type: str, -) -> Union[List[dict], dict]: - if step_type not in { - "ObjectDetectionModel", - "InstanceSegmentationModel", - "KeypointsDetectionModel", - }: - return results - if issubclass(type(results), dict): - results["predictions"] = _inject_detection_id_to_predictions( - predictions=results["predictions"] - ) - return results - for result in results: - result["predictions"] = _inject_detection_id_to_predictions( - predictions=result["predictions"] - ) + if not issubclass(type(results), list): + return [results] return results -def _inject_detection_id_to_predictions(predictions: List[dict]) -> List[dict]: - for prediction in predictions: - if DETECTION_ID_KEY not in prediction: - prediction[DETECTION_ID_KEY] = str(uuid4()) - return predictions - - def construct_http_client_configuration_for_classification_step( step: Union[ClassificationModel, MultiLabelClassificationModel], runtime_parameters: Dict[str, Any], @@ -428,8 +390,6 @@ async def run_ocr_model_step( runtime_parameters=runtime_parameters, outputs_lookup=outputs_lookup, ) - if not issubclass(type(image), list): - image = [image] if step_execution_mode is StepExecutionMode.LOCAL: serialised_result = await get_ocr_predictions_locally( image=image, @@ -442,9 +402,6 @@ async def run_ocr_model_step( image=image, api_key=api_key, ) - if len(serialised_result) == 1: - serialised_result = serialised_result[0] - image = image[0] serialised_result = attach_parent_info( image=image, results=serialised_result, @@ -520,8 +477,6 @@ async def run_clip_comparison_step( runtime_parameters=runtime_parameters, outputs_lookup=outputs_lookup, ) - if not issubclass(type(image), list): - image = [image] if step_execution_mode is StepExecutionMode.LOCAL: serialised_result = await get_clip_comparison_locally( image=image, @@ -536,9 +491,6 @@ async def run_clip_comparison_step( text=text, api_key=api_key, ) - if len(serialised_result) == 1: - serialised_result = serialised_result[0] - image = image[0] serialised_result = attach_parent_info( image=image, results=serialised_result, @@ -622,20 +574,16 @@ def load_core_model( def attach_parent_info( - image: Union[Dict[str, Any], List[Dict[str, Any]]], - results: Union[Dict[str, Any], List[Dict[str, Any]]], + image: List[Dict[str, Any]], + results: List[Dict[str, Any]], nested_key: Optional[str] = "predictions", -) -> Union[Dict[str, Any], List[Dict[str, Any]]]: - if issubclass(type(image), list): - return [ - attach_parent_info_to_image_detections( - image=i, predictions=p, nested_key=nested_key - ) - for i, p in zip(image, results) - ] - return attach_parent_info_to_image_detections( - image=image, predictions=results, nested_key=nested_key - ) +) -> List[Dict[str, Any]]: + return [ + attach_parent_info_to_image_detections( + image=i, predictions=p, nested_key=nested_key + ) + for i, p in zip(image, results) + ] def attach_parent_info_to_image_detections( @@ -652,18 +600,11 @@ def attach_parent_info_to_image_detections( def anchor_detections_in_parent_coordinates( - image: Union[Dict[str, Any], List[Dict[str, Any]]], - serialised_result: Union[Dict[str, Any], List[Dict[str, Any]]], + image: List[Dict[str, Any]], + serialised_result: List[Dict[str, Any]], image_metadata_key: str = "image", detections_key: str = "predictions", -) -> Union[Dict[str, Any], List[Dict[str, Any]]]: - if issubclass(type(image), dict): - return anchor_image_detections_in_parent_coordinates( - image=image, - serialised_result=serialised_result, - image_metadata_key=image_metadata_key, - detections_key=detections_key, - ) +) -> List[Dict[str, Any]]: return [ anchor_image_detections_in_parent_coordinates( image=i, diff --git a/inference/enterprise/workflows/complier/steps_executors/utils.py b/inference/enterprise/workflows/complier/steps_executors/utils.py index f9f6d314f..6b336b063 100644 --- a/inference/enterprise/workflows/complier/steps_executors/utils.py +++ b/inference/enterprise/workflows/complier/steps_executors/utils.py @@ -1,5 +1,7 @@ from typing import Any, Dict, Generator, Iterable, List, TypeVar, Union +import numpy as np + from inference.enterprise.workflows.complier.steps_executors.types import OutputsLookup from inference.enterprise.workflows.complier.utils import ( get_step_selector_from_its_output, @@ -34,7 +36,7 @@ def get_image( ], runtime_parameters: Dict[str, Any], outputs_lookup: OutputsLookup, -) -> Any: +) -> List[Dict[str, Union[str, np.ndarray]]]: if is_input_selector(selector_or_value=step.image): return runtime_parameters[get_last_selector_chunk(selector=step.image)] if is_step_output_selector(selector_or_value=step.image): diff --git a/tests/inference/unit_tests/enterprise/workflows/compiler/steps_executors/test_auxiliary.py b/tests/inference/unit_tests/enterprise/workflows/compiler/steps_executors/test_auxiliary.py index 81bcc67be..cd78eac1e 100644 --- a/tests/inference/unit_tests/enterprise/workflows/compiler/steps_executors/test_auxiliary.py +++ b/tests/inference/unit_tests/enterprise/workflows/compiler/steps_executors/test_auxiliary.py @@ -134,92 +134,6 @@ async def test_run_condition_step() -> None: assert next_step == "$steps.step_2" -@pytest.mark.asyncio -async def test_run_detection_filter_step_when_single_image_detections_given() -> None: - # given - step = DetectionFilter.parse_obj( - { - "type": "DetectionFilter", - "name": "step_2", - "predictions": "$steps.step_1.predictions", - "filter_definition": { - "type": "CompoundDetectionFilterDefinition", - "left": { - "type": "DetectionFilterDefinition", - "field_name": "class_name", - "operator": "equal", - "reference_value": "car", - }, - "operator": "and", - "right": { - "type": "DetectionFilterDefinition", - "field_name": "confidence", - "operator": "greater_or_equal_than", - "reference_value": 0.5, - }, - }, - } - ) - detections = [ - { - "x": 10, - "y": 10, - "width": 20, - "height": 20, - "parent_id": "p1", - "detection_id": "one", - "class_name": "car", - "confidence": 0.2, - }, - { - "x": 10, - "y": 10, - "width": 20, - "height": 20, - "parent_id": "p2", - "detection_id": "two", - "class_name": "car", - "confidence": 0.5, - }, - ] - - # when - next_step, outputs_lookup = await run_detection_filter( - step=step, - runtime_parameters={}, - outputs_lookup={ - "$steps.step_1": { - "predictions": detections, - "image": {"height": 100, "width": 100}, - } - }, - model_manager=MagicMock(), - api_key=None, - ) - - # then - assert next_step is None, "Next step should not be set here" - assert outputs_lookup["$steps.step_2"]["predictions"] == [ - { - "x": 10, - "y": 10, - "width": 20, - "height": 20, - "parent_id": "p2", - "detection_id": "two", - "class_name": "car", - "confidence": 0.5, - }, - ], "Only second prediction should survive" - assert outputs_lookup["$steps.step_2"]["parent_id"] == [ - "p2" - ], "Only second prediction should mark parent_id" - assert outputs_lookup["$steps.step_2"]["image"] == { - "height": 100, - "width": 100, - }, "image metadata must be copied from input" - - @pytest.mark.asyncio async def test_run_detection_filter_step_when_batch_detections_given() -> None: # given diff --git a/tests/inference/unit_tests/enterprise/workflows/compiler/test_runtime_input_validator.py b/tests/inference/unit_tests/enterprise/workflows/compiler/test_runtime_input_validator.py index 50c1df6bb..aeb78748d 100644 --- a/tests/inference/unit_tests/enterprise/workflows/compiler/test_runtime_input_validator.py +++ b/tests/inference/unit_tests/enterprise/workflows/compiler/test_runtime_input_validator.py @@ -144,20 +144,23 @@ def test_assembly_input_images_when_images_provided_as_single_elements() -> None ) # then - assert result["one"] == { - "type": "url", - "value": "https://some.com/image.jpg", - "parent_id": "$inputs.one", - }, "parent_id expected to be added" + assert result["one"] == [ + { + "type": "url", + "value": "https://some.com/image.jpg", + "parent_id": "$inputs.one", + } + ], "parent_id expected to be added" assert result["some"] == "value", "Value must not be touched by function" + assert len(result["two"]) == 1, "Image must be wrapped with list" assert ( - result["two"]["type"] == "numpy_object" + result["two"][0]["type"] == "numpy_object" ), "numpy array must be packed in dict with type definition" assert ( - result["two"]["value"] == np.zeros((192, 168, 3), dtype=np.uint8) + result["two"][0]["value"] == np.zeros((192, 168, 3), dtype=np.uint8) ).all(), "Image cannot be mutated" assert ( - result["two"]["parent_id"] == "$inputs.two" + result["two"][0]["parent_id"] == "$inputs.two" ), "parent_id expected to be added and match input identifier" diff --git a/tests/inference_sdk/unit_tests/http/test_client.py b/tests/inference_sdk/unit_tests/http/test_client.py index 686aa836e..394c5c92e 100644 --- a/tests/inference_sdk/unit_tests/http/test_client.py +++ b/tests/inference_sdk/unit_tests/http/test_client.py @@ -472,7 +472,9 @@ def test_client_unload_single_model_when_successful_response_expected_against_al assert requests_mock.last_request.json() == { "model_id": "coco/3", } - assert http_client.selected_model is None, "Even when alias is in use - selected model should be emptied" + assert ( + http_client.selected_model is None + ), "Even when alias is in use - selected model should be emptied" @pytest.mark.asyncio @@ -532,7 +534,9 @@ async def test_client_unload_single_model_async_when_successful_response_expecte }, headers=DEFAULT_HEADERS, ) - assert http_client.selected_model is None, "Even when alias is in use - selected model should be emptied" + assert ( + http_client.selected_model is None + ), "Even when alias is in use - selected model should be emptied" def test_client_unload_single_model_when_error_occurs(requests_mock: Mocker) -> None: @@ -659,7 +663,9 @@ async def test_client_load_model_async_when_successful_response_expected() -> No @pytest.mark.asyncio -async def test_client_load_model_async_when_successful_response_expected_against_alias() -> None: +async def test_client_load_model_async_when_successful_response_expected_against_alias() -> ( + None +): # given api_url = "http://some.com" http_client = InferenceHTTPClient(api_key="my-api-key", api_url=api_url) @@ -667,7 +673,9 @@ async def test_client_load_model_async_when_successful_response_expected_against with aioresponses() as m: m.post( f"{api_url}/model/add", - payload={"models": [{"model_id": "coco/3", "task_type": "object-detection"}]}, + payload={ + "models": [{"model_id": "coco/3", "task_type": "object-detection"}] + }, ) # when @@ -1000,7 +1008,9 @@ async def test_get_model_description_async_when_model_was_loaded_already() -> No @pytest.mark.asyncio -async def test_get_model_description_async_when_model_was_loaded_already_and_alias_was_resolved() -> None: +async def test_get_model_description_async_when_model_was_loaded_already_and_alias_was_resolved() -> ( + None +): # given api_url = "http://some.com" http_client = InferenceHTTPClient(api_key="my-api-key", api_url=api_url) @@ -1008,13 +1018,17 @@ async def test_get_model_description_async_when_model_was_loaded_already_and_ali with aioresponses() as m: m.get( f"{api_url}/model/registry", - payload={"models": [{"model_id": "coco/3", "task_type": "object-detection"}]}, + payload={ + "models": [{"model_id": "coco/3", "task_type": "object-detection"}] + }, ) # when result = await http_client.get_model_description_async(model_id="yolov8n-640") # then - assert result == ModelDescription(model_id="coco/3", task_type="object-detection") + assert result == ModelDescription( + model_id="coco/3", task_type="object-detection" + ) def test_get_model_description_when_model_was_not_loaded_before_and_successful_load( @@ -1105,14 +1119,18 @@ async def test_get_model_description_async_when_model_was_not_loaded_before_and_ ) m.post( f"{api_url}/model/add", - payload={"models": [{"model_id": "coco/3", "task_type": "object-detection"}]}, + payload={ + "models": [{"model_id": "coco/3", "task_type": "object-detection"}] + }, ) # when result = await http_client.get_model_description_async(model_id="yolov8n-640") # then - assert result == ModelDescription(model_id="coco/3", task_type="object-detection") + assert result == ModelDescription( + model_id="coco/3", task_type="object-detection" + ) def test_get_model_description_when_model_was_not_loaded_before_and_unsuccessful_load( @@ -1268,7 +1286,8 @@ def test_infer_from_api_v0_when_request_succeed_for_object_detection_with_batch_ # when result = http_client.infer_from_api_v0( - inference_input=["https://some/image.jpg"] * 2, model_id=model_id_to_use, + inference_input=["https://some/image.jpg"] * 2, + model_id=model_id_to_use, ) # then @@ -1363,7 +1382,8 @@ async def test_infer_from_api_v0_async_when_request_succeed_for_object_detection # when result = await http_client.infer_from_api_v0_async( - inference_input="https://some/image.jpg", model_id=model_id_to_use, + inference_input="https://some/image.jpg", + model_id=model_id_to_use, ) # then assert result == [ @@ -1710,7 +1730,8 @@ def test_infer_from_api_v1_when_request_succeed_for_object_detection_with_batch_ # when result = http_client.infer_from_api_v1( - inference_input="https://some/image.jpg", model_id=model_id_to_use, + inference_input="https://some/image.jpg", + model_id=model_id_to_use, ) # then assert result == [ @@ -1825,7 +1846,8 @@ async def test_infer_from_api_v1_async_when_request_succeed_for_object_detection # when result = await http_client.infer_from_api_v1_async( - inference_input="https://some/image.jpg", model_id=model_id_to_use, + inference_input="https://some/image.jpg", + model_id=model_id_to_use, ) # then assert result == [ diff --git a/tests/inference_sdk/unit_tests/http/utils/test_postprocessing.py b/tests/inference_sdk/unit_tests/http/utils/test_postprocessing.py index dc03de15e..6d45e4903 100644 --- a/tests/inference_sdk/unit_tests/http/utils/test_postprocessing.py +++ b/tests/inference_sdk/unit_tests/http/utils/test_postprocessing.py @@ -10,7 +10,7 @@ from PIL import Image, ImageChops from requests import Response -from inference_sdk.http.entities import VisualisationResponseFormat, ModelDescription +from inference_sdk.http.entities import ModelDescription, VisualisationResponseFormat from inference_sdk.http.utils import post_processing from inference_sdk.http.utils.post_processing import ( adjust_bbox_coordinates_to_client_scaling_factor, @@ -22,9 +22,10 @@ combine_gaze_detections, decode_workflow_output_image, decode_workflow_outputs, + filter_model_descriptions, is_workflow_image, response_contains_jpeg_image, - transform_base64_visualisation, filter_model_descriptions, + transform_base64_visualisation, ) From 3b26b2be77ee8bbee5b9f9a5102acc8d994bf61b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20P=C4=99czek?= Date: Tue, 13 Feb 2024 13:59:28 +0100 Subject: [PATCH 02/18] Make Tiny cache default option --- inference/core/env.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inference/core/env.py b/inference/core/env.py index 110e983e2..6bb53d57d 100644 --- a/inference/core/env.py +++ b/inference/core/env.py @@ -59,7 +59,7 @@ GAZE_MAX_BATCH_SIZE = int(os.getenv("GAZE_MAX_BATCH_SIZE", 8)) # If true, this will store a non-verbose version of the inference request and repsonse in the cache -TINY_CACHE = str2bool(os.getenv("TINY_CACHE", False)) +TINY_CACHE = str2bool(os.getenv("TINY_CACHE", True)) # Maximum batch size for CLIP, default is 8 CLIP_MAX_BATCH_SIZE = int(os.getenv("CLIP_MAX_BATCH_SIZE", 8)) From c2af81e4f4d24e05a8658f787628a336f4fe3e52 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20P=C4=99czek?= Date: Tue, 13 Feb 2024 17:40:11 +0100 Subject: [PATCH 03/18] Add draft implementation of the active learning block --- docker/config/cpu_http.py | 2 + inference/core/interfaces/http/http_api.py | 14 ++ .../enterprise/workflows/complier/core.py | 16 ++ .../workflows/complier/execution_engine.py | 25 +++ .../workflows/complier/graph_parser.py | 22 --- .../active_learning_middlewares.py | 86 ++++++++++ .../complier/steps_executors/auxiliary.py | 72 ++++++++- .../complier/steps_executors/utils.py | 2 + .../enterprise/workflows/entities/steps.py | 147 ++++++++++++++++++ .../entities/workflows_specification.py | 2 + .../steps_executors/test_auxiliary.py | 3 + .../workflows/compiler/test_graph_parser.py | 49 ++---- 12 files changed, 372 insertions(+), 68 deletions(-) create mode 100644 inference/enterprise/workflows/complier/steps_executors/active_learning_middlewares.py diff --git a/docker/config/cpu_http.py b/docker/config/cpu_http.py index 862dce75e..2de4d1706 100644 --- a/docker/config/cpu_http.py +++ b/docker/config/cpu_http.py @@ -10,6 +10,8 @@ from prometheus_fastapi_instrumentator import Instrumentator from inference.core.env import MAX_ACTIVE_MODELS, ACTIVE_LEARNING_ENABLED, LAMBDA +from inference.enterprise.workflows.complier.steps_executors.active_learning_middlewares import \ + BackgroundTaskWorkflowsActiveLearningMiddleware from inference.models.utils import ROBOFLOW_MODEL_TYPES model_registry = RoboflowModelRegistry(ROBOFLOW_MODEL_TYPES) diff --git a/inference/core/interfaces/http/http_api.py b/inference/core/interfaces/http/http_api.py index ff6ffbc08..647edbc2c 100644 --- a/inference/core/interfaces/http/http_api.py +++ b/inference/core/interfaces/http/http_api.py @@ -12,6 +12,7 @@ from fastapi_cprofile.profiler import CProfileMiddleware from inference.core import logger +from inference.core.cache import cache from inference.core.devices.utils import GLOBAL_INFERENCE_SERVER_ID from inference.core.entities.requests.clip import ( ClipCompareRequest, @@ -126,6 +127,9 @@ from inference.core.utils.notebooks import start_notebook from inference.enterprise.workflows.complier.core import compile_and_execute_async from inference.enterprise.workflows.complier.entities import StepExecutionMode +from inference.enterprise.workflows.complier.steps_executors.active_learning_middlewares import ( + WorkflowsActiveLearningMiddleware, +) from inference.enterprise.workflows.errors import ( ExecutionEngineError, RuntimePayloadError, @@ -296,6 +300,9 @@ async def count_errors(request: Request, call_next): self.app = app self.model_manager = model_manager + self.workflows_active_learning_middleware = WorkflowsActiveLearningMiddleware( + cache=cache, + ) async def process_inference_request( inference_request: InferenceRequest, **kwargs @@ -320,6 +327,7 @@ async def process_inference_request( async def process_workflow_inference_request( workflow_request: WorkflowInferenceRequest, workflow_specification: dict, + background_tasks: Optional[BackgroundTasks], ) -> WorkflowInferenceResponse: step_execution_mode = StepExecutionMode(WORKFLOWS_STEP_EXECUTION_MODE) result = await compile_and_execute_async( @@ -329,6 +337,8 @@ async def process_workflow_inference_request( api_key=workflow_request.api_key, max_concurrent_steps=WORKFLOWS_MAX_CONCURRENT_STEPS, step_execution_mode=step_execution_mode, + active_learning_middleware=self.workflows_active_learning_middleware, + background_tasks=background_tasks, ) outputs = serialise_workflow_result( result=result, @@ -656,6 +666,7 @@ async def infer_from_predefined_workflow( workspace_name: str, workflow_name: str, workflow_request: WorkflowInferenceRequest, + background_tasks: BackgroundTasks, ) -> WorkflowInferenceResponse: workflow_specification = get_workflow_specification( api_key=workflow_request.api_key, @@ -665,6 +676,7 @@ async def infer_from_predefined_workflow( return await process_workflow_inference_request( workflow_request=workflow_request, workflow_specification=workflow_specification, + background_tasks=background_tasks if not LAMBDA else None, ) @app.post( @@ -676,6 +688,7 @@ async def infer_from_predefined_workflow( @with_route_exceptions async def infer_from_workflow( workflow_request: WorkflowSpecificationInferenceRequest, + background_tasks: BackgroundTasks, ) -> WorkflowInferenceResponse: workflow_specification = { "specification": workflow_request.specification @@ -683,6 +696,7 @@ async def infer_from_workflow( return await process_workflow_inference_request( workflow_request=workflow_request, workflow_specification=workflow_specification, + background_tasks=background_tasks if not LAMBDA else None, ) if CORE_MODELS_ENABLED: diff --git a/inference/enterprise/workflows/complier/core.py b/inference/enterprise/workflows/complier/core.py index 1e5b656c9..913280fc9 100644 --- a/inference/enterprise/workflows/complier/core.py +++ b/inference/enterprise/workflows/complier/core.py @@ -2,6 +2,9 @@ from asyncio import AbstractEventLoop from typing import Any, Dict, Optional +from fastapi import BackgroundTasks + +from inference.core.cache import cache from inference.core.env import API_KEY, MAX_ACTIVE_MODELS from inference.core.managers.base import ModelManager from inference.core.managers.decorators.fixed_size_cache import WithFixedSizeCache @@ -9,6 +12,9 @@ from inference.enterprise.workflows.complier.entities import StepExecutionMode from inference.enterprise.workflows.complier.execution_engine import execute_graph from inference.enterprise.workflows.complier.graph_parser import prepare_execution_graph +from inference.enterprise.workflows.complier.steps_executors.active_learning_middlewares import ( + WorkflowsActiveLearningMiddleware, +) from inference.enterprise.workflows.complier.validator import ( validate_workflow_specification, ) @@ -25,6 +31,8 @@ def compile_and_execute( api_key: Optional[str] = None, model_manager: Optional[ModelManager] = None, loop: Optional[AbstractEventLoop] = None, + active_learning_middleware: Optional[WorkflowsActiveLearningMiddleware] = None, + background_tasks: Optional[BackgroundTasks] = None, max_concurrent_steps: int = 1, step_execution_mode: StepExecutionMode = StepExecutionMode.LOCAL, ) -> dict: @@ -36,6 +44,8 @@ def compile_and_execute( runtime_parameters=runtime_parameters, model_manager=model_manager, api_key=api_key, + active_learning_middleware=active_learning_middleware, + background_tasks=background_tasks, max_concurrent_steps=max_concurrent_steps, step_execution_mode=step_execution_mode, ) @@ -47,6 +57,8 @@ async def compile_and_execute_async( runtime_parameters: Dict[str, Any], model_manager: Optional[ModelManager] = None, api_key: Optional[str] = None, + active_learning_middleware: Optional[WorkflowsActiveLearningMiddleware] = None, + background_tasks: Optional[BackgroundTasks] = None, max_concurrent_steps: int = 1, step_execution_mode: StepExecutionMode = StepExecutionMode.LOCAL, ) -> dict: @@ -56,6 +68,8 @@ async def compile_and_execute_async( model_registry = RoboflowModelRegistry(ROBOFLOW_MODEL_TYPES) model_manager = ModelManager(model_registry=model_registry) model_manager = WithFixedSizeCache(model_manager, max_size=MAX_ACTIVE_MODELS) + if active_learning_middleware is None: + active_learning_middleware = WorkflowsActiveLearningMiddleware(cache=cache) parsed_workflow_specification = WorkflowSpecification.parse_obj( workflow_specification ) @@ -73,6 +87,8 @@ async def compile_and_execute_async( execution_graph=execution_graph, runtime_parameters=runtime_parameters, model_manager=model_manager, + active_learning_middleware=active_learning_middleware, + background_tasks=background_tasks, api_key=api_key, max_concurrent_steps=max_concurrent_steps, step_execution_mode=step_execution_mode, diff --git a/inference/enterprise/workflows/complier/execution_engine.py b/inference/enterprise/workflows/complier/execution_engine.py index 7f95b0786..bb0003615 100644 --- a/inference/enterprise/workflows/complier/execution_engine.py +++ b/inference/enterprise/workflows/complier/execution_engine.py @@ -3,6 +3,7 @@ from typing import Any, Dict, List, Optional, Set import networkx as nx +from fastapi import BackgroundTasks from networkx import DiGraph from inference.core import logger @@ -15,7 +16,11 @@ from inference.enterprise.workflows.complier.runtime_input_validator import ( prepare_runtime_parameters, ) +from inference.enterprise.workflows.complier.steps_executors.active_learning_middlewares import ( + WorkflowsActiveLearningMiddleware, +) from inference.enterprise.workflows.complier.steps_executors.auxiliary import ( + run_active_learning_data_collector, run_condition_step, run_crop_step, run_detection_filter, @@ -61,6 +66,7 @@ "RelativeStaticCrop": run_static_crop_step, "ClipComparison": run_clip_comparison_step, "DetectionsConsensus": run_detections_consensus_step, + "ActiveLearningDataCollector": run_active_learning_data_collector, } @@ -68,6 +74,8 @@ async def execute_graph( execution_graph: DiGraph, runtime_parameters: Dict[str, Any], model_manager: ModelManager, + active_learning_middleware: WorkflowsActiveLearningMiddleware, + background_tasks: Optional[BackgroundTasks] = None, api_key: Optional[str] = None, max_concurrent_steps: int = 1, step_execution_mode: StepExecutionMode = StepExecutionMode.LOCAL, @@ -101,6 +109,8 @@ async def execute_graph( model_manager=model_manager, api_key=api_key, step_execution_mode=step_execution_mode, + active_learning_middleware=active_learning_middleware, + background_tasks=background_tasks, ) return construct_response( execution_graph=execution_graph, outputs_lookup=outputs_lookup @@ -116,6 +126,8 @@ async def execute_steps( model_manager: ModelManager, api_key: Optional[str], step_execution_mode: StepExecutionMode, + active_learning_middleware: WorkflowsActiveLearningMiddleware, + background_tasks: Optional[BackgroundTasks], ) -> Set[str]: """outputs_lookup is mutated while execution, only independent steps may be run together""" logger.info(f"Executing steps: {steps}. Execution mode: {step_execution_mode}") @@ -132,6 +144,8 @@ async def execute_steps( model_manager=model_manager, api_key=api_key, step_execution_mode=step_execution_mode, + active_learning_middleware=active_learning_middleware, + background_tasks=background_tasks, ) for step in steps_batch ] @@ -149,6 +163,8 @@ async def safe_execute_step( model_manager: ModelManager, api_key: Optional[str], step_execution_mode: StepExecutionMode, + active_learning_middleware: WorkflowsActiveLearningMiddleware, + background_tasks: Optional[BackgroundTasks], ) -> Set[str]: try: return await execute_step( @@ -159,6 +175,8 @@ async def safe_execute_step( model_manager=model_manager, api_key=api_key, step_execution_mode=step_execution_mode, + active_learning_middleware=active_learning_middleware, + background_tasks=background_tasks, ) except Exception as error: raise ExecutionEngineError( @@ -176,11 +194,17 @@ async def execute_step( model_manager: ModelManager, api_key: Optional[str], step_execution_mode: StepExecutionMode, + active_learning_middleware: WorkflowsActiveLearningMiddleware, + background_tasks: Optional[BackgroundTasks], ) -> Set[str]: logger.info(f"started execution of: {step} - {datetime.now().isoformat()}") nodes_to_discard = set() step_definition = execution_graph.nodes[step]["definition"] executor = STEP_TYPE2EXECUTOR_MAPPING[step_definition.type] + additional_args = {} + if step_definition.type == "ActiveLearningDataCollector": + additional_args["active_learning_middleware"] = active_learning_middleware + additional_args["background_tasks"] = background_tasks next_step, outputs_lookup = await executor( step=step_definition, runtime_parameters=runtime_parameters, @@ -188,6 +212,7 @@ async def execute_step( model_manager=model_manager, api_key=api_key, step_execution_mode=step_execution_mode, + **additional_args, ) if is_condition_step(execution_graph=execution_graph, node=step): if execution_graph.nodes[step]["definition"].step_if_true == next_step: diff --git a/inference/enterprise/workflows/complier/graph_parser.py b/inference/enterprise/workflows/complier/graph_parser.py index 4650a7f55..2dc2f9558 100644 --- a/inference/enterprise/workflows/complier/graph_parser.py +++ b/inference/enterprise/workflows/complier/graph_parser.py @@ -39,7 +39,6 @@ def prepare_execution_graph(workflow_specification: WorkflowSpecificationV1) -> execution_graph = construct_graph(workflow_specification=workflow_specification) if not nx.is_directed_acyclic_graph(execution_graph): raise NotAcyclicGraphError(f"Detected cycle in execution graph.") - verify_each_node_reach_at_least_one_output(execution_graph=execution_graph) verify_each_node_step_has_parent_in_the_same_branch(execution_graph=execution_graph) verify_that_steps_are_connected_with_compatible_inputs( execution_graph=execution_graph @@ -189,27 +188,6 @@ def verify_edge_is_created_between_existing_nodes( ) -def verify_each_node_reach_at_least_one_output( - execution_graph: DiGraph, -) -> None: - all_nodes = set(execution_graph.nodes()) - output_nodes = get_nodes_of_specific_kind( - execution_graph=execution_graph, kind=OUTPUT_NODE_KIND - ) - nodes_reaching_output = ( - get_nodes_that_are_reachable_from_pointed_ones_in_reversed_graph( - execution_graph=execution_graph, - pointed_nodes=output_nodes, - ) - ) - nodes_not_reaching_output = all_nodes.difference(nodes_reaching_output) - if len(nodes_not_reaching_output) > 0: - raise NodesNotReachingOutputError( - f"Detected {len(nodes_not_reaching_output)} nodes not reaching any of output node:" - f"{nodes_not_reaching_output}." - ) - - def get_nodes_that_are_reachable_from_pointed_ones_in_reversed_graph( execution_graph: DiGraph, pointed_nodes: Set[str], diff --git a/inference/enterprise/workflows/complier/steps_executors/active_learning_middlewares.py b/inference/enterprise/workflows/complier/steps_executors/active_learning_middlewares.py new file mode 100644 index 000000000..98aa10ab0 --- /dev/null +++ b/inference/enterprise/workflows/complier/steps_executors/active_learning_middlewares.py @@ -0,0 +1,86 @@ +from typing import Dict, List, Optional + +from fastapi import BackgroundTasks + +from inference.core import logger +from inference.core.active_learning.middlewares import ActiveLearningMiddleware +from inference.core.cache.base import BaseCache +from inference.core.env import DISABLE_PREPROC_AUTO_ORIENT + + +class WorkflowsActiveLearningMiddleware: + + def __init__( + self, + cache: BaseCache, + middlewares: Optional[Dict[str, ActiveLearningMiddleware]] = None, + ): + self._cache = cache + self._middlewares = middlewares if middlewares is not None else {} + + def register( + self, + dataset_name: str, + images: List[dict], + predictions: List[dict], + api_key: Optional[str], + prediction_type: str, + active_learning_disabled_for_request: bool, + background_tasks: Optional[BackgroundTasks] = None, + ) -> None: + model_id = f"{dataset_name}/workflows" + if api_key is None or active_learning_disabled_for_request: + return None + if background_tasks is None: + self._register( + model_id=model_id, + images=images, + predictions=predictions, + api_key=api_key, + prediction_type=prediction_type, + ) + return None + background_tasks.add_task( + self._register, + model_id=model_id, + images=images, + predictions=predictions, + api_key=api_key, + prediction_type=prediction_type, + ) + + def _register( + self, + model_id: str, + images: List[dict], + predictions: List[dict], + api_key: str, + prediction_type: str, + ) -> None: + try: + self._ensure_middleware_initialised(model_id=model_id, api_key=api_key) + self._middlewares[model_id].register_batch( + inference_inputs=images, + predictions=predictions, + prediction_type=prediction_type, + disable_preproc_auto_orient=DISABLE_PREPROC_AUTO_ORIENT, + ) + except Exception as error: + # Error handling to be decided + logger.warning( + f"Error in datapoint registration for Active Learning. Details: {error}. " + f"Error is suppressed in favour of normal operations of API." + ) + + def _ensure_middleware_initialised( + self, + model_id: str, + api_key: str, + ) -> None: + if model_id in self._middlewares: + return None + self._middlewares[model_id] = ActiveLearningMiddleware.init( + api_key=api_key, + model_id=model_id, + cache=self._cache, + ) diff --git a/inference/enterprise/workflows/complier/steps_executors/auxiliary.py b/inference/enterprise/workflows/complier/steps_executors/auxiliary.py index de64c7845..a446df3fe 100644 --- a/inference/enterprise/workflows/complier/steps_executors/auxiliary.py +++ b/inference/enterprise/workflows/complier/steps_executors/auxiliary.py @@ -7,10 +7,14 @@ from uuid import uuid4 import numpy as np +from fastapi import BackgroundTasks from inference.core.managers.base import ModelManager from inference.core.utils.image_utils import ImageType, load_image from inference.enterprise.workflows.complier.entities import StepExecutionMode +from inference.enterprise.workflows.complier.steps_executors.active_learning_middlewares import ( + WorkflowsActiveLearningMiddleware, +) from inference.enterprise.workflows.complier.steps_executors.constants import ( CENTER_X_KEY, CENTER_Y_KEY, @@ -37,6 +41,7 @@ ) from inference.enterprise.workflows.entities.steps import ( AbsoluteStaticCrop, + ActiveLearningDataCollector, AggregationMode, BinaryOperator, CompoundDetectionFilterDefinition, @@ -49,6 +54,7 @@ Operator, RelativeStaticCrop, ) +from inference.enterprise.workflows.entities.validators import get_last_selector_chunk from inference.enterprise.workflows.errors import ExecutionGraphError OPERATORS = { @@ -79,7 +85,7 @@ async def run_crop_step( outputs_lookup: OutputsLookup, model_manager: ModelManager, api_key: Optional[str], - step_execution_mode: StepExecutionMode = StepExecutionMode.LOCAL, + step_execution_mode: StepExecutionMode, ) -> Tuple[NextStepReference, OutputsLookup]: image = get_image( step=step, @@ -143,7 +149,7 @@ async def run_condition_step( outputs_lookup: OutputsLookup, model_manager: ModelManager, api_key: Optional[str], - step_execution_mode: StepExecutionMode = StepExecutionMode.LOCAL, + step_execution_mode: StepExecutionMode, ) -> Tuple[NextStepReference, OutputsLookup]: left_value = resolve_parameter( selector_or_value=step.left, @@ -166,7 +172,7 @@ async def run_detection_filter( outputs_lookup: OutputsLookup, model_manager: ModelManager, api_key: Optional[str], - step_execution_mode: StepExecutionMode = StepExecutionMode.LOCAL, + step_execution_mode: StepExecutionMode, ) -> Tuple[NextStepReference, OutputsLookup]: predictions = resolve_parameter( selector_or_value=step.predictions, @@ -218,7 +224,7 @@ async def run_detection_offset_step( outputs_lookup: OutputsLookup, model_manager: ModelManager, api_key: Optional[str], - step_execution_mode: StepExecutionMode = StepExecutionMode.LOCAL, + step_execution_mode: StepExecutionMode, ) -> Tuple[NextStepReference, OutputsLookup]: detections = resolve_parameter( selector_or_value=step.predictions, @@ -277,7 +283,7 @@ async def run_static_crop_step( outputs_lookup: OutputsLookup, model_manager: ModelManager, api_key: Optional[str], - step_execution_mode: StepExecutionMode = StepExecutionMode.LOCAL, + step_execution_mode: StepExecutionMode, ) -> Tuple[NextStepReference, OutputsLookup]: image = get_image( step=step, @@ -372,7 +378,7 @@ async def run_detections_consensus_step( outputs_lookup: OutputsLookup, model_manager: ModelManager, api_key: Optional[str], - step_execution_mode: StepExecutionMode = StepExecutionMode.LOCAL, + step_execution_mode: StepExecutionMode, ) -> Tuple[NextStepReference, OutputsLookup]: resolve_parameter_closure = partial( resolve_parameter, @@ -821,3 +827,57 @@ def aggregate_field_values( ) -> float: values = [d[field] for d in detections] return AGGREGATION_MODE2FIELD_AGGREGATOR[aggregation_mode](values) + + +async def run_active_learning_data_collector( + step: ActiveLearningDataCollector, + runtime_parameters: Dict[str, Any], + outputs_lookup: OutputsLookup, + model_manager: ModelManager, + api_key: Optional[str], + step_execution_mode: StepExecutionMode, + active_learning_middleware: WorkflowsActiveLearningMiddleware, + background_tasks: Optional[BackgroundTasks], +) -> Tuple[NextStepReference, OutputsLookup]: + resolve_parameter_closure = partial( + resolve_parameter, + runtime_parameters=runtime_parameters, + outputs_lookup=outputs_lookup, + ) + image = get_image( + step=step, + runtime_parameters=runtime_parameters, + outputs_lookup=outputs_lookup, + ) + images_meta_selector = construct_selector_pointing_step_output( + selector=step.predictions, + new_output="image", + ) + images_meta = resolve_parameter_closure(images_meta_selector) + predictions = resolve_parameter_closure(step.predictions) + predictions_output_name = get_last_selector_chunk(step.predictions) + target_dataset = resolve_parameter_closure(step.target_dataset) + target_dataset_api_key = resolve_parameter_closure(step.target_dataset_api_key) + disable_active_learning = resolve_parameter_closure(step.disable_active_learning) + active_learning_compatible_predictions = [ + {"image": image_meta, predictions_output_name: prediction} + for image_meta, prediction in zip(images_meta, predictions) + ] + active_learning_middleware.register( + # this should actually be asyncio, but that requires a lot of backend components redesign + dataset_name=target_dataset, + images=image, + predictions=active_learning_compatible_predictions, + api_key=target_dataset_api_key or api_key, + active_learning_disabled_for_request=disable_active_learning, + prediction_type=( + "classification" if predictions_output_name == "top" else "object-detection" + ), + # This is quick and dirty assumption - that shall work as samplers are suited to distinguish + # between cls and non-cls predictions and at this stage, we do also only recognise those 2 + # states - to further divide between tasks, each prediction step should provide output metadata + # that can be referred using `step.predictions` with output name replaced to `task_type` or sth + # similar + background_tasks=background_tasks, + ) + return None, outputs_lookup diff --git a/inference/enterprise/workflows/complier/steps_executors/utils.py b/inference/enterprise/workflows/complier/steps_executors/utils.py index 6b336b063..b79839a32 100644 --- a/inference/enterprise/workflows/complier/steps_executors/utils.py +++ b/inference/enterprise/workflows/complier/steps_executors/utils.py @@ -10,6 +10,7 @@ ) from inference.enterprise.workflows.entities.steps import ( AbsoluteStaticCrop, + ActiveLearningDataCollector, ClipComparison, Crop, OCRModel, @@ -33,6 +34,7 @@ def get_image( AbsoluteStaticCrop, RelativeStaticCrop, ClipComparison, + ActiveLearningDataCollector, ], runtime_parameters: Dict[str, Any], outputs_lookup: OutputsLookup, diff --git a/inference/enterprise/workflows/entities/steps.py b/inference/enterprise/workflows/entities/steps.py index f4c93575e..01ee8840c 100644 --- a/inference/enterprise/workflows/entities/steps.py +++ b/inference/enterprise/workflows/entities/steps.py @@ -6,6 +6,7 @@ from inference.enterprise.workflows.entities.base import GraphNone from inference.enterprise.workflows.entities.validators import ( + get_last_selector_chunk, is_selector, validate_field_has_given_type, validate_field_is_empty_or_selector_or_list_of_string, @@ -1121,3 +1122,149 @@ def _validate_required_objects_binding(self, value: Any) -> None: field_name=f"required_objects[{k}]", error=VariableTypeError, ) + + +ACTIVE_LEARNING_DATA_COLLECTOR_ELIGIBLE_SELECTORS = { + "ObjectDetectionModel": "predictions", + "KeypointsDetectionModel": "predictions", + "InstanceSegmentationModel": "predictions", + "DetectionFilter": "predictions", + "DetectionsConsensus": "predictions", + "DetectionOffset": "predictions", + "ClassificationModel": "top", +} + + +class ActiveLearningDataCollector(BaseModel, StepInterface): + type: Literal["ActiveLearningDataCollector"] + name: str + image: str + predictions: str + target_dataset: str + target_dataset_api_key: Optional[str] = Field(default=None) + disable_active_learning: Union[bool, str] = Field(default=False) + + @classmethod + @field_validator("image") + def image_must_only_hold_selectors(cls, value: Any) -> Union[str, List[str]]: + validate_image_is_valid_selector(value=value) + return value + + @classmethod + @field_validator("predictions") + def predictions_must_hold_selector(cls, value: Any) -> str: + if not is_selector(selector_or_value=value): + raise ValueError("`predictions` field can only contain selector values") + return value + + @classmethod + @field_validator("target_dataset") + def validate_target_dataset_field(cls, value: Any) -> Union[str, bool]: + validate_field_is_selector_or_has_given_type( + value=value, field_name="target_dataset", allowed_types=[bool] + ) + return value + + @classmethod + @field_validator("target_dataset") + def validate_target_dataset_api_key_field(cls, value: Any) -> Union[str, bool]: + validate_field_is_selector_or_has_given_type( + value=value, + field_name="target_dataset_api_key", + allowed_types=[bool, type(None)], + ) + return value + + @classmethod + @field_validator("disable_active_learning") + def validate_boolean_flags_or_selectors(cls, value: Any) -> Union[str, bool]: + validate_field_is_selector_or_has_given_type( + value=value, field_name="class_aware", allowed_types=[bool] + ) + return value + + def get_type(self) -> str: + return self.type + + def get_input_names(self) -> Set[str]: + return { + "image", + "predictions", + "target_dataset", + "target_dataset_api_key", + "disable_active_learning", + } + + def get_output_names(self) -> Set[str]: + return set() + + def validate_field_selector( + self, field_name: str, input_step: GraphNone, index: Optional[int] = None + ) -> None: + selector = getattr(self, field_name) + if not is_selector(selector_or_value=selector): + raise ExecutionGraphError( + f"Attempted to validate selector value for field {field_name}, but field is not selector." + ) + if field_name == "predictions": + input_step_type = input_step.get_type() + expected_last_selector_chunk = ( + ACTIVE_LEARNING_DATA_COLLECTOR_ELIGIBLE_SELECTORS.get(input_step_type) + ) + if expected_last_selector_chunk is None: + raise ExecutionGraphError( + f"Attempted to validate predictions selector of {self.name} step, but input step of type: " + f"{input_step_type} does match by type." + ) + if get_last_selector_chunk(selector) != expected_last_selector_chunk: + raise ExecutionGraphError( + f"It is only allowed to refer to {input_step_type} step output named {expected_last_selector_chunk}. " + f"Reference that was found: {selector}" + ) + input_step_image = getattr(input_step, "image", self.image) + if input_step_image != self.image: + raise ExecutionGraphError( + f"ActiveLearningDataCollector step refers to input step that uses reference to different image. " + f"ActiveLearningDataCollector step image: {self.image}. Input step (of type {input_step_image}) " + f"uses {input_step_image}." + ) + validate_selector_holds_image( + step_type=self.type, + field_name=field_name, + input_step=input_step, + ) + validate_selector_is_inference_parameter( + step_type=self.type, + field_name=field_name, + input_step=input_step, + applicable_fields={ + "target_dataset", + "target_dataset_api_key", + "disable_active_learning", + }, + ) + + def validate_field_binding(self, field_name: str, value: Any) -> None: + if field_name == "image": + validate_image_biding(value=value) + elif field_name in {"disable_active_learning"}: + validate_field_has_given_type( + field_name=field_name, + allowed_types=[bool], + value=value, + error=VariableTypeError, + ) + elif field_name in {"target_dataset"}: + validate_field_has_given_type( + field_name=field_name, + allowed_types=[str], + value=value, + error=VariableTypeError, + ) + elif field_name in {"target_dataset_api_key"}: + validate_field_has_given_type( + field_name=field_name, + allowed_types=[str, type(None)], + value=value, + error=VariableTypeError, + ) diff --git a/inference/enterprise/workflows/entities/workflows_specification.py b/inference/enterprise/workflows/entities/workflows_specification.py index 4cfad3ab4..6adc51145 100644 --- a/inference/enterprise/workflows/entities/workflows_specification.py +++ b/inference/enterprise/workflows/entities/workflows_specification.py @@ -9,6 +9,7 @@ from inference.enterprise.workflows.entities.outputs import JsonField from inference.enterprise.workflows.entities.steps import ( AbsoluteStaticCrop, + ActiveLearningDataCollector, ClassificationModel, ClipComparison, Condition, @@ -43,6 +44,7 @@ RelativeStaticCrop, AbsoluteStaticCrop, DetectionsConsensus, + ActiveLearningDataCollector, ], Field(discriminator="type"), ] diff --git a/tests/inference/unit_tests/enterprise/workflows/compiler/steps_executors/test_auxiliary.py b/tests/inference/unit_tests/enterprise/workflows/compiler/steps_executors/test_auxiliary.py index cd78eac1e..d5538b6e8 100644 --- a/tests/inference/unit_tests/enterprise/workflows/compiler/steps_executors/test_auxiliary.py +++ b/tests/inference/unit_tests/enterprise/workflows/compiler/steps_executors/test_auxiliary.py @@ -4,6 +4,7 @@ import numpy as np import pytest +from inference.enterprise.workflows.complier.entities import StepExecutionMode from inference.enterprise.workflows.complier.steps_executors import auxiliary from inference.enterprise.workflows.complier.steps_executors.auxiliary import ( aggregate_field_values, @@ -125,6 +126,7 @@ async def test_run_condition_step() -> None: outputs_lookup={"$steps.step_0": {"top": "cat"}}, model_manager=MagicMock(), api_key=None, + step_execution_mode=StepExecutionMode.LOCAL, ) # then @@ -219,6 +221,7 @@ async def test_run_detection_filter_step_when_batch_detections_given() -> None: }, model_manager=MagicMock(), api_key=None, + step_execution_mode=StepExecutionMode.LOCAL, ) # then diff --git a/tests/inference/unit_tests/enterprise/workflows/compiler/test_graph_parser.py b/tests/inference/unit_tests/enterprise/workflows/compiler/test_graph_parser.py index 83f68e17e..1f5e33a17 100644 --- a/tests/inference/unit_tests/enterprise/workflows/compiler/test_graph_parser.py +++ b/tests/inference/unit_tests/enterprise/workflows/compiler/test_graph_parser.py @@ -8,7 +8,6 @@ construct_graph, get_nodes_that_are_reachable_from_pointed_ones_in_reversed_graph, prepare_execution_graph, - verify_each_node_reach_at_least_one_output, ) from inference.enterprise.workflows.constants import ( INPUT_NODE_KIND, @@ -320,43 +319,6 @@ def test_construct_graph_when_detections_consensus_block_is_used() -> None: assert len(result.edges) == 5, "10 edges in total should be created" -def test_verify_each_node_reach_at_least_one_output_when_graph_is_valid() -> None: - # given - execution_graph = nx.DiGraph() - execution_graph.add_node("a", kind=INPUT_NODE_KIND) - execution_graph.add_node("b", kind=INPUT_NODE_KIND) - execution_graph.add_node("c", kind=STEP_NODE_KIND) - execution_graph.add_node("d", kind=STEP_NODE_KIND) - execution_graph.add_node("e", kind=OUTPUT_NODE_KIND) - execution_graph.add_node("f", kind=OUTPUT_NODE_KIND) - execution_graph.add_edge("a", "c") - execution_graph.add_edge("b", "d") - execution_graph.add_edge("c", "e") - execution_graph.add_edge("d", "f") - - # when - verify_each_node_reach_at_least_one_output(execution_graph=execution_graph) - - # then - no error raised - - -def test_verify_each_node_reach_at_least_one_output_when_graph_is_invalid() -> None: - # given - execution_graph = nx.DiGraph() - execution_graph.add_node("a", kind=INPUT_NODE_KIND) - execution_graph.add_node("b", kind=INPUT_NODE_KIND) - execution_graph.add_node("c", kind=STEP_NODE_KIND) - execution_graph.add_node("d", kind=STEP_NODE_KIND) - execution_graph.add_node("e", kind=OUTPUT_NODE_KIND) - execution_graph.add_edge("a", "c") - execution_graph.add_edge("b", "d") - execution_graph.add_edge("c", "e") - - # when - with pytest.raises(NodesNotReachingOutputError): - verify_each_node_reach_at_least_one_output(execution_graph=execution_graph) - - def test_get_nodes_that_are_reachable_from_pointed_ones_in_reversed_graph() -> None: # given execution_graph = nx.DiGraph() @@ -496,8 +458,15 @@ def test_prepare_execution_graph_when_graph_node_does_not_reach_output() -> None ) # when - with pytest.raises(NodesNotReachingOutputError): - _ = prepare_execution_graph(workflow_specification=workflow_specification) + execution_graph = prepare_execution_graph(workflow_specification=workflow_specification) + + # then + assert len(execution_graph.edges) == 4, "4 edges are expected to be created" + assert execution_graph.has_edge("$inputs.image", "$steps.step_1"), "Input must be connected to step_1" + assert execution_graph.has_edge("$inputs.image", "$steps.step_2"), "Input must be connected to step_2" + assert execution_graph.has_edge("$steps.step_1", "$steps.step_2"), "step_1 must be connected to step_2" + assert execution_graph.has_edge("$steps.step_1", "$outputs.predictions"), "step_1 output must be connected to output" + def test_prepare_execution_graph_when_graph_when_there_is_a_collapse_of_condition_branch() -> ( From 89a39a796442ab4a1d08575680be2281efe5f36f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20P=C4=99czek?= Date: Tue, 13 Feb 2024 19:43:05 +0100 Subject: [PATCH 04/18] Add extension to output prediction type from model steps and steps that transform predictions --- .../complier/steps_executors/auxiliary.py | 56 +++++++++++++++---- .../complier/steps_executors/models.py | 31 +++++++++- .../enterprise/workflows/entities/steps.py | 11 ++-- .../steps_executors/test_auxiliary.py | 3 + .../workflows/compiler/test_utils.py | 1 + 5 files changed, 84 insertions(+), 18 deletions(-) diff --git a/inference/enterprise/workflows/complier/steps_executors/auxiliary.py b/inference/enterprise/workflows/complier/steps_executors/auxiliary.py index a446df3fe..1d6ad1019 100644 --- a/inference/enterprise/workflows/complier/steps_executors/auxiliary.py +++ b/inference/enterprise/workflows/complier/steps_executors/auxiliary.py @@ -188,6 +188,15 @@ async def run_detection_filter( runtime_parameters=runtime_parameters, outputs_lookup=outputs_lookup, ) + prediction_type_selector = construct_selector_pointing_step_output( + selector=step.predictions, + new_output="prediction_type", + ) + predictions_type = resolve_parameter( + selector_or_value=prediction_type_selector, + runtime_parameters=runtime_parameters, + outputs_lookup=outputs_lookup, + ) filter_callable = build_filter_callable(definition=step.filter_definition) result_detections, result_parent_id = [], [] for prediction in predictions: @@ -196,8 +205,10 @@ async def run_detection_filter( result_parent_id.append([p[PARENT_ID_KEY] for p in filtered_predictions]) step_selector = construct_step_selector(step_name=step.name) outputs_lookup[step_selector] = [ - {"predictions": d, PARENT_ID_KEY: p, "image": i} - for d, p, i in zip(result_detections, result_parent_id, images_meta) + {"predictions": d, PARENT_ID_KEY: p, "image": i, "prediction_type": pt} + for d, p, i, pt in zip( + result_detections, result_parent_id, images_meta, predictions_type + ) ] return None, outputs_lookup @@ -240,6 +251,15 @@ async def run_detection_offset_step( runtime_parameters=runtime_parameters, outputs_lookup=outputs_lookup, ) + prediction_type_selector = construct_selector_pointing_step_output( + selector=step.predictions, + new_output="prediction_type", + ) + predictions_type = resolve_parameter( + selector_or_value=prediction_type_selector, + runtime_parameters=runtime_parameters, + outputs_lookup=outputs_lookup, + ) offset_x = resolve_parameter( selector_or_value=step.offset_x, runtime_parameters=runtime_parameters, @@ -260,8 +280,10 @@ async def run_detection_offset_step( result_parent_id.append([d[PARENT_ID_KEY] for d in offset_detections]) step_selector = construct_step_selector(step_name=step.name) outputs_lookup[step_selector] = [ - {"predictions": d, PARENT_ID_KEY: p, "image": i} - for d, p, i in zip(result_detections, result_parent_id, images_meta) + {"predictions": d, PARENT_ID_KEY: p, "image": i, "prediction_type": pt} + for d, p, i, pt in zip( + result_detections, result_parent_id, images_meta, predictions_type + ) ] return None, outputs_lookup @@ -428,6 +450,7 @@ async def run_detections_consensus_step( "object_present": object_present, "presence_confidence": presence_confidence, "image": images_meta[batch_index], + "prediction_type": "object-detection", } ) outputs_lookup[construct_step_selector(step_name=step.name)] = results @@ -854,6 +877,22 @@ async def run_active_learning_data_collector( new_output="image", ) images_meta = resolve_parameter_closure(images_meta_selector) + prediction_type_selector = construct_selector_pointing_step_output( + selector=step.predictions, + new_output="prediction_type", + ) + predictions_type = resolve_parameter( + selector_or_value=prediction_type_selector, + runtime_parameters=runtime_parameters, + outputs_lookup=outputs_lookup, + ) + prediction_type = set(predictions_type) + if len(prediction_type) > 1: + raise ExecutionGraphError( + f"Active Learning data collection step requires only single prediction " + f"type to be part of ingest. Detected: {prediction_type}." + ) + prediction_type = next(iter(prediction_type)) predictions = resolve_parameter_closure(step.predictions) predictions_output_name = get_last_selector_chunk(step.predictions) target_dataset = resolve_parameter_closure(step.target_dataset) @@ -870,14 +909,7 @@ async def run_active_learning_data_collector( predictions=active_learning_compatible_predictions, api_key=target_dataset_api_key or api_key, active_learning_disabled_for_request=disable_active_learning, - prediction_type=( - "classification" if predictions_output_name == "top" else "object-detection" - ), - # This is quick and dirty assumption - that shall work as samplers are suited to distinguish - # between cls and non-cls predictions and at this stage, we do also only recognise those 2 - # states - to further divide between tasks, each prediction step should provide output metadata - # that can be referred using `step.predictions` with output name replaced to `task_type` or sth - # similar + prediction_type=prediction_type, background_tasks=background_tasks, ) return None, outputs_lookup diff --git a/inference/enterprise/workflows/complier/steps_executors/models.py b/inference/enterprise/workflows/complier/steps_executors/models.py index 1d056d70b..9f795a686 100644 --- a/inference/enterprise/workflows/complier/steps_executors/models.py +++ b/inference/enterprise/workflows/complier/steps_executors/models.py @@ -27,7 +27,6 @@ from inference.enterprise.workflows.complier.steps_executors.constants import ( CENTER_X_KEY, CENTER_Y_KEY, - DETECTION_ID_KEY, ORIGIN_COORDINATES_KEY, ORIGIN_SIZE_KEY, PARENT_COORDINATES_SUFFIX, @@ -55,6 +54,14 @@ ) from inference_sdk import InferenceConfiguration, InferenceHTTPClient +MODEL_TYPE2PREDICTION_TYPE = { + "ClassificationModel": "classification", + "MultiLabelClassificationModel": "classification", + "ObjectDetectionModel": "object-detection", + "InstanceSegmentationModel": "instance-segmentation", + "KeypointsDetectionModel": "keypoint-detection", +} + async def run_roboflow_model_step( step: RoboflowModel, @@ -93,6 +100,10 @@ async def run_roboflow_model_step( outputs_lookup=outputs_lookup, api_key=api_key, ) + serialised_result = attach_prediction_type_info( + results=serialised_result, + prediction_type=MODEL_TYPE2PREDICTION_TYPE[step.get_type()], + ) if step.type in {"ClassificationModel", "MultiLabelClassificationModel"}: serialised_result = attach_parent_info( image=image, results=serialised_result, nested_key=None @@ -407,6 +418,10 @@ async def run_ocr_model_step( results=serialised_result, nested_key=None, ) + serialised_result = attach_prediction_type_info( + results=serialised_result, + prediction_type="ocr", + ) outputs_lookup[construct_step_selector(step_name=step.name)] = serialised_result return None, outputs_lookup @@ -496,6 +511,10 @@ async def run_clip_comparison_step( results=serialised_result, nested_key=None, ) + serialised_result = attach_prediction_type_info( + results=serialised_result, + prediction_type="embeddings-comparison", + ) outputs_lookup[construct_step_selector(step_name=step.name)] = serialised_result return None, outputs_lookup @@ -573,6 +592,16 @@ def load_core_model( return core_model_id +def attach_prediction_type_info( + results: List[Dict[str, Any]], + prediction_type: str, + key: str = "prediction_type", +) -> List[Dict[str, Any]]: + for result in results: + result[key] = prediction_type + return results + + def attach_parent_info( image: List[Dict[str, Any]], results: List[Dict[str, Any]], diff --git a/inference/enterprise/workflows/entities/steps.py b/inference/enterprise/workflows/entities/steps.py index 01ee8840c..438cfa254 100644 --- a/inference/enterprise/workflows/entities/steps.py +++ b/inference/enterprise/workflows/entities/steps.py @@ -105,7 +105,7 @@ def get_input_names(self) -> Set[str]: return {"image", "model_id", "disable_active_learning"} def get_output_names(self) -> Set[str]: - return set() + return {"prediction_type"} def validate_field_selector( self, field_name: str, input_step: GraphNone, index: Optional[int] = None @@ -489,7 +489,7 @@ def get_input_names(self) -> Set[str]: return {"image"} def get_output_names(self) -> Set[str]: - return {"result", "parent_id"} + return {"result", "parent_id", "prediction_type"} class Crop(BaseModel, StepInterface): @@ -624,7 +624,7 @@ def get_input_names(self) -> Set[str]: return {"predictions"} def get_output_names(self) -> Set[str]: - return {"predictions", "parent_id", "image"} + return {"predictions", "parent_id", "image", "prediction_type"} def validate_field_selector( self, field_name: str, input_step: GraphNone, index: Optional[int] = None @@ -660,7 +660,7 @@ def get_input_names(self) -> Set[str]: return {"predictions", "offset_x", "offset_y"} def get_output_names(self) -> Set[str]: - return {"predictions", "parent_id", "image"} + return {"predictions", "parent_id", "image", "predictions_type"} def validate_field_selector( self, field_name: str, input_step: GraphNone, index: Optional[int] = None @@ -890,7 +890,7 @@ def get_input_names(self) -> Set[str]: return {"image", "text"} def get_output_names(self) -> Set[str]: - return {"similarity", "parent_id"} + return {"similarity", "parent_id", "predictions_type"} class AggregationMode(Enum): @@ -1013,6 +1013,7 @@ def get_output_names(self) -> Set[str]: "image", "object_present", "presence_confidence", + "predictions_type", } def validate_field_selector( diff --git a/tests/inference/unit_tests/enterprise/workflows/compiler/steps_executors/test_auxiliary.py b/tests/inference/unit_tests/enterprise/workflows/compiler/steps_executors/test_auxiliary.py index d5538b6e8..1202157eb 100644 --- a/tests/inference/unit_tests/enterprise/workflows/compiler/steps_executors/test_auxiliary.py +++ b/tests/inference/unit_tests/enterprise/workflows/compiler/steps_executors/test_auxiliary.py @@ -217,6 +217,7 @@ async def test_run_detection_filter_step_when_batch_detections_given() -> None: "$steps.step_1": { "predictions": detections, "image": [{"height": 100, "width": 100}] * 2, + "prediction_type": ["object-detection"] * 2 } }, model_manager=MagicMock(), @@ -226,6 +227,8 @@ async def test_run_detection_filter_step_when_batch_detections_given() -> None: # then assert next_step is None, "Next step should not be set here" + assert outputs_lookup["$steps.step_2"][0]["prediction_type"] == "object-detection", "Prediction type must be preserved" + assert outputs_lookup["$steps.step_2"][1]["prediction_type"] == "object-detection", "Prediction type must be preserved" assert outputs_lookup["$steps.step_2"][0]["predictions"] == [ { "x": 10, diff --git a/tests/inference/unit_tests/enterprise/workflows/compiler/test_utils.py b/tests/inference/unit_tests/enterprise/workflows/compiler/test_utils.py index 8e30ce9f3..185b8f436 100644 --- a/tests/inference/unit_tests/enterprise/workflows/compiler/test_utils.py +++ b/tests/inference/unit_tests/enterprise/workflows/compiler/test_utils.py @@ -262,6 +262,7 @@ def test_get_steps_output_selectors() -> None: "$steps.my_model.image", "$steps.my_model.predictions", "$steps.my_model.parent_id", + "$steps.my_model.prediction_type", }, "Each step output must be prefixed with $steps. and name of step. Crop step defines `crops` and `parent_id` outputs, object detection defines `image`, `predictions` and `parent_id`" From 134eb87322942fb19400724f6bb173d8fdc652da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20P=C4=99czek?= Date: Wed, 14 Feb 2024 10:53:00 +0100 Subject: [PATCH 05/18] Bring back validation of dangling nodes --- .../workflows/complier/graph_parser.py | 39 ++++++ .../workflows/compiler/test_graph_parser.py | 116 +++++++++++++++++- 2 files changed, 150 insertions(+), 5 deletions(-) diff --git a/inference/enterprise/workflows/complier/graph_parser.py b/inference/enterprise/workflows/complier/graph_parser.py index 2dc2f9558..f3bfead98 100644 --- a/inference/enterprise/workflows/complier/graph_parser.py +++ b/inference/enterprise/workflows/complier/graph_parser.py @@ -39,6 +39,7 @@ def prepare_execution_graph(workflow_specification: WorkflowSpecificationV1) -> execution_graph = construct_graph(workflow_specification=workflow_specification) if not nx.is_directed_acyclic_graph(execution_graph): raise NotAcyclicGraphError(f"Detected cycle in execution graph.") + verify_each_node_reach_at_least_one_output(execution_graph=execution_graph) verify_each_node_step_has_parent_in_the_same_branch(execution_graph=execution_graph) verify_that_steps_are_connected_with_compatible_inputs( execution_graph=execution_graph @@ -188,6 +189,44 @@ def verify_edge_is_created_between_existing_nodes( ) +def verify_each_node_reach_at_least_one_output( + execution_graph: DiGraph, +) -> None: + all_nodes = set(execution_graph.nodes()) + output_nodes = get_nodes_of_specific_kind( + execution_graph=execution_graph, kind=OUTPUT_NODE_KIND + ) + nodes_without_outputs = get_nodes_that_do_not_produce_outputs( + execution_graph=execution_graph + ) + nodes_that_must_be_reached = output_nodes.union(nodes_without_outputs) + nodes_reaching_output = ( + get_nodes_that_are_reachable_from_pointed_ones_in_reversed_graph( + execution_graph=execution_graph, + pointed_nodes=nodes_that_must_be_reached, + ) + ) + nodes_not_reaching_output = all_nodes.difference(nodes_reaching_output) + if len(nodes_not_reaching_output) > 0: + raise NodesNotReachingOutputError( + f"Detected {len(nodes_not_reaching_output)} nodes not reaching any of output node:" + f"{nodes_not_reaching_output}." + ) + + +def get_nodes_that_do_not_produce_outputs(execution_graph: DiGraph) -> Set[str]: + # assumption is that nodes without outputs will produce some side effect and shall be + # treated as output nodes while checking if there is no dangling steps in graph + step_nodes = get_nodes_of_specific_kind( + execution_graph=execution_graph, kind=STEP_NODE_KIND + ) + return { + step_node + for step_node in step_nodes + if len(execution_graph.nodes[step_node]["definition"].get_output_names()) == 0 + } + + def get_nodes_that_are_reachable_from_pointed_ones_in_reversed_graph( execution_graph: DiGraph, pointed_nodes: Set[str], diff --git a/tests/inference/unit_tests/enterprise/workflows/compiler/test_graph_parser.py b/tests/inference/unit_tests/enterprise/workflows/compiler/test_graph_parser.py index 1f5e33a17..dd76b0bc9 100644 --- a/tests/inference/unit_tests/enterprise/workflows/compiler/test_graph_parser.py +++ b/tests/inference/unit_tests/enterprise/workflows/compiler/test_graph_parser.py @@ -7,7 +7,7 @@ add_steps_nodes_for_graph, construct_graph, get_nodes_that_are_reachable_from_pointed_ones_in_reversed_graph, - prepare_execution_graph, + prepare_execution_graph, verify_each_node_reach_at_least_one_output, ) from inference.enterprise.workflows.constants import ( INPUT_NODE_KIND, @@ -19,7 +19,7 @@ InferenceParameter, ) from inference.enterprise.workflows.entities.outputs import JsonField -from inference.enterprise.workflows.entities.steps import Crop, ObjectDetectionModel +from inference.enterprise.workflows.entities.steps import Crop, ObjectDetectionModel, ActiveLearningDataCollector from inference.enterprise.workflows.entities.workflows_specification import ( WorkflowSpecificationV1, ) @@ -319,6 +319,111 @@ def test_construct_graph_when_detections_consensus_block_is_used() -> None: assert len(result.edges) == 5, "10 edges in total should be created" +def test_verify_each_node_reach_at_least_one_output_when_all_steps_are_connected_to_inputs_and_outputs() -> None: + # given + example_step = Crop( + type="Crop", + name="my_crop", + image="$inputs.image", + detections="$steps.detect_2.predictions", + ) + execution_graph = nx.DiGraph() + execution_graph.add_node("a", kind=INPUT_NODE_KIND) + execution_graph.add_node("b", kind=INPUT_NODE_KIND) + execution_graph.add_node("c", kind=STEP_NODE_KIND, definition=example_step) + execution_graph.add_node("d", kind=STEP_NODE_KIND, definition=example_step) + execution_graph.add_node("e", kind=OUTPUT_NODE_KIND) + execution_graph.add_node("f", kind=OUTPUT_NODE_KIND) + execution_graph.add_edge("a", "c") + execution_graph.add_edge("b", "d") + execution_graph.add_edge("c", "e") + execution_graph.add_edge("d", "f") + + # when + verify_each_node_reach_at_least_one_output(execution_graph=execution_graph) + + # then - no error raised + + +def test_verify_each_node_reach_at_least_one_output_when_there_is_step_with_outputs_defined_not_connected_to_output_node() -> None: + # given + example_step = Crop( + type="Crop", + name="my_crop", + image="$inputs.image", + detections="$steps.detect_2.predictions", + ) + execution_graph = nx.DiGraph() + execution_graph.add_node("a", kind=INPUT_NODE_KIND) + execution_graph.add_node("b", kind=INPUT_NODE_KIND) + execution_graph.add_node("c", kind=STEP_NODE_KIND, definition=example_step) + execution_graph.add_node("d", kind=STEP_NODE_KIND, definition=example_step) + execution_graph.add_node("e", kind=OUTPUT_NODE_KIND) + execution_graph.add_edge("a", "c") + execution_graph.add_edge("b", "d") + execution_graph.add_edge("c", "e") + + # when + with pytest.raises(NodesNotReachingOutputError): + verify_each_node_reach_at_least_one_output(execution_graph=execution_graph) + + +def test_verify_each_node_reach_at_least_one_output_when_there_is_input_node_not_used() -> None: + # given + example_step = Crop( + type="Crop", + name="my_crop", + image="$inputs.image", + detections="$steps.detect_2.predictions", + ) + execution_graph = nx.DiGraph() + execution_graph.add_node("a", kind=INPUT_NODE_KIND) + execution_graph.add_node("b", kind=INPUT_NODE_KIND) + execution_graph.add_node("c", kind=STEP_NODE_KIND, definition=example_step) + execution_graph.add_node("d", kind=STEP_NODE_KIND, definition=example_step) + execution_graph.add_node("e", kind=OUTPUT_NODE_KIND) + execution_graph.add_edge("a", "c") + execution_graph.add_edge("a", "d") + execution_graph.add_edge("c", "e") + execution_graph.add_edge("d", "f") + + # when + with pytest.raises(NodesNotReachingOutputError): + verify_each_node_reach_at_least_one_output(execution_graph=execution_graph) + + +def test_verify_each_node_reach_at_least_one_output_when_there_is_a_step_executing_side_effect_not_connected_to_output() -> None: + # given + example_step = Crop( + type="Crop", + name="my_crop", + image="$inputs.image", + detections="$steps.detect_2.predictions", + ) + side_effect_step = ActiveLearningDataCollector( + type="ActiveLearningDataCollector", + name="al_block", + image="$inputs.image", + predictions="$steps.detect.predictions", + target_dataset="some", + ) + execution_graph = nx.DiGraph() + execution_graph.add_node("a", kind=INPUT_NODE_KIND) + execution_graph.add_node("b", kind=INPUT_NODE_KIND) # this one is not used + execution_graph.add_node("c", kind=STEP_NODE_KIND, definition=example_step) + execution_graph.add_node("d", kind=STEP_NODE_KIND, definition=side_effect_step) + execution_graph.add_node("e", kind=OUTPUT_NODE_KIND) + execution_graph.add_node("f", kind=OUTPUT_NODE_KIND) + execution_graph.add_edge("a", "c") + execution_graph.add_edge("b", "d") + execution_graph.add_edge("c", "e") + + # when + verify_each_node_reach_at_least_one_output(execution_graph=execution_graph) + + # then - no error raised + + def test_get_nodes_that_are_reachable_from_pointed_ones_in_reversed_graph() -> None: # given execution_graph = nx.DiGraph() @@ -425,7 +530,7 @@ def test_prepare_execution_graph_when_graph_is_not_acyclic() -> None: _ = prepare_execution_graph(workflow_specification=workflow_specification) -def test_prepare_execution_graph_when_graph_node_does_not_reach_output() -> None: +def test_prepare_execution_graph_when_graph_node_with_side_effect_step_does_not_reach_output() -> None: # given workflow_specification = WorkflowSpecificationV1.parse_obj( { @@ -441,10 +546,11 @@ def test_prepare_execution_graph_when_graph_node_does_not_reach_output() -> None "model_id": "vehicle-classification-eapcd/2", }, { - "type": "Crop", + "type": "ActiveLearningDataCollector", "name": "step_2", "image": "$inputs.image", - "detections": "$steps.step_1.predictions", + "predictions": "$steps.step_1.predictions", + "target_dataset": "some", }, ], "outputs": [ From b2efc389be2f35ef4ce97be6116443aea72a2392 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20P=C4=99czek?= Date: Wed, 14 Feb 2024 10:56:09 +0100 Subject: [PATCH 06/18] Bump rc version --- inference/core/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inference/core/version.py b/inference/core/version.py index 785a770d0..96fa1cace 100644 --- a/inference/core/version.py +++ b/inference/core/version.py @@ -1,4 +1,4 @@ -__version__ = "0.9.10" +__version__ = "0.9.11rc1" if __name__ == "__main__": From 83eed51f8012f04c7eb5adead3d509cb0442d9f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20P=C4=99czek?= Date: Wed, 14 Feb 2024 11:58:04 +0100 Subject: [PATCH 07/18] Add tests for validation of al data collector step definition --- .../enterprise/workflows/entities/steps.py | 18 +- .../steps_executors/test_auxiliary.py | 10 +- .../workflows/compiler/test_graph_parser.py | 50 +- .../workflows/entities/test_steps.py | 592 ++++++++++++++++++ 4 files changed, 645 insertions(+), 25 deletions(-) diff --git a/inference/enterprise/workflows/entities/steps.py b/inference/enterprise/workflows/entities/steps.py index 438cfa254..e3019a84a 100644 --- a/inference/enterprise/workflows/entities/steps.py +++ b/inference/enterprise/workflows/entities/steps.py @@ -1145,29 +1145,29 @@ class ActiveLearningDataCollector(BaseModel, StepInterface): target_dataset_api_key: Optional[str] = Field(default=None) disable_active_learning: Union[bool, str] = Field(default=False) - @classmethod @field_validator("image") + @classmethod def image_must_only_hold_selectors(cls, value: Any) -> Union[str, List[str]]: validate_image_is_valid_selector(value=value) return value - @classmethod @field_validator("predictions") + @classmethod def predictions_must_hold_selector(cls, value: Any) -> str: if not is_selector(selector_or_value=value): raise ValueError("`predictions` field can only contain selector values") return value - @classmethod @field_validator("target_dataset") - def validate_target_dataset_field(cls, value: Any) -> Union[str, bool]: + @classmethod + def validate_target_dataset_field(cls, value: Any) -> str: validate_field_is_selector_or_has_given_type( - value=value, field_name="target_dataset", allowed_types=[bool] + value=value, field_name="target_dataset", allowed_types=[str] ) return value + @field_validator("target_dataset_api_key") @classmethod - @field_validator("target_dataset") def validate_target_dataset_api_key_field(cls, value: Any) -> Union[str, bool]: validate_field_is_selector_or_has_given_type( value=value, @@ -1176,11 +1176,11 @@ def validate_target_dataset_api_key_field(cls, value: Any) -> Union[str, bool]: ) return value - @classmethod @field_validator("disable_active_learning") + @classmethod def validate_boolean_flags_or_selectors(cls, value: Any) -> Union[str, bool]: validate_field_is_selector_or_has_given_type( - value=value, field_name="class_aware", allowed_types=[bool] + value=value, field_name="disable_active_learning", allowed_types=[bool] ) return value @@ -1265,7 +1265,7 @@ def validate_field_binding(self, field_name: str, value: Any) -> None: elif field_name in {"target_dataset_api_key"}: validate_field_has_given_type( field_name=field_name, - allowed_types=[str, type(None)], + allowed_types=[str], value=value, error=VariableTypeError, ) diff --git a/tests/inference/unit_tests/enterprise/workflows/compiler/steps_executors/test_auxiliary.py b/tests/inference/unit_tests/enterprise/workflows/compiler/steps_executors/test_auxiliary.py index 1202157eb..eed644660 100644 --- a/tests/inference/unit_tests/enterprise/workflows/compiler/steps_executors/test_auxiliary.py +++ b/tests/inference/unit_tests/enterprise/workflows/compiler/steps_executors/test_auxiliary.py @@ -217,7 +217,7 @@ async def test_run_detection_filter_step_when_batch_detections_given() -> None: "$steps.step_1": { "predictions": detections, "image": [{"height": 100, "width": 100}] * 2, - "prediction_type": ["object-detection"] * 2 + "prediction_type": ["object-detection"] * 2, } }, model_manager=MagicMock(), @@ -227,8 +227,12 @@ async def test_run_detection_filter_step_when_batch_detections_given() -> None: # then assert next_step is None, "Next step should not be set here" - assert outputs_lookup["$steps.step_2"][0]["prediction_type"] == "object-detection", "Prediction type must be preserved" - assert outputs_lookup["$steps.step_2"][1]["prediction_type"] == "object-detection", "Prediction type must be preserved" + assert ( + outputs_lookup["$steps.step_2"][0]["prediction_type"] == "object-detection" + ), "Prediction type must be preserved" + assert ( + outputs_lookup["$steps.step_2"][1]["prediction_type"] == "object-detection" + ), "Prediction type must be preserved" assert outputs_lookup["$steps.step_2"][0]["predictions"] == [ { "x": 10, diff --git a/tests/inference/unit_tests/enterprise/workflows/compiler/test_graph_parser.py b/tests/inference/unit_tests/enterprise/workflows/compiler/test_graph_parser.py index dd76b0bc9..71e4d70c4 100644 --- a/tests/inference/unit_tests/enterprise/workflows/compiler/test_graph_parser.py +++ b/tests/inference/unit_tests/enterprise/workflows/compiler/test_graph_parser.py @@ -7,7 +7,8 @@ add_steps_nodes_for_graph, construct_graph, get_nodes_that_are_reachable_from_pointed_ones_in_reversed_graph, - prepare_execution_graph, verify_each_node_reach_at_least_one_output, + prepare_execution_graph, + verify_each_node_reach_at_least_one_output, ) from inference.enterprise.workflows.constants import ( INPUT_NODE_KIND, @@ -19,7 +20,11 @@ InferenceParameter, ) from inference.enterprise.workflows.entities.outputs import JsonField -from inference.enterprise.workflows.entities.steps import Crop, ObjectDetectionModel, ActiveLearningDataCollector +from inference.enterprise.workflows.entities.steps import ( + ActiveLearningDataCollector, + Crop, + ObjectDetectionModel, +) from inference.enterprise.workflows.entities.workflows_specification import ( WorkflowSpecificationV1, ) @@ -319,7 +324,9 @@ def test_construct_graph_when_detections_consensus_block_is_used() -> None: assert len(result.edges) == 5, "10 edges in total should be created" -def test_verify_each_node_reach_at_least_one_output_when_all_steps_are_connected_to_inputs_and_outputs() -> None: +def test_verify_each_node_reach_at_least_one_output_when_all_steps_are_connected_to_inputs_and_outputs() -> ( + None +): # given example_step = Crop( type="Crop", @@ -345,7 +352,9 @@ def test_verify_each_node_reach_at_least_one_output_when_all_steps_are_connected # then - no error raised -def test_verify_each_node_reach_at_least_one_output_when_there_is_step_with_outputs_defined_not_connected_to_output_node() -> None: +def test_verify_each_node_reach_at_least_one_output_when_there_is_step_with_outputs_defined_not_connected_to_output_node() -> ( + None +): # given example_step = Crop( type="Crop", @@ -368,7 +377,9 @@ def test_verify_each_node_reach_at_least_one_output_when_there_is_step_with_outp verify_each_node_reach_at_least_one_output(execution_graph=execution_graph) -def test_verify_each_node_reach_at_least_one_output_when_there_is_input_node_not_used() -> None: +def test_verify_each_node_reach_at_least_one_output_when_there_is_input_node_not_used() -> ( + None +): # given example_step = Crop( type="Crop", @@ -392,7 +403,9 @@ def test_verify_each_node_reach_at_least_one_output_when_there_is_input_node_not verify_each_node_reach_at_least_one_output(execution_graph=execution_graph) -def test_verify_each_node_reach_at_least_one_output_when_there_is_a_step_executing_side_effect_not_connected_to_output() -> None: +def test_verify_each_node_reach_at_least_one_output_when_there_is_a_step_executing_side_effect_not_connected_to_output() -> ( + None +): # given example_step = Crop( type="Crop", @@ -530,7 +543,9 @@ def test_prepare_execution_graph_when_graph_is_not_acyclic() -> None: _ = prepare_execution_graph(workflow_specification=workflow_specification) -def test_prepare_execution_graph_when_graph_node_with_side_effect_step_does_not_reach_output() -> None: +def test_prepare_execution_graph_when_graph_node_with_side_effect_step_does_not_reach_output() -> ( + None +): # given workflow_specification = WorkflowSpecificationV1.parse_obj( { @@ -564,15 +579,24 @@ def test_prepare_execution_graph_when_graph_node_with_side_effect_step_does_not_ ) # when - execution_graph = prepare_execution_graph(workflow_specification=workflow_specification) + execution_graph = prepare_execution_graph( + workflow_specification=workflow_specification + ) # then assert len(execution_graph.edges) == 4, "4 edges are expected to be created" - assert execution_graph.has_edge("$inputs.image", "$steps.step_1"), "Input must be connected to step_1" - assert execution_graph.has_edge("$inputs.image", "$steps.step_2"), "Input must be connected to step_2" - assert execution_graph.has_edge("$steps.step_1", "$steps.step_2"), "step_1 must be connected to step_2" - assert execution_graph.has_edge("$steps.step_1", "$outputs.predictions"), "step_1 output must be connected to output" - + assert execution_graph.has_edge( + "$inputs.image", "$steps.step_1" + ), "Input must be connected to step_1" + assert execution_graph.has_edge( + "$inputs.image", "$steps.step_2" + ), "Input must be connected to step_2" + assert execution_graph.has_edge( + "$steps.step_1", "$steps.step_2" + ), "step_1 must be connected to step_2" + assert execution_graph.has_edge( + "$steps.step_1", "$outputs.predictions" + ), "step_1 output must be connected to output" def test_prepare_execution_graph_when_graph_when_there_is_a_collapse_of_condition_branch() -> ( diff --git a/tests/inference/unit_tests/enterprise/workflows/entities/test_steps.py b/tests/inference/unit_tests/enterprise/workflows/entities/test_steps.py index db068c877..49a30e684 100644 --- a/tests/inference/unit_tests/enterprise/workflows/entities/test_steps.py +++ b/tests/inference/unit_tests/enterprise/workflows/entities/test_steps.py @@ -9,6 +9,7 @@ InferenceParameter, ) from inference.enterprise.workflows.entities.steps import ( + ActiveLearningDataCollector, AggregationMode, ClassificationModel, Condition, @@ -2329,3 +2330,594 @@ def test_detections_consensus_validate_field_binding_for_required_objects_when_v ) # then - no error + + +def test_validate_al_data_collector_when_valid_input_given() -> None: + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": "$inputs.image", + "predictions": "$steps.detection.predictions", + "target_dataset": "some", + } + + # when + _ = ActiveLearningDataCollector.parse_obj(specification) + + # then NO ERROR is raised + + +@pytest.mark.parametrize("image_selector", [1, None, "some", 1.3, True]) +def test_validate_al_data_collector_image_field_when_field_does_not_hold_selector( + image_selector: Any, +) -> None: + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": image_selector, + "predictions": "$steps.detection.predictions", + "target_dataset": "some", + } + + # when + with pytest.raises(ValidationError): + _ = ActiveLearningDataCollector.parse_obj(specification) + + +@pytest.mark.parametrize("predictions_selector", [1, None, "some", 1.3, True]) +def test_validate_al_data_collector_predictions_field_when_field_does_not_hold_selector( + predictions_selector: Any, +) -> None: + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": "$inputs.image", + "predictions": predictions_selector, + "target_dataset": "some", + } + + # when + with pytest.raises(ValidationError): + _ = ActiveLearningDataCollector.parse_obj(specification) + + +@pytest.mark.parametrize("target_dataset", [1, None, 1.3, True]) +def test_validate_al_data_collector_target_dataset_field_when_field_contains_invalid_value( + target_dataset: Any, +) -> None: + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": "$inputs.image", + "predictions": "$steps.detection.predictions", + "target_dataset": target_dataset, + } + + # when + with pytest.raises(ValidationError): + _ = ActiveLearningDataCollector.parse_obj(specification) + + +@pytest.mark.parametrize("target_dataset_api_key", [1, 1.3, True]) +def test_validate_al_data_collector_target_dataset_api_key_field_when_field_contains_invalid_value( + target_dataset_api_key: Any, +) -> None: + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": "$inputs.image", + "predictions": "$steps.detection.predictions", + "target_dataset": "some", + "target_dataset_api_key": target_dataset_api_key, + } + + # when + with pytest.raises(ValidationError): + _ = ActiveLearningDataCollector.parse_obj(specification) + + +@pytest.mark.parametrize("disable_active_learning", ["some", 1.3]) +def test_validate_al_data_collector_disable_active_learning_field_when_field_contains_invalid_value( + disable_active_learning: Any, +) -> None: + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": "$inputs.image", + "predictions": "$steps.detection.predictions", + "target_dataset": "some", + "disable_active_learning": disable_active_learning, + } + + # when + with pytest.raises(ValidationError): + _ = ActiveLearningDataCollector.parse_obj(specification) + + +def test_al_data_collector_validate_field_selector_when_field_does_not_hold_selector() -> ( + None +): + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": "$inputs.image", + "predictions": "$steps.detection.predictions", + "target_dataset": "some", + } + step = ActiveLearningDataCollector.parse_obj(specification) + + # when + with pytest.raises(ExecutionGraphError): + step.validate_field_selector( + field_name="target_dataset", + input_step=ObjectDetectionModel( + type="ObjectDetectionModel", + name="some", + image="$inputs.image", + model_id="some/1", + ), + ) + + +def test_al_data_collector_validate_field_selector_when_prediction_field_refers_to_invalid_step() -> ( + None +): + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": "$inputs.image", + "predictions": "$steps.detection.predictions", + "target_dataset": "some", + } + step = ActiveLearningDataCollector.parse_obj(specification) + + # when + with pytest.raises(ExecutionGraphError): + step.validate_field_selector( + field_name="predictions", + input_step=Crop( + type="Crop", + name="some", + image="$inputs.image", + detections="$steps.detection.predictions", + ), + ) + + +def test_al_data_collector_validate_field_selector_when_prediction_field_refers_to_invalid_output_of_detection_step() -> ( + None +): + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": "$inputs.image", + "predictions": "$steps.detection.image", + "target_dataset": "some", + } + step = ActiveLearningDataCollector.parse_obj(specification) + + # when + with pytest.raises(ExecutionGraphError): + step.validate_field_selector( + field_name="predictions", + input_step=ObjectDetectionModel( + type="ObjectDetectionModel", + name="some", + image="$inputs.image", + model_id="some/1", + ), + ) + + +def test_al_data_collector_validate_field_selector_when_prediction_field_refers_to_invalid_output_of_classification_step() -> ( + None +): + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": "$inputs.image", + "predictions": "$steps.detection.predictions", + "target_dataset": "some", + } + step = ActiveLearningDataCollector.parse_obj(specification) + + # when + with pytest.raises(ExecutionGraphError): + step.validate_field_selector( + field_name="predictions", + input_step=ClassificationModel( + type="ClassificationModel", + name="some", + image="$inputs.image", + model_id="some/1", + ), + ) + + +def test_al_data_collector_validate_field_selector_when_prediction_field_refers_to_valid_output_of_classification_step() -> ( + None +): + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": "$inputs.image", + "predictions": "$steps.detection.top", + "target_dataset": "some", + } + step = ActiveLearningDataCollector.parse_obj(specification) + + # when + step.validate_field_selector( + field_name="predictions", + input_step=ClassificationModel( + type="ClassificationModel", + name="some", + image="$inputs.image", + model_id="some/1", + ), + ) + + # then - NO ERROR + + +def test_al_data_collector_validate_field_selector_when_prediction_field_refers_to_step_bounded_in_different_image() -> ( + None +): + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": "$inputs.image", + "predictions": "$steps.detection.predictions", + "target_dataset": "some", + } + step = ActiveLearningDataCollector.parse_obj(specification) + + # when + with pytest.raises(ExecutionGraphError): + step.validate_field_selector( + field_name="predictions", + input_step=ObjectDetectionModel( + type="ObjectDetectionModel", + name="some", + image="$inputs.image2", + model_id="some/1", + ), + ) + + +def test_al_data_collector_validate_field_selector_when_prediction_field_refers_to_step_which_cannot_be_verified_against_image_ref_correctness() -> ( + None +): + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": "$inputs.image", + "predictions": "$steps.detection.predictions", + "target_dataset": "some", + } + step = ActiveLearningDataCollector.parse_obj(specification) + + # when + step.validate_field_selector( + field_name="predictions", + input_step=DetectionFilter( + type="DetectionFilter", + name="detection", + predictions="$steps.det.predictions", + filter_definition=DetectionFilterDefinition( + type="DetectionFilterDefinition", + field_name="confidence", + operator="greater_than", + reference_value=0.3, + ), + ), + ) + + # then - NO ERROR + + +def test_al_data_collector_validate_field_selector_when_image_field_does_not_refer_image() -> ( + None +): + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": "$inputs.image", + "predictions": "$steps.detection.predictions", + "target_dataset": "some", + } + step = ActiveLearningDataCollector.parse_obj(specification) + + # when + with pytest.raises(ExecutionGraphError): + step.validate_field_selector( + field_name="image", + input_step=ObjectDetectionModel( + type="ObjectDetectionModel", + name="some", + image="$inputs.image2", + model_id="some/1", + ), + ) + + +def test_al_data_collector_validate_field_selector_when_image_field_refers_image() -> ( + None +): + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": "$inputs.image", + "predictions": "$steps.detection.predictions", + "target_dataset": "some", + } + step = ActiveLearningDataCollector.parse_obj(specification) + + # when + step.validate_field_selector( + field_name="image", + input_step=InferenceImage( + type="InferenceImage", + name="some", + ), + ) + + # then - NO ERROR + + +@pytest.mark.parametrize( + "field_name", + ["target_dataset", "target_dataset_api_key", "disable_active_learning"], +) +def test_al_data_collector_validate_fields_that_can_only_accept_inference_parameter_when_invalid_input_is_provided( + field_name: str, +) -> None: + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": "$inputs.image", + "predictions": "$steps.detection.predictions", + "target_dataset": "some", + } + step = ActiveLearningDataCollector.parse_obj(specification) + + # when + with pytest.raises(ExecutionGraphError): + step.validate_field_selector( + field_name=field_name, + input_step=InferenceImage( + type="InferenceImage", + name="some", + ), + ) + + +@pytest.mark.parametrize( + "field_name", + ["target_dataset", "target_dataset_api_key", "disable_active_learning"], +) +def test_al_data_collector_validate_fields_that_can_only_accept_inference_parameter_when_valid_input_is_provided( + field_name: str, +) -> None: + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": "$inputs.image", + "predictions": "$steps.detection.predictions", + "target_dataset": "$inputs.some", + "target_dataset_api_key": "$inputs.other", + "disable_active_learning": "$inputs.value", + } + step = ActiveLearningDataCollector.parse_obj(specification) + + # when + step.validate_field_selector( + field_name=field_name, + input_step=InferenceParameter( + type="InferenceParameter", + name="some", + ), + ) + + # then - NO ERROR + + +def test_al_data_collector_validate_image_binding_when_provided_value_is_valid() -> ( + None +): + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": "$inputs.image", + "predictions": "$steps.detection.predictions", + "target_dataset": "$inputs.some", + "target_dataset_api_key": "$inputs.other", + "disable_active_learning": "$inputs.value", + } + step = ActiveLearningDataCollector.parse_obj(specification) + + # when + step.validate_field_binding( + field_name="image", value={"type": "url", "value": "https://some.com/image.jpg"} + ) + + # then - NO ERROR + + +def test_al_data_collector_validate_image_binding_when_provided_value_is_invalid() -> ( + None +): + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": "$inputs.image", + "predictions": "$steps.detection.predictions", + "target_dataset": "$inputs.some", + "target_dataset_api_key": "$inputs.other", + "disable_active_learning": "$inputs.value", + } + step = ActiveLearningDataCollector.parse_obj(specification) + + # when + with pytest.raises(VariableTypeError): + step.validate_field_binding(field_name="image", value="invalid") + + +def test_al_data_collector_validate_disable_al_flag_binding_when_provided_value_is_valid() -> ( + None +): + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": "$inputs.image", + "predictions": "$steps.detection.predictions", + "target_dataset": "$inputs.some", + "target_dataset_api_key": "$inputs.other", + "disable_active_learning": "$inputs.value", + } + step = ActiveLearningDataCollector.parse_obj(specification) + + # when + step.validate_field_binding( + field_name="disable_active_learning", + value=True, + ) + + # then - NO ERROR + + +def test_al_data_collector_validate_disable_al_flag_binding_when_provided_value_is_invalid() -> ( + None +): + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": "$inputs.image", + "predictions": "$steps.detection.predictions", + "target_dataset": "$inputs.some", + "target_dataset_api_key": "$inputs.other", + "disable_active_learning": "$inputs.value", + } + step = ActiveLearningDataCollector.parse_obj(specification) + + # when + with pytest.raises(VariableTypeError): + step.validate_field_binding( + field_name="disable_active_learning", + value="some", + ) + + +def test_al_data_collector_validate_target_dataset_binding_when_provided_value_is_valid() -> ( + None +): + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": "$inputs.image", + "predictions": "$steps.detection.predictions", + "target_dataset": "$inputs.some", + "target_dataset_api_key": "$inputs.other", + "disable_active_learning": "$inputs.value", + } + step = ActiveLearningDataCollector.parse_obj(specification) + + # when + step.validate_field_binding( + field_name="target_dataset", + value="some", + ) + + # then - NO ERROR + + +def test_al_data_collector_validate_target_dataset_binding_when_provided_value_is_invalid() -> ( + None +): + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": "$inputs.image", + "predictions": "$steps.detection.predictions", + "target_dataset": "$inputs.some", + "target_dataset_api_key": "$inputs.other", + "disable_active_learning": "$inputs.value", + } + step = ActiveLearningDataCollector.parse_obj(specification) + + # when + with pytest.raises(VariableTypeError): + step.validate_field_binding( + field_name="target_dataset", + value=None, + ) + + +def test_al_data_collector_validate_target_dataset_api_key_binding_when_provided_value_is_valid() -> ( + None +): + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": "$inputs.image", + "predictions": "$steps.detection.predictions", + "target_dataset": "$inputs.some", + "target_dataset_api_key": "$inputs.other", + "disable_active_learning": "$inputs.value", + } + step = ActiveLearningDataCollector.parse_obj(specification) + + # when + step.validate_field_binding( + field_name="target_dataset_api_key", + value="some", + ) + + # then - NO ERROR + + +def test_al_data_collector_validate_target_dataset_api_key_binding_when_provided_value_is_invalid() -> ( + None +): + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": "$inputs.image", + "predictions": "$steps.detection.predictions", + "target_dataset": "$inputs.some", + "target_dataset_api_key": "$inputs.other", + "disable_active_learning": "$inputs.value", + } + step = ActiveLearningDataCollector.parse_obj(specification) + + # when + with pytest.raises(VariableTypeError): + step.validate_field_binding( + field_name="target_dataset_api_key", + value=None, + ) From cc34098ae9be5c16d39d5659606d5cd47db9e6c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20P=C4=99czek?= Date: Wed, 14 Feb 2024 12:53:11 +0100 Subject: [PATCH 08/18] Add possibility to define AL config at the level of AL workflow steps --- .../enterprise/workflows/entities/steps.py | 105 ++++++++++- .../workflows/entities/test_steps.py | 171 +++++++++++++++++- 2 files changed, 271 insertions(+), 5 deletions(-) diff --git a/inference/enterprise/workflows/entities/steps.py b/inference/enterprise/workflows/entities/steps.py index e3019a84a..7f1380a40 100644 --- a/inference/enterprise/workflows/entities/steps.py +++ b/inference/enterprise/workflows/entities/steps.py @@ -1,8 +1,16 @@ from abc import ABCMeta, abstractmethod from enum import Enum -from typing import Annotated, Any, Dict, List, Literal, Optional, Set, Union - -from pydantic import BaseModel, ConfigDict, Field, field_validator +from typing import Annotated, Any, Dict, List, Literal, Optional, Set, Tuple, Union + +from pydantic import ( + BaseModel, + ConfigDict, + Field, + NonNegativeInt, + PositiveInt, + confloat, + field_validator, +) from inference.enterprise.workflows.entities.base import GraphNone from inference.enterprise.workflows.entities.validators import ( @@ -1136,6 +1144,94 @@ def _validate_required_objects_binding(self, value: Any) -> None: } +class DisabledActiveLearningConfiguration(BaseModel): + enabled: bool + + +class LimitDefinition(BaseModel): + type: Literal["minutely", "hourly", "daily"] + value: PositiveInt + + +class RandomSamplingConfig(BaseModel): + type: Literal["random"] + name: str + traffic_percentage: confloat(ge=0.0, le=1.0) + tags: List[str] = Field(default_factory=lambda: []) + limits: List[LimitDefinition] = Field(default_factory=lambda: []) + + +class CloseToThresholdSampling(BaseModel): + type: Literal["close_to_threshold"] + name: str + probability: confloat(ge=0.0, le=1.0) + threshold: confloat(ge=0.0, le=1.0) + epsilon: confloat(ge=0.0, le=1.0) + max_batch_images: Optional[int] = Field(default=None) + only_top_classes: bool = Field(default=True) + minimum_objects_close_to_threshold: int = Field(default=1) + selected_class_names: Optional[List[str]] = Field(default=None) + tags: List[str] = Field(default_factory=lambda: []) + limits: List[LimitDefinition] = Field(default_factory=lambda: []) + + +class ClassesBasedSampling(BaseModel): + type: Literal["classes_based"] + name: str + probability: confloat(ge=0.0, le=1.0) + selected_class_names: List[str] + tags: List[str] = Field(default_factory=lambda: []) + limits: List[LimitDefinition] = Field(default_factory=lambda: []) + + +class DetectionsBasedSampling(BaseModel): + type: Literal["detections_number_based"] + name: str + probability: confloat(ge=0.0, le=1.0) + more_than: Optional[NonNegativeInt] + less_than: Optional[NonNegativeInt] + selected_class_names: Optional[List[str]] = Field(default=None) + tags: List[str] = Field(default_factory=lambda: []) + limits: List[LimitDefinition] = Field(default_factory=lambda: []) + + +class ActiveLearningBatchingStrategy(BaseModel): + batches_name_prefix: str + recreation_interval: Literal["never", "daily", "weekly", "monthly"] + max_batch_images: Optional[int] = Field(default=None) + + +ActiveLearningStrategyType = Annotated[ + Union[ + RandomSamplingConfig, + CloseToThresholdSampling, + ClassesBasedSampling, + DetectionsBasedSampling, + ], + Field(discriminator="type"), +] + + +class EnabledActiveLearningConfiguration(BaseModel): + enabled: bool + persist_predictions: bool + sampling_strategies: List[ActiveLearningStrategyType] + batching_strategy: ActiveLearningBatchingStrategy + tags: List[str] = Field(default_factory=lambda: []) + max_image_size: Optional[Tuple[PositiveInt, PositiveInt]] = Field(default=None) + jpeg_compression_level: int = Field(default=95) + + @field_validator("jpeg_compression_level") + @classmethod + def validate_json_compression_level(cls, value: Any): + validate_field_has_given_type( + field_name="jpeg_compression_level", allowed_types=[int], value=value + ) + if value <= 0 or value > 100: + raise ValueError("`jpeg_compression_level` must be in range [1, 100]") + return value + + class ActiveLearningDataCollector(BaseModel, StepInterface): type: Literal["ActiveLearningDataCollector"] name: str @@ -1144,6 +1240,9 @@ class ActiveLearningDataCollector(BaseModel, StepInterface): target_dataset: str target_dataset_api_key: Optional[str] = Field(default=None) disable_active_learning: Union[bool, str] = Field(default=False) + active_learning_configuration: Optional[ + Union[EnabledActiveLearningConfiguration, DisabledActiveLearningConfiguration] + ] = Field(default=None) @field_validator("image") @classmethod diff --git a/tests/inference/unit_tests/enterprise/workflows/entities/test_steps.py b/tests/inference/unit_tests/enterprise/workflows/entities/test_steps.py index 49a30e684..67b31f6bf 100644 --- a/tests/inference/unit_tests/enterprise/workflows/entities/test_steps.py +++ b/tests/inference/unit_tests/enterprise/workflows/entities/test_steps.py @@ -9,21 +9,29 @@ InferenceParameter, ) from inference.enterprise.workflows.entities.steps import ( + ActiveLearningBatchingStrategy, ActiveLearningDataCollector, AggregationMode, + ClassesBasedSampling, ClassificationModel, + CloseToThresholdSampling, Condition, Crop, DetectionFilter, DetectionFilterDefinition, DetectionOffset, + DetectionsBasedSampling, DetectionsConsensus, + DisabledActiveLearningConfiguration, + EnabledActiveLearningConfiguration, InstanceSegmentationModel, KeypointsDetectionModel, + LimitDefinition, MultiLabelClassificationModel, ObjectDetectionModel, OCRModel, Operator, + RandomSamplingConfig, ) from inference.enterprise.workflows.errors import ( ExecutionGraphError, @@ -2343,9 +2351,168 @@ def test_validate_al_data_collector_when_valid_input_given() -> None: } # when - _ = ActiveLearningDataCollector.parse_obj(specification) + result = ActiveLearningDataCollector.parse_obj(specification) - # then NO ERROR is raised + # then + assert result == ActiveLearningDataCollector( + type="ActiveLearningDataCollector", + name="some", + image="$inputs.image", + predictions="$steps.detection.predictions", + target_dataset="some", + target_dataset_api_key=None, + disable_active_learning=False, + active_learning_configuration=None, + ) + + +def test_validate_al_data_collector_when_valid_input_with_disabled_al_config_given() -> ( + None +): + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": "$inputs.image", + "predictions": "$steps.detection.predictions", + "target_dataset": "some", + "active_learning_configuration": {"enabled": False}, + } + + # when + result = ActiveLearningDataCollector.parse_obj(specification) + + # then + assert result == ActiveLearningDataCollector( + type="ActiveLearningDataCollector", + name="some", + image="$inputs.image", + predictions="$steps.detection.predictions", + target_dataset="some", + target_dataset_api_key=None, + disable_active_learning=False, + active_learning_configuration=DisabledActiveLearningConfiguration( + enabled=False + ), + ) + + +def test_validate_al_data_collector_when_valid_input_with_enabled_al_config_given() -> ( + None +): + # given + specification = { + "type": "ActiveLearningDataCollector", + "name": "some", + "image": "$inputs.image", + "predictions": "$steps.detection.predictions", + "target_dataset": "some", + "active_learning_configuration": { + "enabled": True, + "persist_predictions": True, + "sampling_strategies": [ + { + "type": "random", + "name": "a", + "traffic_percentage": 0.6, + "limits": [{"type": "daily", "value": 100}], + }, + { + "type": "close_to_threshold", + "name": "b", + "probability": 0.7, + "threshold": 0.5, + "epsilon": 0.25, + "tags": ["some"], + "limits": [{"type": "daily", "value": 200}], + }, + { + "type": "classes_based", + "name": "c", + "probability": 0.8, + "selected_class_names": ["a", "b", "c"], + "limits": [{"type": "daily", "value": 300}], + }, + { + "type": "detections_number_based", + "name": "d", + "probability": 0.9, + "more_than": 3, + "less_than": 5, + "limits": [{"type": "daily", "value": 400}], + }, + ], + "batching_strategy": { + "batches_name_prefix": "my_batches", + "recreation_interval": "monthly", + }, + }, + } + + # when + result = ActiveLearningDataCollector.parse_obj(specification) + + # then + assert result == ActiveLearningDataCollector( + type="ActiveLearningDataCollector", + name="some", + image="$inputs.image", + predictions="$steps.detection.predictions", + target_dataset="some", + target_dataset_api_key=None, + disable_active_learning=False, + active_learning_configuration=EnabledActiveLearningConfiguration( + enabled=True, + persist_predictions=True, + sampling_strategies=[ + RandomSamplingConfig( + type="random", + name="a", + traffic_percentage=0.6, + tags=[], + limits=[LimitDefinition(type="daily", value=100)], + ), + CloseToThresholdSampling( + type="close_to_threshold", + name="b", + probability=0.7, + threshold=0.5, + epsilon=0.25, + max_batch_images=None, + only_top_classes=True, + minimum_objects_close_to_threshold=1, + selected_class_names=None, + tags=["some"], + limits=[LimitDefinition(type="daily", value=200)], + ), + ClassesBasedSampling( + type="classes_based", + name="c", + probability=0.8, + selected_class_names=["a", "b", "c"], + tags=[], + limits=[LimitDefinition(type="daily", value=300)], + ), + DetectionsBasedSampling( + type="detections_number_based", + name="d", + probability=0.9, + more_than=3, + less_than=5, + selected_class_names=None, + tags=[], + limits=[LimitDefinition(type="daily", value=400)], + ), + ], + batching_strategy=ActiveLearningBatchingStrategy( + batches_name_prefix="my_batches", + recreation_interval="monthly", + ), + tags=[], + max_image_size=None, + jpeg_compression_level=95, + ), + ) @pytest.mark.parametrize("image_selector", [1, None, "some", 1.3, True]) From 8746357979773424626da618f008b4b2483d45ae Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20P=C4=99czek?= Date: Wed, 14 Feb 2024 13:23:50 +0100 Subject: [PATCH 09/18] Connect AL configuration stated at step level to the middleware --- .../core/active_learning/configuration.py | 65 +++++++++++++++---- inference/core/active_learning/middlewares.py | 38 +++++++++++ .../active_learning_middlewares.py | 47 ++++++++++++-- .../complier/steps_executors/auxiliary.py | 1 + 4 files changed, 130 insertions(+), 21 deletions(-) diff --git a/inference/core/active_learning/configuration.py b/inference/core/active_learning/configuration.py index 3e79a0581..86274cf39 100644 --- a/inference/core/active_learning/configuration.py +++ b/inference/core/active_learning/configuration.py @@ -58,23 +58,37 @@ def prepare_active_learning_configuration( f"project: {project_metadata.dataset_id} of type: {project_metadata.dataset_type}. " f"AL configuration: {project_metadata.active_learning_configuration}" ) - sampling_methods = initialize_sampling_methods( - sampling_strategies_configs=project_metadata.active_learning_configuration[ - "sampling_strategies" - ], + return initialise_active_learning_configuration( + project_metadata=project_metadata, ) - target_workspace_id = project_metadata.active_learning_configuration.get( - "target_workspace", project_metadata.workspace_id + + +def prepare_active_learning_configuration_inplace( + api_key: str, + model_id: str, + active_learning_configuration: Optional[dict], +) -> Optional[ActiveLearningConfiguration]: + if ( + active_learning_configuration is None + or active_learning_configuration.get("enabled", False) is False + ): + return None + dataset_id, version_id = get_model_id_chunks(model_id=model_id) + workspace_id = get_roboflow_workspace(api_key=api_key) + dataset_type = get_roboflow_dataset_type( + api_key=api_key, + workspace_id=workspace_id, + dataset_id=dataset_id, ) - target_dataset_id = project_metadata.active_learning_configuration.get( - "target_project", project_metadata.dataset_id + project_metadata = RoboflowProjectMetadata( + dataset_id=dataset_id, + version_id=version_id, + workspace_id=workspace_id, + dataset_type=dataset_type, + active_learning_configuration=active_learning_configuration, ) - return ActiveLearningConfiguration.init( - roboflow_api_configuration=project_metadata.active_learning_configuration, - sampling_methods=sampling_methods, - workspace_id=target_workspace_id, - dataset_id=target_dataset_id, - model_id=model_id, + return initialise_active_learning_configuration( + project_metadata=project_metadata, ) @@ -145,6 +159,29 @@ def parse_cached_roboflow_project_metadata( ) from error +def initialise_active_learning_configuration( + project_metadata: RoboflowProjectMetadata, +) -> ActiveLearningConfiguration: + sampling_methods = initialize_sampling_methods( + sampling_strategies_configs=project_metadata.active_learning_configuration[ + "sampling_strategies" + ], + ) + target_workspace_id = project_metadata.active_learning_configuration.get( + "target_workspace", project_metadata.workspace_id + ) + target_dataset_id = project_metadata.active_learning_configuration.get( + "target_project", project_metadata.dataset_id + ) + return ActiveLearningConfiguration.init( + roboflow_api_configuration=project_metadata.active_learning_configuration, + sampling_methods=sampling_methods, + workspace_id=target_workspace_id, + dataset_id=target_dataset_id, + model_id=f"{project_metadata.dataset_id}/{project_metadata.version_id}", + ) + + def initialize_sampling_methods( sampling_strategies_configs: List[Dict[str, Any]] ) -> List[SamplingMethod]: diff --git a/inference/core/active_learning/middlewares.py b/inference/core/active_learning/middlewares.py index c6984c603..695d64c4d 100644 --- a/inference/core/active_learning/middlewares.py +++ b/inference/core/active_learning/middlewares.py @@ -8,6 +8,7 @@ from inference.core.active_learning.batching import generate_batch_name from inference.core.active_learning.configuration import ( prepare_active_learning_configuration, + prepare_active_learning_configuration_inplace, ) from inference.core.active_learning.core import ( execute_datapoint_registration, @@ -72,6 +73,21 @@ def init( cache=cache, ) + @classmethod + def init_from_config( + cls, api_key: str, model_id: str, cache: BaseCache, config: Optional[dict] + ) -> "ActiveLearningMiddleware": + configuration = prepare_active_learning_configuration_inplace( + api_key=api_key, + model_id=model_id, + active_learning_configuration=config, + ) + return cls( + api_key=api_key, + configuration=configuration, + cache=cache, + ) + def __init__( self, api_key: str, @@ -178,6 +194,28 @@ def init( task_queue=task_queue, ) + @classmethod + def init_from_config( + cls, + api_key: str, + model_id: str, + cache: BaseCache, + config: Optional[dict], + max_queue_size: int = MAX_REGISTRATION_QUEUE_SIZE, + ) -> "ThreadingActiveLearningMiddleware": + configuration = prepare_active_learning_configuration_inplace( + api_key=api_key, + model_id=model_id, + active_learning_configuration=config, + ) + task_queue = Queue(max_queue_size) + return cls( + api_key=api_key, + configuration=configuration, + cache=cache, + task_queue=task_queue, + ) + def __init__( self, api_key: str, diff --git a/inference/enterprise/workflows/complier/steps_executors/active_learning_middlewares.py b/inference/enterprise/workflows/complier/steps_executors/active_learning_middlewares.py index 98aa10ab0..200b7118d 100644 --- a/inference/enterprise/workflows/complier/steps_executors/active_learning_middlewares.py +++ b/inference/enterprise/workflows/complier/steps_executors/active_learning_middlewares.py @@ -1,4 +1,4 @@ -from typing import Dict, List, Optional +from typing import Dict, List, Optional, Union from fastapi import BackgroundTasks @@ -6,6 +6,10 @@ from inference.core.active_learning.middlewares import ActiveLearningMiddleware from inference.core.cache.base import BaseCache from inference.core.env import DISABLE_PREPROC_AUTO_ORIENT +from inference.enterprise.workflows.entities.steps import ( + DisabledActiveLearningConfiguration, + EnabledActiveLearningConfiguration, +) class WorkflowsActiveLearningMiddleware: @@ -27,6 +31,11 @@ def register( prediction_type: str, active_learning_disabled_for_request: bool, background_tasks: Optional[BackgroundTasks] = None, + active_learning_configuration: Optional[ + Union[ + EnabledActiveLearningConfiguration, DisabledActiveLearningConfiguration + ] + ] = None, ) -> None: model_id = f"{dataset_name}/workflows" if api_key is None or active_learning_disabled_for_request: @@ -38,6 +47,7 @@ def register( predictions=predictions, api_key=api_key, prediction_type=prediction_type, + active_learning_configuration=active_learning_configuration, ) return None background_tasks.add_task( @@ -47,6 +57,7 @@ def register( predictions=predictions, api_key=api_key, prediction_type=prediction_type, + active_learning_configuration=active_learning_configuration, ) def _register( @@ -56,9 +67,18 @@ def _register( predictions: List[dict], api_key: str, prediction_type: str, + active_learning_configuration: Optional[ + Union[ + EnabledActiveLearningConfiguration, DisabledActiveLearningConfiguration + ] + ], ) -> None: try: - self._ensure_middleware_initialised(model_id=model_id, api_key=api_key) + self._ensure_middleware_initialised( + model_id=model_id, + api_key=api_key, + active_learning_configuration=active_learning_configuration, + ) self._middlewares[model_id].register_batch( inference_inputs=images, predictions=predictions, @@ -76,11 +96,24 @@ def _ensure_middleware_initialised( self, model_id: str, api_key: str, + active_learning_configuration: Optional[ + Union[ + EnabledActiveLearningConfiguration, DisabledActiveLearningConfiguration + ] + ], ) -> None: if model_id in self._middlewares: return None - self._middlewares[model_id] = ActiveLearningMiddleware.init( - api_key=api_key, - model_id=model_id, - cache=self._cache, - ) + if active_learning_configuration is not None: + self._middlewares[model_id] = ActiveLearningMiddleware.init_from_config( + api_key=api_key, + model_id=model_id, + cache=self._cache, + config=active_learning_configuration.dict(), + ) + else: + self._middlewares[model_id] = ActiveLearningMiddleware.init( + api_key=api_key, + model_id=model_id, + cache=self._cache, + ) diff --git a/inference/enterprise/workflows/complier/steps_executors/auxiliary.py b/inference/enterprise/workflows/complier/steps_executors/auxiliary.py index 1d6ad1019..d6a791fa7 100644 --- a/inference/enterprise/workflows/complier/steps_executors/auxiliary.py +++ b/inference/enterprise/workflows/complier/steps_executors/auxiliary.py @@ -911,5 +911,6 @@ async def run_active_learning_data_collector( active_learning_disabled_for_request=disable_active_learning, prediction_type=prediction_type, background_tasks=background_tasks, + active_learning_configuration=step.active_learning_configuration, ) return None, outputs_lookup From ecfc67a214762f2eb0690e7520540a6e3fe8a271 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20P=C4=99czek?= Date: Wed, 14 Feb 2024 13:38:25 +0100 Subject: [PATCH 10/18] Fix issue spotted in initial testing --- inference/enterprise/workflows/entities/steps.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/inference/enterprise/workflows/entities/steps.py b/inference/enterprise/workflows/entities/steps.py index 7f1380a40..c0643d0d0 100644 --- a/inference/enterprise/workflows/entities/steps.py +++ b/inference/enterprise/workflows/entities/steps.py @@ -1147,6 +1147,15 @@ def _validate_required_objects_binding(self, value: Any) -> None: class DisabledActiveLearningConfiguration(BaseModel): enabled: bool + @field_validator("enabled") + @classmethod + def ensure_only_false_is_valid(cls, value: Any) -> bool: + if value is not False: + raise ValueError( + "One can only specify enabled=False in `DisabledActiveLearningConfiguration`" + ) + return value + class LimitDefinition(BaseModel): type: Literal["minutely", "hourly", "daily"] @@ -1223,7 +1232,7 @@ class EnabledActiveLearningConfiguration(BaseModel): @field_validator("jpeg_compression_level") @classmethod - def validate_json_compression_level(cls, value: Any): + def validate_json_compression_level(cls, value: Any) -> int: validate_field_has_given_type( field_name="jpeg_compression_level", allowed_types=[int], value=value ) From e66b9bf8de50306e3a169047da43c8e181150afa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20P=C4=99czek?= Date: Wed, 14 Feb 2024 13:48:01 +0100 Subject: [PATCH 11/18] Fix problem with docker build --- docker/config/cpu_http.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/docker/config/cpu_http.py b/docker/config/cpu_http.py index 2de4d1706..862dce75e 100644 --- a/docker/config/cpu_http.py +++ b/docker/config/cpu_http.py @@ -10,8 +10,6 @@ from prometheus_fastapi_instrumentator import Instrumentator from inference.core.env import MAX_ACTIVE_MODELS, ACTIVE_LEARNING_ENABLED, LAMBDA -from inference.enterprise.workflows.complier.steps_executors.active_learning_middlewares import \ - BackgroundTaskWorkflowsActiveLearningMiddleware from inference.models.utils import ROBOFLOW_MODEL_TYPES model_registry = RoboflowModelRegistry(ROBOFLOW_MODEL_TYPES) From 99a7cf049ff3ae586707d9b899011cf5b7daeb91 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20P=C4=99czek?= Date: Wed, 14 Feb 2024 14:21:52 +0100 Subject: [PATCH 12/18] Bring back old build --- docker/publish/deploy_docker_image.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/publish/deploy_docker_image.sh b/docker/publish/deploy_docker_image.sh index e80a800ab..db7b7d47e 100755 --- a/docker/publish/deploy_docker_image.sh +++ b/docker/publish/deploy_docker_image.sh @@ -12,7 +12,7 @@ VERSION=`python inference/core/version.py` REPOSITORY=$1 DOCKERFILE=$2 -docker buildx build --platform linux/amd64 --pull -t $REPOSITORY:$VERSION -f $DOCKERFILE . --push +docker build --pull -t $REPOSITORY:$VERSION -f $DOCKERFILE . check_return_code $? "Docker build" docker tag $REPOSITORY:$VERSION $REPOSITORY:latest From efbae9e9f6cb21509dd481eff2ff675153ee8d2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20P=C4=99czek?= Date: Wed, 14 Feb 2024 14:24:22 +0100 Subject: [PATCH 13/18] Revert change --- docker/publish/deploy_docker_image.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/publish/deploy_docker_image.sh b/docker/publish/deploy_docker_image.sh index db7b7d47e..e80a800ab 100755 --- a/docker/publish/deploy_docker_image.sh +++ b/docker/publish/deploy_docker_image.sh @@ -12,7 +12,7 @@ VERSION=`python inference/core/version.py` REPOSITORY=$1 DOCKERFILE=$2 -docker build --pull -t $REPOSITORY:$VERSION -f $DOCKERFILE . +docker buildx build --platform linux/amd64 --pull -t $REPOSITORY:$VERSION -f $DOCKERFILE . --push check_return_code $? "Docker build" docker tag $REPOSITORY:$VERSION $REPOSITORY:latest From e2b7c92d6842bd4f2e58445de32ae2eeed93ffeb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20P=C4=99czek?= Date: Wed, 14 Feb 2024 14:59:00 +0100 Subject: [PATCH 14/18] Update notebook example --- examples/notebooks/workflows.ipynb | 39 +++++++++++++++--------------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/examples/notebooks/workflows.ipynb b/examples/notebooks/workflows.ipynb index c27c63f0a..6ba7669cd 100644 --- a/examples/notebooks/workflows.ipynb +++ b/examples/notebooks/workflows.ipynb @@ -43,7 +43,7 @@ "Requirement already satisfied: scipy<2.0.0,>=1.10.0 in /usr/local/lib/python3.9/site-packages (from supervision==0.18.0) (1.12.0)\n", "Requirement already satisfied: contourpy>=1.0.1 in /usr/local/lib/python3.9/site-packages (from matplotlib>=3.6.0->supervision==0.18.0) (1.2.0)\n", "Requirement already satisfied: cycler>=0.10 in /usr/local/lib/python3.9/site-packages (from matplotlib>=3.6.0->supervision==0.18.0) (0.12.1)\n", - "Requirement already satisfied: fonttools>=4.22.0 in /usr/local/lib/python3.9/site-packages (from matplotlib>=3.6.0->supervision==0.18.0) (4.47.2)\n", + "Requirement already satisfied: fonttools>=4.22.0 in /usr/local/lib/python3.9/site-packages (from matplotlib>=3.6.0->supervision==0.18.0) (4.48.1)\n", "Requirement already satisfied: kiwisolver>=1.3.1 in /usr/local/lib/python3.9/site-packages (from matplotlib>=3.6.0->supervision==0.18.0) (1.4.5)\n", "Requirement already satisfied: packaging>=20.0 in /usr/local/lib/python3.9/site-packages (from matplotlib>=3.6.0->supervision==0.18.0) (23.2)\n", "Requirement already satisfied: pillow>=8 in /usr/local/lib/python3.9/site-packages (from matplotlib>=3.6.0->supervision==0.18.0) (10.2.0)\n", @@ -81,16 +81,16 @@ "name": "stdout", "output_type": "stream", "text": [ - "--2024-02-06 13:28:44-- https://media.roboflow.com/workflows_examples_images.zip\n", + "--2024-02-14 13:51:16-- https://media.roboflow.com/workflows_examples_images.zip\n", "Resolving media.roboflow.com (media.roboflow.com)... 34.110.133.209\n", "Connecting to media.roboflow.com (media.roboflow.com)|34.110.133.209|:443... connected.\n", "HTTP request sent, awaiting response... 200 OK\n", "Length: 8229834 (7.8M) [application/zip]\n", "Saving to: ‘workflows_examples_images.zip’\n", "\n", - "workflows_examples_ 100%[===================>] 7.85M 18.8MB/s in 0.4s \n", + "workflows_examples_ 100%[===================>] 7.85M 10.7MB/s in 0.7s \n", "\n", - "2024-02-06 13:28:45 (18.8 MB/s) - ‘workflows_examples_images.zip’ saved [8229834/8229834]\n", + "2024-02-14 13:51:17 (10.7 MB/s) - ‘workflows_examples_images.zip’ saved [8229834/8229834]\n", "\n" ] } @@ -223,7 +223,7 @@ { "data": { "text/plain": [ - "" + "" ] }, "execution_count": 7, @@ -363,7 +363,7 @@ "outputs": [ { "data": { - "image/png": "", + "image/png": "", "text/plain": [ "
" ] @@ -791,7 +791,7 @@ }, { "cell_type": "code", - "execution_count": 29, + "execution_count": 36, "id": "6ce81674-4cdc-4074-a39a-c61b1188076b", "metadata": {}, "outputs": [ @@ -807,10 +807,11 @@ } ], "source": [ - "annotator = sv.BoundingBoxAnnotator(thickness=20)\n", - "detections = sv.Detections.from_inference(detection_coco_and_plates)\n", - "plt.imshow(annotator.annotate(multiple_cars_image_2.copy(), detections)[:, :, ::-1])\n", - "plt.show()" + "for predictions, image in zip(detection_coco_and_plates[\"predictions\"], detection_coco_and_plates[\"image\"]):\n", + " annotator = sv.BoundingBoxAnnotator(thickness=20)\n", + " detections = sv.Detections.from_inference({\"predictions\": predictions, \"image\": image})\n", + " plt.imshow(annotator.annotate(multiple_cars_image_2.copy(), detections)[:, :, ::-1])\n", + " plt.show()" ] }, { @@ -826,7 +827,7 @@ }, { "cell_type": "code", - "execution_count": 38, + "execution_count": 37, "id": "27e8f8c5-48d4-47d5-a542-2101112f51ae", "metadata": {}, "outputs": [], @@ -868,7 +869,7 @@ }, { "cell_type": "code", - "execution_count": 39, + "execution_count": 38, "id": "2b3516fe-74eb-4fe7-8ebd-9e3001a7b2d8", "metadata": {}, "outputs": [], @@ -879,7 +880,7 @@ }, { "cell_type": "code", - "execution_count": 40, + "execution_count": 39, "id": "b7e0bc65-f05f-4ea0-8ef7-72b1939192ae", "metadata": {}, "outputs": [ @@ -901,7 +902,7 @@ }, { "cell_type": "code", - "execution_count": 41, + "execution_count": 40, "id": "c189c354-8ec7-47b3-8751-1aae6806b6e4", "metadata": {}, "outputs": [], @@ -914,7 +915,7 @@ }, { "cell_type": "code", - "execution_count": 42, + "execution_count": 41, "id": "4d9fad4a-7c59-42dd-bbc1-ebe6839e6aa7", "metadata": {}, "outputs": [ @@ -948,7 +949,7 @@ }, { "cell_type": "code", - "execution_count": 43, + "execution_count": 42, "id": "2094f57e-676d-4ec1-9d03-7a89301dd783", "metadata": {}, "outputs": [ @@ -970,7 +971,7 @@ }, { "cell_type": "code", - "execution_count": 44, + "execution_count": 43, "id": "1bf96383-9908-4f2f-90f8-e13ca0162174", "metadata": {}, "outputs": [], @@ -983,7 +984,7 @@ }, { "cell_type": "code", - "execution_count": 45, + "execution_count": 44, "id": "546a10ac-b24c-43df-9d46-db24199918e8", "metadata": {}, "outputs": [ From 69c88aa72e198295b42016f6f18bb425079e81ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20P=C4=99czek?= Date: Wed, 14 Feb 2024 15:42:09 +0100 Subject: [PATCH 15/18] Add changed to enable persistence of AL predictions in workflows + tests --- inference/core/active_learning/core.py | 2 +- .../core/active_learning/test_core.py | 30 ++++++++++++++++++- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/inference/core/active_learning/core.py b/inference/core/active_learning/core.py index 35765e5a6..55a26596d 100644 --- a/inference/core/active_learning/core.py +++ b/inference/core/active_learning/core.py @@ -215,5 +215,5 @@ def is_prediction_registration_forbidden( roboflow_image_id is None or persist_predictions is False or prediction.get("is_stub", False) is True - or len(prediction.get("predictions", [])) == 0 + or (len(prediction.get("predictions", [])) == 0 and "top" not in prediction) ) diff --git a/tests/inference/unit_tests/core/active_learning/test_core.py b/tests/inference/unit_tests/core/active_learning/test_core.py index a5f81c4c8..2865252d1 100644 --- a/tests/inference/unit_tests/core/active_learning/test_core.py +++ b/tests/inference/unit_tests/core/active_learning/test_core.py @@ -739,7 +739,7 @@ def test_is_prediction_registration_forbidden_when_prediction_should_be_rejected ) # then - assert result is True + assert result is False def test_is_prediction_registration_forbidden_when_prediction_should_be_registered() -> ( @@ -754,3 +754,31 @@ def test_is_prediction_registration_forbidden_when_prediction_should_be_register # then assert result is False + + +def test_is_prediction_registration_forbidden_when_classification_output_only_with_top_category_provided() -> ( + None +): + # when + result = is_prediction_registration_forbidden( + prediction={"top": "cat"}, + persist_predictions=True, + roboflow_image_id="some+id", + ) + + # then + assert result is False + + +def test_is_prediction_registration_forbidden_when_detection_output_without_predictions_provided() -> ( + None +): + # when + result = is_prediction_registration_forbidden( + prediction={"predictions": []}, + persist_predictions=True, + roboflow_image_id="some+id", + ) + + # then + assert result is True From d60d007f12acf6f00af07f13922fc347ec75fdcd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20P=C4=99czek?= Date: Wed, 14 Feb 2024 16:41:40 +0100 Subject: [PATCH 16/18] Add changes into docs --- inference/enterprise/workflows/README.md | 55 +++++++++++++++++++ .../enterprise/workflows/entities/steps.py | 2 +- 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/inference/enterprise/workflows/README.md b/inference/enterprise/workflows/README.md index 92d868fac..677738cf3 100644 --- a/inference/enterprise/workflows/README.md +++ b/inference/enterprise/workflows/README.md @@ -261,6 +261,7 @@ input parameter * `confidence` - confidence of prediction * `parent_id` - identifier of parent image / associated detection that helps to identify predictions with RoI in case of multi-step pipelines +* `prediction_type` - denoting `classification` model #### `MultiLabelClassificationModel` This step represents inference from multi-label classification model. @@ -281,6 +282,7 @@ input parameter * `predicted_classes` - top classes * `parent_id` - identifier of parent image / associated detection that helps to identify predictions with RoI in case of multi-step pipelines +* `prediction_type` - denoting `classification` model #### `ObjectDetectionModel` This step represents inference from object detection model. @@ -309,6 +311,7 @@ input parameter. Default: `0.3`. * `image` - size of input image, that `predictions` coordinates refers to * `parent_id` - identifier of parent image / associated detection that helps to identify predictions with RoI in case of multi-step pipelines +* `prediction_type` - denoting `object-detection` model #### `KeypointsDetectionModel` This step represents inference from keypoints detection model. @@ -339,6 +342,7 @@ input parameter * `image` - size of input image, that `predictions` coordinates refers to * `parent_id` - identifier of parent image / associated detection that helps to identify predictions with RoI in case of multi-step pipelines +* `prediction_type` - denoting `keypoint-detection` model #### `InstanceSegmentationModel` This step represents inference from instance segmentation model. @@ -370,6 +374,7 @@ input parameter * `image` - size of input image, that `predictions` coordinates refers to * `parent_id` - identifier of parent image / associated detection that helps to identify predictions with RoI in case of multi-step pipelines +* `prediction_type` - denoting `instance-segmentation` model #### `OCRModel` This step represents inference from OCR model. @@ -384,6 +389,7 @@ This step represents inference from OCR model. * `result` - details of predictions * `parent_id` - identifier of parent image / associated detection that helps to identify predictions with RoI in case of multi-step pipelines +* `prediction_type` - denoting `ocr` model #### `Crop` This step produces **dynamic** crops based on detections from detections-based model. @@ -466,6 +472,7 @@ This let user define recursive structure of filters. * `image` - size of input image, that `predictions` coordinates refers to * `parent_id` - identifier of parent image / associated detection that helps to identify predictions with RoI in case of multi-step pipelines +* `prediction_type` - denoting parent model type #### `DetectionOffset` This step is responsible for applying fixed offset on width and height of detections. @@ -484,6 +491,7 @@ This step is responsible for applying fixed offset on width and height of detect * `image` - size of input image, that `predictions` coordinates refers to * `parent_id` - identifier of parent image / associated detection that helps to identify predictions with RoI in case of multi-step pipelines +* `prediction_type` - denoting parent model type #### `AbsoluteStaticCrop` and `RelativeStaticCrop` @@ -596,6 +604,53 @@ of multi-step pipelines (can be `undefined` if all sources of predictions give n objects specified in config are present * `presence_confidence` - for each input image, for each present class - aggregated confidence indicating presence of objects +* `prediction_type` - denoting `object-detection` prediction (as this format is effective even if other detections +models are combined) + +#### `ActiveLearningDataCollector` +Step that is supposed to be a solution for anyone who wants to collect data and predictions that flow through the +`workflows`. The block is build on the foundations of Roboflow Active Learning capabilities implemented in +[`active_learning` module](../../core/active_learning/README.md) - so all the capabilities should be preserved. +There are **very important** considerations regarding collecting data with AL at the `workflows` level and in +scope of specific models. Read `important notes` section to discover nuances. +General use-cases for this block: +* grab data and predictions from single model / ensemble of models +* posting the data in different project that the origin of models used in `workflow` - in particular **one may now use +open models - like `yolov8n-640` and start sampling data to their own project!** +* defining multiple different sampling strategies for different `workflows` (step allows to provide custom config of AL +data collection - so you are not bounded to configuration of AL at the project level - and multiple instances of +configs can co-exist) + +##### Step parameters +* `type`: must be `ActiveLearningDataCollector` (required) +* `name`: must be unique within all steps - used as identifier (required) +* `image`: must be a reference to input of type `InferenceImage` or `crops` output from steps executing cropping ( +`Crop`, `AbsoluteStaticCrop`, `RelativeStaticCrop`) (required) +* `predictions` - selector pointing to outputs of detections models output of the detections model: [`ObjectDetectionModel`, +`KeypointsDetectionModel`, `InstanceSegmentationModel`, `DetectionFilter`, `DetectionsConsensus`] (then use `$steps..predictions`) +or outputs of classification [`ClassificationModel`] (then use `$steps..top`) (required) +* `target_dataset` - name of Roboflow dataset / project to be used as target for collected data (required) +* `target_dataset_api_key` - optional API key to be used for data registration. This may help in a scenario when data +are to be registered cross-workspaces. If not provided - the API key from a request would be used to register data ( +applicable for Universe models predictions to be saved in private workspaces and for models that were trained in the same +workspace (not necessarily within the same project)). +* `disable_active_learning` - boolean flag that can be also reference to input - to arbitrarily disable data collection +for specific request - overrides all AL config. (optional, default: `False`) +* `active_learning_configuration` - optional configuration of Active Learning data sampling in the exact format provided +in [`active_learning` docs](../../core/active_learning/README.md) + +##### Step outputs +No outputs are declared - step is supposed to cause side effect in form of data sampling and registration. + +##### Important notes +* this block is implemented in non-async way - which means that in certain cases it can block event loop causing +parallelization not feasible. This is not the case when running in `inference` HTTP container. At Roboflow +hosted platform - registration cannot be executed as background task - so its duration must be added into expected +latency +* **important :exclamation:** be careful in enabling / disabling AL at the level of steps - remember that when +predicting from each model, `inference` HTTP API tries to get Active Learning config from the project that model +belongs to and register datapoint. To prevent that from happening - model steps can be provided with +`disable_active_learning=True` parameter. Then the only place where AL registration happens is `ActiveLearningDataCollector`. ## Different modes of execution Workflows can be executed in `local` environment, or `remote` environment can be used. `local` means that model steps diff --git a/inference/enterprise/workflows/entities/steps.py b/inference/enterprise/workflows/entities/steps.py index c0643d0d0..f7baebff1 100644 --- a/inference/enterprise/workflows/entities/steps.py +++ b/inference/enterprise/workflows/entities/steps.py @@ -668,7 +668,7 @@ def get_input_names(self) -> Set[str]: return {"predictions", "offset_x", "offset_y"} def get_output_names(self) -> Set[str]: - return {"predictions", "parent_id", "image", "predictions_type"} + return {"predictions", "parent_id", "image", "prediction_type"} def validate_field_selector( self, field_name: str, input_step: GraphNone, index: Optional[int] = None From b227529efee2f637468a9f4a029805d1ace702b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20P=C4=99czek?= Date: Wed, 14 Feb 2024 17:26:10 +0100 Subject: [PATCH 17/18] Add changes to notebook to illustrate AL usage: --- examples/notebooks/workflows.ipynb | 165 ++++++++++++++++++++++++++--- 1 file changed, 149 insertions(+), 16 deletions(-) diff --git a/examples/notebooks/workflows.ipynb b/examples/notebooks/workflows.ipynb index 6ba7669cd..0094409b7 100644 --- a/examples/notebooks/workflows.ipynb +++ b/examples/notebooks/workflows.ipynb @@ -81,16 +81,16 @@ "name": "stdout", "output_type": "stream", "text": [ - "--2024-02-14 13:51:16-- https://media.roboflow.com/workflows_examples_images.zip\n", + "--2024-02-14 16:23:28-- https://media.roboflow.com/workflows_examples_images.zip\n", "Resolving media.roboflow.com (media.roboflow.com)... 34.110.133.209\n", "Connecting to media.roboflow.com (media.roboflow.com)|34.110.133.209|:443... connected.\n", "HTTP request sent, awaiting response... 200 OK\n", "Length: 8229834 (7.8M) [application/zip]\n", "Saving to: ‘workflows_examples_images.zip’\n", "\n", - "workflows_examples_ 100%[===================>] 7.85M 10.7MB/s in 0.7s \n", + "workflows_examples_ 100%[===================>] 7.85M 19.2MB/s in 0.4s \n", "\n", - "2024-02-14 13:51:17 (10.7 MB/s) - ‘workflows_examples_images.zip’ saved [8229834/8229834]\n", + "2024-02-14 16:23:28 (19.2 MB/s) - ‘workflows_examples_images.zip’ saved [8229834/8229834]\n", "\n" ] } @@ -223,7 +223,7 @@ { "data": { "text/plain": [ - "" + "" ] }, "execution_count": 7, @@ -738,7 +738,7 @@ }, { "cell_type": "code", - "execution_count": 26, + "execution_count": 28, "id": "7d7c3db9-c690-4139-8584-2b150594a1a0", "metadata": {}, "outputs": [], @@ -748,7 +748,7 @@ }, { "cell_type": "code", - "execution_count": 27, + "execution_count": 29, "id": "7a08d122-7959-474f-8857-503bbd4281df", "metadata": {}, "outputs": [ @@ -770,7 +770,7 @@ }, { "cell_type": "code", - "execution_count": 28, + "execution_count": 30, "id": "d66a4b8e-8353-4a10-928a-24d00e70f13a", "metadata": {}, "outputs": [], @@ -791,7 +791,7 @@ }, { "cell_type": "code", - "execution_count": 36, + "execution_count": 31, "id": "6ce81674-4cdc-4074-a39a-c61b1188076b", "metadata": {}, "outputs": [ @@ -827,7 +827,7 @@ }, { "cell_type": "code", - "execution_count": 37, + "execution_count": 32, "id": "27e8f8c5-48d4-47d5-a542-2101112f51ae", "metadata": {}, "outputs": [], @@ -869,7 +869,7 @@ }, { "cell_type": "code", - "execution_count": 38, + "execution_count": 33, "id": "2b3516fe-74eb-4fe7-8ebd-9e3001a7b2d8", "metadata": {}, "outputs": [], @@ -880,7 +880,7 @@ }, { "cell_type": "code", - "execution_count": 39, + "execution_count": 34, "id": "b7e0bc65-f05f-4ea0-8ef7-72b1939192ae", "metadata": {}, "outputs": [ @@ -902,7 +902,7 @@ }, { "cell_type": "code", - "execution_count": 40, + "execution_count": 35, "id": "c189c354-8ec7-47b3-8751-1aae6806b6e4", "metadata": {}, "outputs": [], @@ -915,7 +915,7 @@ }, { "cell_type": "code", - "execution_count": 41, + "execution_count": 36, "id": "4d9fad4a-7c59-42dd-bbc1-ebe6839e6aa7", "metadata": {}, "outputs": [ @@ -949,7 +949,7 @@ }, { "cell_type": "code", - "execution_count": 42, + "execution_count": 37, "id": "2094f57e-676d-4ec1-9d03-7a89301dd783", "metadata": {}, "outputs": [ @@ -971,7 +971,7 @@ }, { "cell_type": "code", - "execution_count": 43, + "execution_count": 38, "id": "1bf96383-9908-4f2f-90f8-e13ca0162174", "metadata": {}, "outputs": [], @@ -984,7 +984,7 @@ }, { "cell_type": "code", - "execution_count": 44, + "execution_count": 39, "id": "546a10ac-b24c-43df-9d46-db24199918e8", "metadata": {}, "outputs": [ @@ -1015,6 +1015,139 @@ " plt.imshow(crop[\"value\"][:, :, ::-1])\n", " plt.show()" ] + }, + { + "cell_type": "markdown", + "id": "50f0cadf-906a-4c2b-b572-6e9e9193e98a", + "metadata": {}, + "source": [ + "## Introduce Active Learning block\n", + "\n", + "In this example, we present on how to introduce Active Learning data collection block to the workflow. You would need to have example object-detection project created in Roboflow app ([docs](https://docs.roboflow.com/datasets/create-a-project))." + ] + }, + { + "cell_type": "code", + "execution_count": 40, + "id": "d49fed0c-62a9-480d-a8cc-8015317275e4", + "metadata": {}, + "outputs": [], + "source": [ + "YOUR_PROJECT_NAME = ... # place your project" + ] + }, + { + "cell_type": "code", + "execution_count": 42, + "id": "abb130ea-361b-4bf7-9a7a-132692009dec", + "metadata": {}, + "outputs": [], + "source": [ + "ACTIVE_LEARNING_WORKFLOW = {\n", + " \"specification\": {\n", + " \"version\": \"1.0\",\n", + " \"inputs\": [\n", + " { \"type\": \"InferenceImage\", \"name\": \"image\" },\n", + " ],\n", + " \"steps\": [\n", + " {\n", + " \"type\": \"ObjectDetectionModel\", # definition of object detection model - generic detection based on COCO classes\n", + " \"name\": \"general_detection\",\n", + " \"image\": \"$inputs.image\", # linking input image into detection model\n", + " \"model_id\": \"yolov8n-640\", # pointing model to be used\n", + " \"disable_active_learning\": True, # we are disabling Active Learning for model \n", + " # (it is advised to do so, when ActiveLearningDataCollector is in use)\n", + " },\n", + " {\n", + " \"type\": \"ActiveLearningDataCollector\", # definition of data collector block\n", + " \"name\": \"active_learning_block\",\n", + " \"image\": \"$inputs.image\", # we need to point image that is reference point for predictions\n", + " \"predictions\": \"$steps.general_detection.predictions\", # we need to point `predictions` output from detection model\n", + " \"target_dataset\": YOUR_PROJECT_NAME,\n", + " \"active_learning_configuration\": { # this is standard AL data collection config - see: https://inference.roboflow.com/enterprise/active-learning/active_learning/\n", + " \"enabled\": True,\n", + " \"persist_predictions\": True,\n", + " \"sampling_strategies\": [\n", + " {\n", + " \"type\": \"random\",\n", + " \"name\": \"a\",\n", + " \"traffic_percentage\": 1.0,\n", + " \"limits\": [{\"type\": \"daily\", \"value\": 100}],\n", + " },\n", + " ],\n", + " \"batching_strategy\": {\n", + " \"batches_name_prefix\": \"al_in_workflows\",\n", + " \"recreation_interval\": \"daily\"\n", + " }\n", + " }\n", + " }\n", + " ],\n", + " \"outputs\": [\n", + " { \"type\": \"JsonField\", \"name\": \"predictions\", \"selector\": \"$steps.general_detection.predictions\" },\n", + " ] \n", + " }\n", + "}" + ] + }, + { + "cell_type": "code", + "execution_count": 43, + "id": "b41e5216-0759-426c-9e5e-cc668b955343", + "metadata": {}, + "outputs": [], + "source": [ + "al_results = CLIENT.infer_from_workflow(\n", + " specification=ACTIVE_LEARNING_WORKFLOW[\"specification\"],\n", + " images={\"image\": multiple_cars_image_2},\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 44, + "id": "20badcd1-7650-4ec3-b6bc-67c25fb74664", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'predictions': [[{'x': 2363.5,\n", + " 'y': 3634.0,\n", + " 'width': 1283.0,\n", + " 'height': 840.0,\n", + " 'confidence': 0.8711808919906616,\n", + " 'class': 'car',\n", + " 'class_id': 2,\n", + " 'detection_id': '8c5602f6-1c76-4d92-a729-db3a38cc9061',\n", + " 'parent_id': '$inputs.image'},\n", + " {'x': 3205.5,\n", + " 'y': 3682.0,\n", + " 'width': 493.0,\n", + " 'height': 746.0,\n", + " 'confidence': 0.7880513668060303,\n", + " 'class': 'car',\n", + " 'class_id': 2,\n", + " 'detection_id': 'ef9482d0-de3d-425e-a406-0a2018c03166',\n", + " 'parent_id': '$inputs.image'},\n", + " {'x': 1026.5,\n", + " 'y': 3615.0,\n", + " 'width': 1197.0,\n", + " 'height': 914.0,\n", + " 'confidence': 0.7465572953224182,\n", + " 'class': 'car',\n", + " 'class_id': 2,\n", + " 'detection_id': '7eaeefd0-1b31-4346-80c0-713f6fd7f44c',\n", + " 'parent_id': '$inputs.image'}]]}" + ] + }, + "execution_count": 44, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "al_results" + ] } ], "metadata": { From 85e2eadf0d0a32c53996b17c24fda076491c746b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pawe=C5=82=20P=C4=99czek?= Date: Wed, 14 Feb 2024 17:30:50 +0100 Subject: [PATCH 18/18] Update dosc --- inference/enterprise/workflows/README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/inference/enterprise/workflows/README.md b/inference/enterprise/workflows/README.md index 677738cf3..c425fa268 100644 --- a/inference/enterprise/workflows/README.md +++ b/inference/enterprise/workflows/README.md @@ -651,6 +651,9 @@ latency predicting from each model, `inference` HTTP API tries to get Active Learning config from the project that model belongs to and register datapoint. To prevent that from happening - model steps can be provided with `disable_active_learning=True` parameter. Then the only place where AL registration happens is `ActiveLearningDataCollector`. +* **important :exclamation:** be careful with names of sampling strategies if you define Active Learning configuration - +you should keep them unique not only within a single config, but globally in project - otherwise limits accounting may +not work well ## Different modes of execution Workflows can be executed in `local` environment, or `remote` environment can be used. `local` means that model steps