refactor(oonimeasurements): redesign measurements service #895

Draft · wants to merge 9 commits into master
4 changes: 4 additions & 0 deletions ooniapi/common/src/common/clickhouse.py
@@ -0,0 +1,4 @@
from clickhouse_sqlalchemy import get_declarative_base


Base = get_declarative_base()
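
The new common/clickhouse.py module only exposes a declarative base built with clickhouse-sqlalchemy. Below is a minimal sketch of how a ClickHouse-backed model could be declared on top of it; the table, columns, and import path are illustrative assumptions, not part of this PR:

```python
# Hypothetical model on the new Base; names are illustrative only.
from clickhouse_sqlalchemy import engines, types
from sqlalchemy import Column

from oonimeasurements.common.clickhouse import Base  # assumed import path


class Fastpath(Base):
    __tablename__ = "fastpath"

    measurement_uid = Column(types.String, primary_key=True)
    measurement_start_time = Column(types.DateTime)
    probe_cc = Column(types.String)

    __table_args__ = (engines.MergeTree(order_by=("measurement_uid",)),)
```
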
1 change: 1 addition & 0 deletions ooniapi/common/src/common/routers.py
@@ -1,4 +1,5 @@
from datetime import date, datetime
from typing import Union
from pydantic import BaseModel as PydandicBaseModel
from pydantic import ConfigDict

1 change: 1 addition & 0 deletions ooniapi/services/oonimeasurements/pyproject.toml
@@ -11,6 +11,7 @@ dependencies = [
"fastapi ~= 0.108.0",
"psycopg2 ~= 2.9.5",
"clickhouse-driver ~= 0.2.6",
"clickhouse-sqlalchemy ~= 0.3.2",
"sqlalchemy ~= 2.0.27",
"ujson ~= 5.9.0",
"urllib3 ~= 2.1.0",
@@ -7,9 +7,10 @@
from .common.config import Settings
from .common.dependencies import get_settings


def get_clickhouse_session(settings: Annotated[Settings, Depends(get_settings)]):
db = Clickhouse.from_url(settings.clickhouse_url)
try:
yield db
finally:
finally:
db.disconnect()
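
get_clickhouse_session is a FastAPI generator dependency: it yields a clickhouse_driver client built from settings.clickhouse_url and disconnects it once the request finishes. A hedged sketch of a route consuming it follows; the endpoint path and query are made up for illustration:

```python
# Illustrative route only; the path and query are not part of this PR.
from clickhouse_driver import Client as ClickhouseClient
from fastapi import APIRouter, Depends
from typing_extensions import Annotated

from oonimeasurements.dependencies import get_clickhouse_session

router = APIRouter()


@router.get("/v1/_clickhouse_health")
async def clickhouse_health(
    db: Annotated[ClickhouseClient, Depends(get_clickhouse_session)]
):
    # The dependency disconnects the client after the response is sent.
    db.execute("SELECT 1")
    return {"status": "ok"}
```
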
@@ -8,7 +8,8 @@

from prometheus_fastapi_instrumentator import Instrumentator

from .routers import aggregation, measurements
from .routers.v1 import aggregation
from .routers.v1 import measurements

from .dependencies import get_clickhouse_session
from .common.dependencies import get_settings
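
With the routers now living in a v1 package, main.py imports them from .routers.v1. A sketch of the corresponding wiring is below; the /api prefix is an assumption taken from the aggregation module's docstring, and the rest of main.py is truncated in this diff:

```python
# Illustrative wiring only, not the actual (truncated) main.py.
from fastapi import FastAPI

from .routers.v1 import aggregation, measurements

app = FastAPI()
app.include_router(aggregation.router, prefix="/api")
app.include_router(measurements.router, prefix="/api")
```
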
@@ -1,15 +1,13 @@
"""
Aggregation API
The routes are mounted under /api
"""

from datetime import datetime, timedelta, date
from datetime import datetime, timedelta, date, timezone
from typing import List, Any, Dict, Optional, Union
import logging

from fastapi import APIRouter, Depends, Query, HTTPException
from fastapi.responses import Response
from pydantic import BaseModel
from typing_extensions import Annotated

from clickhouse_driver import Client as ClickhouseClient
@@ -20,8 +18,8 @@

from oonimeasurements.common.clickhouse_utils import query_click, query_click_one_row
from oonimeasurements.common.utils import jerror, commasplit, convert_to_csv
from ..dependencies import get_clickhouse_session

from ...dependencies import get_clickhouse_session
from ...common.routers import BaseModel

router = APIRouter()

@@ -116,7 +114,7 @@ class AggregationResult(BaseModel):
failure_count: int
ok_count: int
measurement_count: int
measurement_start_day: Optional[date] = None
measurement_start_day: Optional[str] = None
blocking_type: Optional[str] = None
category_code: Optional[str] = None
domain: Optional[str] = None
@@ -134,8 +132,8 @@ class MeasurementAggregation(BaseModel):


@router.get(
"/v1/aggregation",
response_model_exclude_none=True
"/v1/aggregation",
response_model_exclude_none=True,
response_model=MeasurementAggregation,
)
async def get_measurements(
response: Response,
@@ -247,7 +246,9 @@ async def get_measurements(
int(i[2:]) if i.startswith("AS") else i for i in commasplit(probe_asn)
]
except ValueError:
raise HTTPException(status_code=400, detail="Invalid ASN value in parameter probe_asn")
raise HTTPException(
status_code=400, detail="Invalid ASN value in parameter probe_asn"
)

probe_cc_s = []
if probe_cc:
@@ -342,12 +343,16 @@
group_by: List = []
try:
if axis_x == "measurement_start_day":
group_by_date(since, until, time_grain, cols, colnames, group_by)
time_grain = group_by_date(
since, until, time_grain, cols, colnames, group_by
)
elif axis_x:
add_axis(axis_x, cols, colnames, group_by)

if axis_y == "measurement_start_day":
group_by_date(since, until, time_grain, cols, colnames, group_by)
time_grain = group_by_date(
since, until, time_grain, cols, colnames, group_by
)
elif axis_y:
add_axis(axis_y, cols, colnames, group_by)

@@ -372,7 +377,17 @@

try:
if dimension_cnt > 0:
r: Any = list(query_click(db, query, query_params, query_prio=4))
str_format = "%Y-%m-%d"
if time_grain == "hour":
str_format = "%Y-%m-%dT%H:%M:%SZ"
r: Any = []
for row in query_click(db, query, query_params, query_prio=4):
## Handle the difference in formatting between hourly and daily measurement_start_day
if "measurement_start_day" in row:
row["measurement_start_day"] = row[
"measurement_start_day"
].strftime(str_format)
r.append(row)
else:
r = query_click_one_row(db, query, query_params, query_prio=4)
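
Since AggregationResult.measurement_start_day is now a string, each row is formatted explicitly before being returned, and the format depends on the time grain that group_by_date now returns. A standalone illustration of the intended output shape; format_start_day is not a function in this PR, just a sketch of the behaviour:

```python
from datetime import datetime


def format_start_day(value: datetime, time_grain: str) -> str:
    # Hourly grains keep the time component, everything else is a plain date.
    str_format = "%Y-%m-%d"
    if time_grain == "hour":
        str_format = "%Y-%m-%dT%H:%M:%SZ"
    return value.strftime(str_format)


assert format_start_day(datetime(2024, 1, 2, 3), "hour") == "2024-01-02T03:00:00Z"
assert format_start_day(datetime(2024, 1, 2, 3), "day") == "2024-01-02"
```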

@@ -410,7 +425,8 @@ async def get_measurements(
elapsed_seconds=pq.elapsed,
),
result=r,
).model_dump(exclude_none=True)
)

except Exception as e:
print(e)
raise HTTPException(status_code=400, detail=str(e))
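
The explicit .model_dump(exclude_none=True) call goes away because the endpoint now declares response_model=MeasurementAggregation alongside response_model_exclude_none=True, letting FastAPI serialise the return value and drop None fields itself. A minimal, self-contained illustration of that FastAPI behaviour; the Item model is made up:

```python
# Generic FastAPI illustration, not code from this PR.
from typing import Optional

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()


class Item(BaseModel):
    name: str
    note: Optional[str] = None


@app.get("/item", response_model=Item, response_model_exclude_none=True)
async def get_item():
    # The response omits "note" because it is None.
    return Item(name="example")
```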