Skip to content

Commit

Permalink
WIP replacing temporal with airflow
Browse files Browse the repository at this point in the history
  • Loading branch information
hellais committed Dec 19, 2024
1 parent fda14f9 commit ce88b9a
Show file tree
Hide file tree
Showing 23 changed files with 86 additions and 662 deletions.
6 changes: 3 additions & 3 deletions oonipipeline/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,24 +14,24 @@ dependencies = [
"tqdm ~= 4.64",
"lz4 ~= 4.0",
"requests ~= 2.27",
"cryptography ~= 38.0.3",
"cryptography ~= 41.0.0",
"clickhouse-driver ~= 0.2",
"click ~= 8.0.0",
"lxml ~= 4.9",
"maxminddb ~= 2.2",
"orjson ~= 3.8",
"mashumaro ~= 3.0",
"pyOpenSSL ~= 22.1",
"fastapi ~= 0.108.0",
"tabulate ~= 0.9.0",
"warcio ~= 1.7.4",
"msgpack ~= 1.0.4",
"click-loglevel ~= 0.5.0",
"temporalio ~= 1.7.0",
"temporalio[opentelemetry] ~= 1.7.0",
"opentelemetry-exporter-otlp-proto-grpc ~= 1.18.0",
"opentelemetry-exporter-otlp-proto-grpc ~= 1.29.0",
"uvicorn ~= 0.25.0",
"pydantic-settings ~= 2.4.0",
"apache-airflow == 2.10.4"
]

[project.optional-dependencies]
Expand Down
6 changes: 3 additions & 3 deletions oonipipeline/src/oonipipeline/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,17 @@
list_partitions_to_delete,
list_duplicates_in_buckets,
)
from oonipipeline.temporal.client_operations import (
from oonipipeline.tasks.client_operations import (
TemporalConfig,
get_status,
temporal_connect,
)
from oonipipeline.temporal.schedules import (
from oonipipeline.tasks.schedules import (
clear_all_schedules,
schedule_all,
schedule_backfill,
)
from oonipipeline.temporal.workers import start_workers
from oonipipeline.tasks.workers import start_workers

import click
from click_loglevel import LogLevel
Expand Down
65 changes: 65 additions & 0 deletions oonipipeline/src/oonipipeline/dags/dags.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.models import Variable, Param
from oonipipeline.tasks.observations import (
MakeObservationsParams,
make_observations,
)
from oonipipeline.tasks.analysis import (
MakeAnalysisParams,
make_analysis_in_a_day,
)

def _run_make_observations(bucket_date: str, params):
    """Build MakeObservationsParams at task run time and call make_observations.

    Args:
        bucket_date: the rendered ``{{ ds }}`` value, supplied through the
            operator's templated ``op_kwargs``.
        params: the DAG run's resolved params mapping; Airflow injects it from
            the task context because the argument is named ``params``.

    Returns:
        Whatever ``make_observations`` returns (PythonOperator pushes it to XCom).
    """
    return make_observations(
        MakeObservationsParams(
            probe_cc=params["probe_cc"],
            test_name=params["test_name"],
            # Variables are looked up here, at execution time, rather than in
            # top-level DAG code, so parsing the file does not query the
            # metadata database and edits to the Variables are picked up.
            clickhouse=Variable.get("clickhouse_url", default_var=""),
            data_dir=Variable.get("data_dir", default_var=""),
            fast_fail=False,
            bucket_date=bucket_date,
        )
    )


def _run_make_analysis(day: str, params):
    """Build MakeAnalysisParams at task run time and call make_analysis_in_a_day.

    Args:
        day: the rendered ``{{ ds }}`` value, supplied through the operator's
            templated ``op_kwargs``.
        params: the DAG run's resolved params mapping, injected from the task
            context by Airflow.

    Returns:
        Whatever ``make_analysis_in_a_day`` returns.
    """
    return make_analysis_in_a_day(
        MakeAnalysisParams(
            probe_cc=params["probe_cc"],
            test_name=params["test_name"],
            day=day,
        )
    )


# Daily batch pipeline: generate observations for a day, then analyse them.
# NOTE(review): with start_date in 2012 and Airflow's default catchup
# behaviour, unpausing this DAG schedules one run per day since then; confirm
# that this backfill is intended.
with DAG(
    dag_id="batch_measurement_processing",
    params={
        "probe_cc": Param([], type="array"),
        "test_name": Param([], type="array"),
    },
    start_date=datetime.datetime(2012, 12, 4),
    schedule="@daily",
) as dag:
    # Jinja template for the logical date. It is only rendered when it sits in
    # a templated operator field as a plain string (or inside a list/dict);
    # embedding it in a params object would pass the literal text through.
    start_day = "{{ ds }}"

    op_make_observations = PythonOperator(
        task_id="make_observations",
        python_callable=_run_make_observations,
        op_kwargs={"bucket_date": start_day},
    )

    op_make_analysis = PythonOperator(
        task_id="make_analysis",
        python_callable=_run_make_analysis,
        op_kwargs={"day": start_day},
    )

    # Analysis consumes the observations produced for the same day.
    op_make_observations >> op_make_analysis

# dag.log.info(
# f"finished make_observations for bucket_date={start_day} in "
# f"{total_t.pretty} speed: {obs_res['mb_per_sec']}MB/s ({obs_res['measurement_per_sec']}msmt/s)"
# )

# return {
# "measurement_count": obs_res["measurement_count"],
# "size": obs_res["total_size"],
# "mb_per_sec": obs_res["mb_per_sec"],
# "bucket_date": params.bucket_date,
# "measurement_per_sec": obs_res["measurement_per_sec"],
# }
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from oonidata.models.nettests import SupportedDataformats
from oonipipeline.db.connections import ClickhouseConnection
from oonipipeline.netinfo import NetinfoDB
from oonipipeline.temporal.activities.common import process_pool_executor, update_assets
from oonipipeline.tasks.common import process_pool_executor, update_assets
from oonipipeline.settings import config
from opentelemetry import trace

Expand Down
141 changes: 0 additions & 141 deletions oonipipeline/src/oonipipeline/temporal/client_operations.py

This file was deleted.

37 changes: 0 additions & 37 deletions oonipipeline/src/oonipipeline/temporal/common.py

This file was deleted.

Loading

0 comments on commit ce88b9a

Please sign in to comment.