Skip to content

Commit

Permalink
Replace observation_id with observation_idx
Browse files Browse the repository at this point in the history
  • Loading branch information
hellais committed Sep 10, 2024
1 parent ff1fc86 commit 83ef335
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 37 deletions.
6 changes: 0 additions & 6 deletions oonidata/src/oonidata/models/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,3 @@ class TableModelProtocol(Protocol):

probe_meta: Any
measurement_meta: Any


@dataclass
class ProcessingMeta:
processing_start_time: datetime
processing_end_time: Optional[datetime] = None
32 changes: 23 additions & 9 deletions oonidata/src/oonidata/models/observations.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
Tuple,
)

from oonidata.models.base import table_model, ProcessingMeta
from oonidata.models.base import table_model
from oonidata.models.dataformats import Failure


Expand Down Expand Up @@ -181,15 +181,19 @@ class TCPObservation:

@table_model(
table_name="obs_web_ctrl",
table_index=("measurement_uid", "observation_id", "measurement_start_time"),
table_index=(
"measurement_start_time",
"hostname",
"measurement_uid",
"observation_idx",
),
)
@dataclass
class WebControlObservation:
measurement_meta: MeasurementMeta
processing_meta: ProcessingMeta

hostname: str
observation_id: str = ""
observation_idx: int = 0

created_at: Optional[datetime] = None

Expand Down Expand Up @@ -220,16 +224,22 @@ class WebControlObservation:

@table_model(
table_name="obs_web",
table_index=("measurement_uid", "observation_id", "measurement_start_time"),
table_index=(
"measurement_start_time",
"hostname",
"probe_cc",
"probe_asn",
"measurement_uid",
"observation_idx",
),
)
@dataclass
class WebObservation:
measurement_meta: MeasurementMeta
probe_meta: ProbeMeta
processing_meta: ProcessingMeta

# These fields are added by the processor
observation_id: str = ""
observation_idx: int = 0
created_at: Optional[datetime] = None
processing_time: Optional[float] = None

Expand Down Expand Up @@ -338,14 +348,18 @@ class WebObservation:

@table_model(
table_name="obs_http_middlebox",
table_index=("measurement_uid", "measurement_start_time"),
table_index=(
"measurement_start_time",
"measurement_uid",
"observation_idx",
),
)
@dataclass
class HTTPMiddleboxObservation:
measurement_meta: MeasurementMeta
probe_meta: ProbeMeta

observation_id: str = ""
observation_idx: int = 0

created_at: Optional[datetime] = None

Expand Down
2 changes: 0 additions & 2 deletions oonipipeline/src/oonipipeline/db/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@ def _consume_rows(
d.update(d.pop("probe_meta"))
if "measurement_meta" in d:
d.update(d.pop("measurement_meta"))
if "processing_meta" in d:
d.update(d.pop("processing_meta"))

# TODO(art): this custom_remap should not be here
if "loni_list" in d:
Expand Down
7 changes: 1 addition & 6 deletions oonipipeline/src/oonipipeline/db/create_tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
from dataclasses import Field, fields
import typing

from oonidata.models.base import TableModelProtocol, ProcessingMeta
from oonidata.models.base import TableModelProtocol
from oonidata.models.experiment_result import (
ExperimentResult,
MeasurementExperimentResult,
Expand Down Expand Up @@ -129,11 +129,6 @@ def iter_table_fields(
type_str = typing_to_clickhouse(f.type)
yield f, type_str
continue
if f.type == ProcessingMeta:
for f in fields(ProcessingMeta):
type_str = typing_to_clickhouse(f.type)
yield f, type_str
continue

try:
type_str = typing_to_clickhouse(f.type)
Expand Down
23 changes: 12 additions & 11 deletions oonipipeline/src/oonipipeline/transforms/measurement_transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
)
from collections import defaultdict

from oonidata.models.base import ProcessingMeta
from oonidata.models.dataformats import (
DNSAnswer,
DNSQuery,
Expand Down Expand Up @@ -502,6 +501,7 @@ def make_web_observation(
msmt_meta: MeasurementMeta,
probe_meta: ProbeMeta,
netinfodb: NetinfoDB,
observation_idx: int = 0,
dns_o: Optional[DNSObservation] = None,
tcp_o: Optional[TCPObservation] = None,
tls_o: Optional[TLSObservation] = None,
Expand All @@ -518,9 +518,8 @@ def make_web_observation(
probe_analysis=probe_analysis,
measurement_meta=msmt_meta,
probe_meta=probe_meta,
processing_meta=ProcessingMeta(
processing_start_time=datetime.now(timezone.utc)
),
observation_idx=observation_idx,
created_at=datetime.now(timezone.utc),
)
dns_ip = None
if dns_o and dns_o.answer:
Expand Down Expand Up @@ -565,7 +564,6 @@ def make_web_observation(
maybe_set_web_fields(
src_obs=http_o, prefix="http_", web_obs=web_obs, field_names=WEB_OBS_FIELDS
)
web_obs.processing_meta.processing_end_time = datetime.now(timezone.utc)
return web_obs


Expand Down Expand Up @@ -753,6 +751,7 @@ def __init__(
msmt=measurement, bucket_date=bucket_date
)
self.probe_meta = make_probe_meta(msmt=measurement, netinfodb=netinfodb)
self.observation_idx = 1

def make_http_observations(
self,
Expand Down Expand Up @@ -903,6 +902,7 @@ def consume_web_observations(
http_o=http_o,
target_id=target_id,
probe_analysis=probe_analysis,
observation_idx=self.observation_idx,
)
)
if tcp_o:
Expand All @@ -911,6 +911,7 @@ def consume_web_observations(
tls_observations.remove(tls_o)
if http_o:
http_observations.remove(http_o)
self.observation_idx += 1

for tcp_o in tcp_observations:
_, tls_o, http_o = find_relevant_observations(
Expand All @@ -933,8 +934,10 @@ def consume_web_observations(
http_o=http_o,
target_id=target_id,
probe_analysis=probe_analysis,
observation_idx=self.observation_idx,
)
)
self.observation_idx += 1

for tls_o in tls_observations:
_, _, http_o = find_relevant_observations(
Expand All @@ -953,8 +956,10 @@ def consume_web_observations(
http_o=http_o,
target_id=target_id,
probe_analysis=probe_analysis,
observation_idx=self.observation_idx,
)
)
self.observation_idx += 1

for http_o in http_observations:
web_obs_list.append(
Expand All @@ -965,14 +970,10 @@ def consume_web_observations(
http_o=http_o,
target_id=target_id,
probe_analysis=probe_analysis,
observation_idx=self.observation_idx,
)
)

for idx, obs in enumerate(web_obs_list):
obs.observation_id = f"{obs.measurement_meta.measurement_uid}_{idx}"
obs.created_at = datetime.now(timezone.utc).replace(
microsecond=0, tzinfo=None
)
self.observation_idx += 1

return web_obs_list

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ def make_observations(
) -> Tuple[List[HTTPMiddleboxObservation]]:
mb_obs = HTTPMiddleboxObservation(
hfm_success=True,
observation_id=f"{msmt.measurement_uid}_0",
observation_idx=1,
created_at=datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None),
measurement_meta=self.measurement_meta,
probe_meta=self.probe_meta,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def make_observations(
) -> Tuple[List[HTTPMiddleboxObservation]]:
mb_obs = HTTPMiddleboxObservation(
hirl_success=True,
observation_id=f"{msmt.measurement_uid}_0",
observation_idx=1,
created_at=datetime.now(timezone.utc).replace(microsecond=0, tzinfo=None),
measurement_meta=self.measurement_meta,
probe_meta=self.probe_meta,
Expand Down
52 changes: 51 additions & 1 deletion oonipipeline/src/oonipipeline/transforms/observations.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,26 @@
from typing import List, Tuple, Union
from typing import List, Optional, Tuple, Union, overload
from datetime import datetime, timezone

from oonidata.models.observations import (
HTTPMiddleboxObservation,
WebControlObservation,
WebObservation,
)

from oonidata.models.nettests import (
Signal,
SupportedDataformats,
Whatsapp,
Telegram,
StunReachability,
Tor,
FacebookMessenger,
HTTPHeaderFieldManipulation,
UrlGetter,
WebConnectivity,
HTTPInvalidRequestLine,
)

from .nettests.dnscheck import DNSCheckTransformer
from .nettests.http_header_field_manipulation import (
HTTPHeaderFieldManipulationTransformer,
Expand Down Expand Up @@ -46,6 +62,40 @@
TypeHTTPMiddleboxObservations = Tuple[List[HTTPMiddleboxObservation]]


@overload
def measurement_to_observations(
msmt: Union[HTTPHeaderFieldManipulation, HTTPInvalidRequestLine],
netinfodb: NetinfoDB,
bucket_date: str,
) -> TypeHTTPMiddleboxObservations: ...


@overload
def measurement_to_observations(
msmt: WebConnectivity,
netinfodb: NetinfoDB,
bucket_date: str,
) -> TypeWebConnectivityObservations: ...


@overload
def measurement_to_observations(
msmt: Union[
Signal, Whatsapp, Telegram, StunReachability, Tor, FacebookMessenger, UrlGetter
],
netinfodb: NetinfoDB,
bucket_date: str,
) -> TypeWebObservations: ...


@overload
def measurement_to_observations(
msmt: SupportedDataformats,
netinfodb: NetinfoDB,
bucket_date: str,
) -> TypeWebObservations: ...


def measurement_to_observations(
msmt,
netinfodb: NetinfoDB,
Expand Down

0 comments on commit 83ef335

Please sign in to comment.