Skip to content

Commit

Permalink
Merge pull request #476 from roboflow/471-usage-tracking
Browse files Browse the repository at this point in the history
Usage Tracking
  • Loading branch information
grzegorz-roboflow authored Jul 18, 2024
2 parents 36bb5de + d022f8a commit a3b3d50
Show file tree
Hide file tree
Showing 18 changed files with 1,329 additions and 4 deletions.
1 change: 1 addition & 0 deletions inference/core/interfaces/camera/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class VideoFrame:
image: np.ndarray
frame_id: FrameID
frame_timestamp: FrameTimestamp
fps: float = 0
source_id: Optional[int] = None


Expand Down
3 changes: 3 additions & 0 deletions inference/core/interfaces/camera/video_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,7 @@ def _consume_stream_frame(
buffer=buffer,
decoding_pace_monitor=self._decoding_pace_monitor,
source_id=source_id,
fps=declared_source_fps,
)
if self._buffer_filling_strategy in DROP_OLDEST_STRATEGIES:
return self._process_stream_frame_dropping_oldest(
Expand Down Expand Up @@ -1082,6 +1083,7 @@ def decode_video_frame_to_buffer(
buffer: Queue,
decoding_pace_monitor: sv.FPSMonitor,
source_id: Optional[int],
fps: float = 0,
) -> bool:
success, image = video.retrieve()
if not success:
Expand All @@ -1091,6 +1093,7 @@ def decode_video_frame_to_buffer(
image=image,
frame_id=frame_id,
frame_timestamp=frame_timestamp,
fps=fps,
source_id=source_id,
)
buffer.put(video_frame)
Expand Down
12 changes: 12 additions & 0 deletions inference/core/interfaces/http/http_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from fastapi.responses import JSONResponse, RedirectResponse, Response
from fastapi.staticfiles import StaticFiles
from fastapi_cprofile.profiler import CProfileMiddleware
from starlette.middleware.base import BaseHTTPMiddleware

from inference.core import logger
from inference.core.cache import cache
Expand Down Expand Up @@ -164,6 +165,7 @@
discover_blocks_connections,
)
from inference.models.aliases import resolve_roboflow_model_alias
from inference.usage_tracking.collector import usage_collector

if LAMBDA:
from inference.core.usage import trackUsage
Expand Down Expand Up @@ -346,6 +348,14 @@ async def wrapped_route(*args, **kwargs):
return wrapped_route


class LambdaMiddleware(BaseHTTPMiddleware):
    # Starlette middleware that flushes queued usage-tracking payloads after
    # each handled request. Registered only when running under LAMBDA, where
    # the process may be frozen/terminated at any time after a response, so
    # payloads must be pushed before control returns to the runtime.
    #
    # NOTE(review): despite the log message, this dispatch runs on *every*
    # response, not only at actual Lambda termination — confirm intended.
    async def dispatch(self, request, call_next):
        # Let the request complete first; usage flushing must not delay or
        # alter the response pipeline itself.
        response = await call_next(request)
        logger.info("Lambda is terminating, handle unsent usage payloads.")
        await usage_collector.async_push_usage_payloads()
        return response


class HttpInterface(BaseInterface):
"""Roboflow defined HTTP interface for a general-purpose inference server.
Expand Down Expand Up @@ -393,6 +403,8 @@ def __init__(
app.add_middleware(
ASGIMiddleware, host="https://app.metlo.com", api_key=METLO_KEY
)
if LAMBDA:
app.add_middleware(LambdaMiddleware)

if len(ALLOW_ORIGINS) > 0:
app.add_middleware(
Expand Down
3 changes: 3 additions & 0 deletions inference/core/interfaces/stream/inference_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
from inference.core.workflows.core_steps.common.entities import StepExecutionMode
from inference.models.aliases import resolve_roboflow_model_alias
from inference.models.utils import ROBOFLOW_MODEL_TYPES, get_model
from inference.usage_tracking.collector import usage_collector

INFERENCE_PIPELINE_CONTEXT = "inference_pipeline"
SOURCE_CONNECTION_ATTEMPT_FAILED_EVENT = "SOURCE_CONNECTION_ATTEMPT_FAILED"
Expand Down Expand Up @@ -562,6 +563,8 @@ def init_with_workflow(
execution_engine = ExecutionEngine.init(
workflow_definition=workflow_specification,
init_parameters=workflow_init_parameters,
api_key=api_key,
workflow_id=workflow_id,
)
workflow_runner = WorkflowRunner()
on_video_frame = partial(
Expand Down
6 changes: 5 additions & 1 deletion inference/core/interfaces/stream/model_handlers/workflows.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,13 @@ def run_workflow(
self._event_loop = event_loop
if workflows_parameters is None:
workflows_parameters = {}
# TODO: pass fps reflecting each stream to workflows_parameters
fps = video_frames[0].fps
workflows_parameters[image_input_name] = [
video_frame.image for video_frame in video_frames
]
return execution_engine.run(
runtime_parameters=workflows_parameters, event_loop=self._event_loop
runtime_parameters=workflows_parameters,
event_loop=self._event_loop,
fps=fps,
)
2 changes: 1 addition & 1 deletion inference/core/version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "0.14.1"
__version__ = "0.15.0"


if __name__ == "__main__":
Expand Down
2 changes: 2 additions & 0 deletions inference/core/workflows/execution_engine/compiler/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ def compile_workflow(
dump_execution_graph(execution_graph=execution_graph)
return CompiledWorkflow(
workflow_definition=parsed_workflow_definition,
workflow_json=workflow_definition,
init_parameters=init_parameters,
execution_graph=execution_graph,
steps=steps_by_name,
input_substitutions=input_substitutions,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ class CompiledWorkflow:
execution_graph: nx.DiGraph
steps: Dict[str, InitialisedStep]
input_substitutions: List[InputSubstitution]
workflow_json: Dict[str, Any]
init_parameters: Dict[str, Any]


class NodeCategory(Enum):
Expand Down
20 changes: 18 additions & 2 deletions inference/core/workflows/execution_engine/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from asyncio import AbstractEventLoop
from typing import Any, Dict, List, Optional

from inference.core.env import API_KEY
from inference.core.workflows.execution_engine.compiler.core import compile_workflow
from inference.core.workflows.execution_engine.compiler.entities import CompiledWorkflow
from inference.core.workflows.execution_engine.executor.core import run_workflow
Expand All @@ -22,7 +23,11 @@ def init(
init_parameters: Optional[Dict[str, Any]] = None,
max_concurrent_steps: int = 1,
prevent_local_images_loading: bool = False,
api_key: Optional[str] = None,
workflow_id: Optional[str] = None,
) -> "ExecutionEngine":
if api_key is None:
api_key = API_KEY
if init_parameters is None:
init_parameters = {}
compiled_workflow = compile_workflow(
Expand All @@ -33,34 +38,43 @@ def init(
compiled_workflow=compiled_workflow,
max_concurrent_steps=max_concurrent_steps,
prevent_local_images_loading=prevent_local_images_loading,
api_key=api_key,
workflow_id=workflow_id,
)

    def __init__(
        self,
        compiled_workflow: CompiledWorkflow,
        max_concurrent_steps: int,
        prevent_local_images_loading: bool,
        api_key: Optional[str] = None,
        workflow_id: Optional[str] = None,
    ):
        """Store execution configuration; instances are normally built via `init(...)`.

        Args:
            compiled_workflow: Pre-compiled workflow (definition, execution
                graph, initialised steps).
            max_concurrent_steps: Upper bound on workflow steps executed
                concurrently.
            prevent_local_images_loading: When True, runtime parameter
                assembly must not load images from local paths.
            api_key: API key forwarded for usage tracking; `init(...)` falls
                back to the API_KEY environment setting when None.
            workflow_id: Identifier attached to usage-tracking payloads
                emitted when the workflow runs; may be None.
        """
        self._compiled_workflow = compiled_workflow
        self._max_concurrent_steps = max_concurrent_steps
        self._prevent_local_images_loading = prevent_local_images_loading
        self._api_key = api_key
        self._workflow_id = workflow_id

def run(
self,
runtime_parameters: Dict[str, Any],
event_loop: Optional[AbstractEventLoop] = None,
fps: float = 0,
) -> List[Dict[str, Any]]:
if event_loop is None:
try:
event_loop = asyncio.get_event_loop()
except:
event_loop = asyncio.new_event_loop()
return event_loop.run_until_complete(
self.run_async(runtime_parameters=runtime_parameters)
self.run_async(runtime_parameters=runtime_parameters, fps=fps)
)

async def run_async(
self, runtime_parameters: Dict[str, Any]
self,
runtime_parameters: Dict[str, Any],
fps: float = 0,
) -> List[Dict[str, Any]]:
runtime_parameters = assembly_runtime_parameters(
runtime_parameters=runtime_parameters,
Expand All @@ -75,4 +89,6 @@ async def run_async(
workflow=self._compiled_workflow,
runtime_parameters=runtime_parameters,
max_concurrent_steps=self._max_concurrent_steps,
usage_fps=fps,
usage_workflow_id=self._workflow_id,
)
2 changes: 2 additions & 0 deletions inference/core/workflows/execution_engine/executor/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
construct_workflow_output,
)
from inference.core.workflows.prototypes.block import WorkflowBlock
from inference.usage_tracking.collector import usage_collector
from inference_sdk.http.utils.iterables import make_batches


@usage_collector
async def run_workflow(
workflow: CompiledWorkflow,
runtime_parameters: Dict[str, Any],
Expand Down
5 changes: 5 additions & 0 deletions inference/usage_tracking/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
"""
Inference utilizes Roboflow's cloud services and requires telemetry during deployment.
Customers with an offline deployment can turn the telemetry off by setting the TELEMETRY_OPT_OUT environment variable to True.
For more information please consult our licensing page [roboflow.com/licensing] or contact sales [roboflow.com/sales].
"""
Loading

0 comments on commit a3b3d50

Please sign in to comment.