diff --git a/detector/Dockerfile b/detector/Dockerfile new file mode 100644 index 00000000..4e65d68c --- /dev/null +++ b/detector/Dockerfile @@ -0,0 +1,21 @@ + +FROM debian:bullseye +ENV PYTHONUNBUFFERED 1 +# ENV PYTHONPATH /opt/detector +ENV DEBIAN_FRONTEND noninteractive + +RUN mkdir /scripts +COPY build_runner.sh /scripts +# COPY debian/ooni_download_geoip.py /scripts +# COPY api.conf.example /scripts +WORKDIR /scripts + +# Run runner setup +RUN ./build_runner.sh + +# # Download geoip files +# RUN ./ooni_download_geoip.py + +RUN rm -rf /scripts + +WORKDIR /opt/detector diff --git a/detector/build_runner.sh b/detector/build_runner.sh new file mode 100755 index 00000000..aa4b9e08 --- /dev/null +++ b/detector/build_runner.sh @@ -0,0 +1,46 @@ +#!/bin/bash +# +# WARNING: run only in a dedicated container +# Prepares a container to run the detector +# Called from spawnrunner or docker +# +set -eu +export DEBIAN_FRONTEND=noninteractive + +#echo 'deb http://deb.debian.org/debian bullseye-backports main' \ +# > /etc/apt/sources.list.d/backports.list + +# Install ca-certificates and gnupg first +apt-get update +apt-get install --no-install-recommends -y ca-certificates gnupg locales apt-transport-https dirmngr +locale-gen en_US.UTF-8 + +# Set up OONI archive +#echo 'deb http://deb-ci.ooni.org unstable main' \ +# > /etc/apt/sources.list.d/ooni.list +#apt-key adv --keyserver hkp://keyserver.ubuntu.com \ +# --recv-keys "B5A08F01796E7F521861B449372D1FF271F2DD50" + +apt-get update -q +#apt-get dist-upgrade +# Keep this in sync with debian/control +# hint: grep debdeps **/*.py +apt-get install --no-install-recommends -qy \ + python3-clickhouse-driver \ + python3-feedgenerator \ + python3-geoip2 \ + python3-mock \ + python3-numpy \ + python3-pandas \ + python3-pytest \ + python3-pytest-cov \ + python3-pytest-mock \ + python3-setuptools \ + python3-statsd \ + python3-systemd \ + python3-ujson +apt-get autoremove -y +apt-get clean +rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/* + +mkdir -p /etc/ooni/ diff --git a/detector/debian/changelog b/detector/debian/changelog index fa2ee243..90956e4f 100644 --- a/detector/debian/changelog +++ b/detector/debian/changelog @@ -1,3 +1,9 @@ +detector (0.3) unstable; urgency=medium + + * Implement event detector for social media using Clickhouse + + -- Federico Ceratto Fri, 11 Nov 2022 18:08:26 +0000 + detector (0.2) unstable; urgency=medium * run webapp as a dedicated service diff --git a/detector/debian/control b/detector/debian/control index 8c1fb036..7531b331 100644 --- a/detector/debian/control +++ b/detector/debian/control @@ -2,33 +2,23 @@ Source: detector Section: python Priority: optional Maintainer: Federico Ceratto -Build-Depends: debhelper-compat (= 12), +Build-Depends: debhelper-compat (= 13), python3, - dh-systemd (>= 1.5), dh-python, - python3-boto3, - python3-lz4, - python3-paramiko, - python3-psycopg2, + python3-clickhouse-driver, python3-setuptools, python3-statsd, - python3-systemd, - python3-ujson -Standards-Version: 4.1.3 + python3-systemd +Standards-Version: 4.6.0 Package: detector Architecture: all Depends: ${misc:Depends}, ${python3:Depends}, - python3-boto3, - python3-lz4, - python3-paramiko, - python3-psycopg2, + python3-clickhouse-driver, python3-setuptools, python3-statsd, - python3-systemd, - python3-ujson, - nginx + python3-systemd Suggests: bpython3, python3-pytest, diff --git a/detector/debian/detector-webapp.service b/detector/debian/detector-webapp.service deleted file mode 100644 index 2c9be94e..00000000 --- a/detector/debian/detector-webapp.service +++ /dev/null @@ -1,36 +0,0 @@ -[Unit] -Description=OONI Detector Webapp -Wants=network-online.target -After=network-online.target - -[Service] -ExecStart=/usr/bin/detector --webapp -Restart=on-abort -Type=simple -RestartSec=2s -WorkingDirectory=/var/lib/detector - -User=fastpath -Group=fastpath -ReadOnlyDirectories=/ -ReadWriteDirectories=/proc/self - -StandardOutput=syslog+console -StandardError=syslog+console - -PermissionsStartOnly=true -LimitNOFILE=65536 - -# Hardening -CapabilityBoundingSet=CAP_SETUID CAP_SETGID -SystemCallFilter=~@clock @debug @cpu-emulation @keyring @module @mount @obsolete @raw-io @reboot @swap -NoNewPrivileges=yes -PrivateDevices=yes -PrivateTmp=yes -ProtectHome=yes -ProtectSystem=full -ProtectKernelModules=yes -ProtectKernelTunables=yes - -[Install] -WantedBy=multi-user.target diff --git a/detector/debian/detector.timer b/detector/debian/detector.timer new file mode 100644 index 00000000..7a6eefb4 --- /dev/null +++ b/detector/debian/detector.timer @@ -0,0 +1,11 @@ +[Unit] +Description=Run event detector +Requires=detector.service + +[Timer] +Unit=detector.service +OnCalendar=*-*-* *:10:00 +Persistent=true + +[Install] +WantedBy=timers.target diff --git a/detector/debian/postinst b/detector/debian/postinst new file mode 100644 index 00000000..fb9f3a67 --- /dev/null +++ b/detector/debian/postinst @@ -0,0 +1,26 @@ +#!/bin/sh +set -e + +case "$1" in + configure) + #addgroup --system --quiet fastpath + #adduser --system --quiet --ingroup fastpath --no-create-home --home /var/lib/fastpath fastpath + mkdir -p /var/lib/detector + chown fastpath /var/lib/detector -R + ;; + + abort-upgrade|abort-remove|abort-deconfigure) + ;; + + *) + echo "postinst called with unknown argument \`$1'" >&2 + exit 1 + ;; +esac + +# dh_installdeb will replace this with shell code automatically +# generated by other debhelper scripts. + +#DEBHELPER# + +exit 0 diff --git a/detector/detector/detector.py b/detector/detector/detector.py index 46a2a2b6..b1fe22f0 100755 --- a/detector/detector/detector.py +++ b/detector/detector/detector.py @@ -1,481 +1,87 @@ #!/usr/bin/env python3 # # -*- coding: utf-8 -*- - """ OONI Event detector -Run sequence: - -Fetch already-processed mean values from internal data directory. -This is done to speed up restarts as processing historical data from the database -would take a very long time. - -Fetch historical msmt from the fastpath and measurement/report/input tables - -Fetch realtime msmt by subscribing to the notifications channel `fastpath` +Fetch historical msmt from the fastpath tables Analise msmt score with moving average to detect blocking/unblocking +See the run_detection function for details -Save outputs to local directories: - - RSS feed /var/lib/detector/rss/ - rss/global.xml All events, globally - rss/by-country/.xml Events by country - rss/type-inp/.xml Events by test_name and input - rss/cc-type-inp/.xml Events by CC, test_name and input - - JSON files with block/unblock events /var/lib/detector/events/ - - JSON files with current blocking status /var/lib/detector/status/ - - Internal data /var/lib/detector/_internal/ +Save RSS feeds to local directories: + - /var/lib/detector/rss/.xml Events by CC, ASN, test_name and input The tree under /var/lib/detector/ is served by Nginx with the exception of _internal Events are defined as changes between blocking and non-blocking on single CC / test_name / input tuples -Outputs are "upserted" where possible. New runs overwrite/update old data. - Runs as a service "detector" in a systemd unit and sandbox -See README.adoc +The --reprocess mode is only for debugging and it's destructive to +the blocking_* DB tables. """ -# Compatible with Python3.6 and 3.7 - linted with Black +# Compatible with Python3.9 - linted with Black # debdeps: python3-setuptools from argparse import ArgumentParser -from collections import namedtuple, deque -from configparser import ConfigParser -from datetime import date, datetime, timedelta + +# from configparser import ConfigParser +from datetime import datetime, timedelta from pathlib import Path from site import getsitepackages -import hashlib +from typing import Generator, Tuple, Optional, Any, Dict +from urllib.parse import urlunsplit, urlencode import logging import os -import pickle -import select -import signal import sys from systemd.journal import JournalHandler # debdeps: python3-systemd -import psycopg2 # debdeps: python3-psycopg2 -import psycopg2.extensions -import psycopg2.extras import ujson # debdeps: python3-ujson import feedgenerator # debdeps: python3-feedgenerator +import statsd # debdeps: python3-statsd -from detector.metrics import setup_metrics -import detector.scoring as scoring - -log = logging.getLogger("detector") -metrics = setup_metrics(name="detector") - -DB_USER = "shovel" -DB_NAME = "metadb" -DB_PASSWORD = "yEqgNr2eXvgG255iEBxVeP" # This is already made public - -RO_DB_USER = "amsapi" -RO_DB_PASSWORD = "b2HUU6gKM19SvXzXJCzpUV" # This is already made public - -DEFAULT_STARTTIME = datetime(2016, 1, 1) +import pandas as pd # debdeps: python3-pandas +import numpy as np # debdeps: python3-numpy -BASEURL = "http://fastpath.ooni.nu:8080" -WEBAPP_URL = BASEURL + "/webapp" - -PKGDIR = getsitepackages()[-1] - -conf = None -cc_to_country_name = None # set by load_country_name_map - -# Speed up psycopg2's JSON load -psycopg2.extras.register_default_jsonb(loads=ujson.loads, globally=True) -psycopg2.extras.register_default_json(loads=ujson.loads, globally=True) - - -def fetch_past_data(conn, start_date): - """Fetch past data in large chunks ordered by measurement_start_time - """ - q = """ - SELECT - coalesce(false) as anomaly, - coalesce(false) as confirmed, - input, - measurement_start_time, - probe_cc, - scores::text, - coalesce('') as report_id, - test_name, - tid - FROM fastpath - WHERE measurement_start_time >= %(start_date)s - AND measurement_start_time < %(end_date)s - - UNION - - SELECT - anomaly, - confirmed, - input, - measurement_start_time, - probe_cc, - coalesce('') as scores, - report_id, - test_name, - coalesce('') as tid - - FROM measurement - JOIN report ON report.report_no = measurement.report_no - JOIN input ON input.input_no = measurement.input_no - WHERE measurement_start_time >= %(start_date)s - AND measurement_start_time < %(end_date)s - ORDER BY measurement_start_time - """ - assert start_date - - end_date = start_date + timedelta(weeks=1) - - chunk_size = 20000 - with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: - while True: - # Iterate across time blocks - now = datetime.utcnow() - # Ignore measurements with future timestamp - if end_date > now: - end_date = now - log.info("Last run") - log.info("Query from %s to %s", start_date, end_date) - p = dict(start_date=str(start_date), end_date=str(end_date)) - cur.execute(q, p) - while True: - # Iterate across chunks of rows - rows = cur.fetchmany(chunk_size) - if not rows: - break - log.info("Fetched msmt chunk of size %d", len(rows)) - for r in rows: - d = dict(r) - if d["scores"]: - d["scores"] = ujson.loads(d["scores"]) - - yield d - - if end_date == now: - break - - start_date += timedelta(weeks=1) - end_date += timedelta(weeks=1) - - -def fetch_past_data_selective(conn, start_date, cc, test_name, inp): - """Fetch past data in large chunks - """ - chunk_size = 200_000 - q = """ - SELECT - coalesce(false) as anomaly, - coalesce(false) as confirmed, - input, - measurement_start_time, - probe_cc, - probe_asn, - scores::text, - test_name, - tid - FROM fastpath - WHERE measurement_start_time >= %(start_date)s - AND probe_cc = %(cc)s - AND test_name = %(test_name)s - AND input = %(inp)s - - UNION - - SELECT - anomaly, - confirmed, - input, - measurement_start_time, - probe_cc, - probe_asn, - coalesce('') as scores, - test_name, - coalesce('') as tid - - FROM measurement - JOIN report ON report.report_no = measurement.report_no - JOIN input ON input.input_no = measurement.input_no - WHERE measurement_start_time >= %(start_date)s - AND probe_cc = %(cc)s - AND test_name = %(test_name)s - AND input = %(inp)s - - ORDER BY measurement_start_time - """ - p = dict(cc=cc, inp=inp, start_date=start_date, test_name=test_name) - - with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cur: - cur.execute(q, p) - while True: - rows = cur.fetchmany(chunk_size) - if not rows: - break - log.info("Fetched msmt chunk of size %d", len(rows)) - for r in rows: - d = dict(r) - if d["scores"]: - d["scores"] = ujson.loads(d["scores"]) - - yield d - - -def backfill_scores(d): - """Generate scores dict for measurements from the traditional pipeline - """ - if d.get("scores", None): - return - b = ( - scoring.anomaly - if d["anomaly"] - else 0 + scoring.confirmed - if d["confirmed"] - else 0 - ) - d["scores"] = dict(blocking_general=b) - - -def detect_blocking_changes_1s_g(g, cc, test_name, inp, start_date): - """Used by webapp - :returns: (msmts, changes) - """ - means = {} - msmts = [] - changes = [] - - for msm in g: - backfill_scores(msm) - k = (msm["probe_cc"], msm["test_name"], msm["input"]) - assert isinstance(msm["scores"], dict), repr(msm["scores"]) - change = detect_blocking_changes(means, msm, warmup=True) - date, mean, bblocked = means[k] - val = msm["scores"]["blocking_general"] - if change: - changes.append(change) - - msmts.append((date, val, mean)) - - log.debug("%d msmts processed", len(msmts)) - assert isinstance(msmts[0][0], datetime) - return (msmts, changes) - - -def detect_blocking_changes_one_stream(conn, cc, test_name, inp, start_date): - """Used by webapp - :returns: (msmts, changes) - """ - # TODO: move into webapp? - g = fetch_past_data_selective(conn, start_date, cc, test_name, inp) - return detect_blocking_changes_1s_g(g, cc, test_name, inp, start_date) - - -def load_asn_db(): - db_f = conf.vardir / "ASN.csv" - log.info("Loading %s", db_f) - if not db_f.is_file(): - log.info("No ASN file") - return {} - - d = {} - with db_f.open() as f: - for line in f: - try: - asn, name = line.split(",", 1) - asn = int(asn) - name = name.strip()[1:-2].strip() - d[asn] = name - except: - continue - - log.info("%d ASNs loaded", len(d)) - return d +from clickhouse_driver import Client as Clickhouse +try: + from tqdm import tqdm +except ImportError: -def prevent_future_date(msm): - """If the msmt time is in the future replace it with utcnow - """ - # Timestamp are untrusted as they are generated by the probes - # This makes the process non-deterministic and non-reproducible - # but we can run unit tests against good inputs or mock utctime - # - # Warning: measurement_start_time is used for ranged queries against the DB - # and to pinpoint measurements and changes - now = datetime.utcnow() - if msm["measurement_start_time"] > now: - delta = msm["measurement_start_time"] - now - some_id = msm.get("tid", None) or msm.get("report_id", "") - log.info("Masking measurement %s %s in the future", some_id, delta) - msm["measurement_start_time"] = now - - -def detect_blocking_changes_asn_one_stream(conn, cc, test_name, inp, start_date): - """Used by webapp - :returns: (msmts, changes) - """ - g = fetch_past_data_selective(conn, start_date, cc, test_name, inp) - means = {} - msmts = [] - changes = [] - asn_breakdown = {} - - for msm in g: - backfill_scores(msm) - prevent_future_date(msm) - k = (msm["probe_cc"], msm["test_name"], msm["input"]) - assert isinstance(msm["scores"], dict), repr(msm["scores"]) - change = detect_blocking_changes(means, msm, warmup=True) - date, mean, _ = means[k] - val = msm["scores"]["blocking_general"] - if change: - changes.append(change) - - msmts.append((date, val, mean)) - del date - del val - del mean - del change - - # Generate charts for popular AS - asn = msm["probe_asn"] - a = asn_breakdown.get(asn, dict(means={}, msmts=[], changes=[])) - change = detect_blocking_changes(a["means"], msm, warmup=True) - date, mean, _ = a["means"][k] - val = msm["scores"]["blocking_general"] - a["msmts"].append((date, val, mean)) - if change: - a["changes"].append(change) - asn_breakdown[asn] = a - - log.debug("%d msmts processed", len(msmts)) - return (msmts, changes, asn_breakdown) - - -Change = namedtuple( - "Change", - [ - "probe_cc", - "test_name", - "input", - "blocked", - "mean", - "measurement_start_time", - "tid", - "report_id", - ], -) + def tqdm(x, *a, **kw): # type: ignore + return x -MeanStatus = namedtuple("MeanStatus", ["measurement_start_time", "val", "blocked"]) +log = logging.getLogger("detector") +metrics = statsd.StatsClient("localhost", 8125, prefix="detector") -def detect_blocking_changes(means: dict, msm: dict, warmup=False): - """Detect changes in blocking patterns - :returns: Change or None - """ - # TODO: move out params - upper_limit = 0.10 - lower_limit = 0.05 - # P: averaging value - # p=1: no averaging - # p=0.000001: very strong averaging - p = 0.02 - - inp = msm["input"] - if inp is None: - return +DBURI = "clickhouse://detector:detector@localhost/default?use_numpy=True" +TCAI = ["test_name", "probe_cc", "probe_asn", "input"] +tTCAI = ["t", "test_name", "probe_cc", "probe_asn", "input"] - if not isinstance(inp, str): - # Some inputs are lists. TODO: handle them? - log.debug("odd input") - return +conf: Any = None +click: Clickhouse = None +cc_to_country_name: Dict[str, str] = {} # CC-> name, see load_country_name_map - k = (msm["probe_cc"], msm["test_name"], inp) - tid = msm.get("tid", None) - report_id = msm.get("report_id", None) or None - - assert isinstance(msm["scores"], dict), repr(msm["scores"]) - blocking_general = msm["scores"]["blocking_general"] - measurement_start_time = msm["measurement_start_time"] - assert isinstance(measurement_start_time, datetime), repr(measurement_start_time) - - if k not in means: - # cc/test_name/input tuple never seen before - blocked = blocking_general > upper_limit - means[k] = MeanStatus(measurement_start_time, blocking_general, blocked) - if blocked: - if not warmup: - log.info("%r new and blocked", k) - metrics.incr("detected_blocked") - - return Change( - measurement_start_time=measurement_start_time, - blocked=blocked, - mean=blocking_general, - probe_cc=msm["probe_cc"], - input=msm["input"], - test_name=msm["test_name"], - tid=tid, - report_id=report_id, - ) - - else: - return None - - old = means[k] - assert isinstance(old, MeanStatus) - # tdelta = measurement_start_time - old.time - # TODO: average weighting by time delta; add timestamp to status and means - # TODO: record msm leading to status change - new_val = (1 - p) * old.val + p * blocking_general - means[k] = MeanStatus(measurement_start_time, new_val, old.blocked) - - if old.blocked and new_val < lower_limit: - # blocking cleared - means[k] = MeanStatus(measurement_start_time, new_val, False) - if not warmup: - log.info("%r cleared %.2f", k, new_val) - metrics.incr("detected_cleared") - - return Change( - measurement_start_time=measurement_start_time, - blocked=False, - mean=new_val, - probe_cc=msm["probe_cc"], - input=msm["input"], - test_name=msm["test_name"], - tid=tid, - report_id=report_id, - ) - if not old.blocked and new_val > upper_limit: - means[k] = MeanStatus(measurement_start_time, new_val, True) - if not warmup: - log.info("%r blocked %.2f", k, new_val) - metrics.incr("detected_blocked") - - return Change( - measurement_start_time=measurement_start_time, - blocked=True, - mean=new_val, - probe_cc=msm["probe_cc"], - input=msm["input"], - test_name=msm["test_name"], - tid=tid, - report_id=report_id, - ) +def query(*a, **kw): + settings = {} + if conf.reprocess: + settings["log_query"] = 0 + else: + log.info(a) + return click.execute(*a, settings=settings, **kw) -def parse_date(d): - return datetime.strptime(d, "%Y-%m-%d").date() +def parse_date(d: str) -> datetime: + return datetime.strptime(d, "%Y-%m-%d %H:%M") -def setup_dirs(conf, root): - """Setup directories, creating them if needed - """ +def setup_dirs(root: Path) -> None: + """Setup directories, creating them if needed""" conf.vardir = root / "var/lib/detector" conf.outdir = conf.vardir / "output" conf.rssdir = conf.outdir / "rss" @@ -484,7 +90,6 @@ def setup_dirs(conf, root): conf.rssdir_by_cc_tname_inp = conf.rssdir / "cc-type-inp" conf.eventdir = conf.outdir / "events" conf.statusdir = conf.outdir / "status" - conf.pickledir = conf.outdir / "_internal" for p in ( conf.vardir, conf.outdir, @@ -494,7 +99,6 @@ def setup_dirs(conf, root): conf.rssdir_by_cc_tname_inp, conf.eventdir, conf.statusdir, - conf.pickledir, ): p.mkdir(parents=True, exist_ok=True) @@ -503,423 +107,647 @@ def setup(): os.environ["TZ"] = "UTC" global conf ap = ArgumentParser(__doc__) + # TODO cleanup args used for debugging ap.add_argument("--devel", action="store_true", help="Devel mode") - ap.add_argument("--webapp", action="store_true", help="Run webapp") + ap.add_argument("-v", action="store_true", help="High verbosity") + ap.add_argument("--reprocess", action="store_true", help="Reprocess events") ap.add_argument("--start-date", type=lambda d: parse_date(d)) - ap.add_argument("--db-host", default=None, help="Database hostname") - ap.add_argument( - "--ro-db-host", default=None, help="Read-only database hostname" - ) + ap.add_argument("--end-date", type=lambda d: parse_date(d)) + ap.add_argument("--interval-mins", type=int, default=60) + ap.add_argument("--db-uri", default=DBURI, help="Database hostname") conf = ap.parse_args() if conf.devel: format = "%(relativeCreated)d %(process)d %(levelname)s %(name)s %(message)s" - logging.basicConfig(stream=sys.stdout, level=logging.DEBUG, format=format) + lvl = logging.DEBUG if conf.v else logging.DEBUG + logging.basicConfig(stream=sys.stdout, level=lvl, format=format) else: log.addHandler(JournalHandler(SYSLOG_IDENTIFIER="detector")) log.setLevel(logging.DEBUG) - + conf.interval = timedelta(minutes=conf.interval_mins) # Run inside current directory in devel mode root = Path(os.getcwd()) if conf.devel else Path("/") - conf.conffile = root / "etc/detector.conf" - log.info("Using conf file %r", conf.conffile.as_posix()) - cp = ConfigParser() - with open(conf.conffile) as f: - cp.read_file(f) - conf.db_host = conf.db_host or cp["DEFAULT"]["db-host"] - conf.ro_db_host = conf.ro_db_host or cp["DEFAULT"]["ro-db-host"] - - setup_dirs(conf, root) - - -@metrics.timer("handle_new_measurement") -def handle_new_msg(msg, means, rw_conn): - """Handle one measurement received in realtime from PostgreSQL - notifications - """ - msm = ujson.loads(msg.payload) - assert isinstance(msm["scores"], dict), type(msm["scores"]) - msm["measurement_start_time"] = datetime.strptime( - msm["measurement_start_time"], "%Y-%m-%d %H:%M:%S" + setup_dirs(root) + # conf.conffile = root / "etc/ooni/detector.conf" + # log.info("Using conf file %r", conf.conffile.as_posix()) + # cp = ConfigParser() + # with open(conf.conffile) as f: + # cp.read_file(f) + # conf.db_uri = conf.db_uri or cp["DEFAULT"]["clickhouse_url"] + + +# # RSS feed generation + + +def explorer_mat_url( + test_name: str, inp: str, probe_cc: str, probe_asn: int, t: datetime +) -> str: + """Generates a link to the MAT to display an event""" + since = str(t - timedelta(days=7)) + until = str(t + timedelta(days=7)) + p = dict( + test_name=test_name, + axis_x="measurement_start_day", + since=since, + until=until, + probe_asn=f"AS{probe_asn}", + probe_cc=probe_cc, + input=inp, ) - log.debug("Notify for msmt from %s", msm.get("probe_cc", "")) - prevent_future_date(msm) - change = detect_blocking_changes(means, msm, warmup=False) - if change is not None: - upsert_change(change) - - -def connect_to_db(db_host, db_user, db_name, db_password): - dsn = f"host={db_host} user={db_user} dbname={db_name} password={db_password}" - log.info("Connecting to database: %r", dsn) - conn = psycopg2.connect(dsn) - conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) - return conn - - -def snapshot_means(msm, last_snapshot_date, means): - """Save means to disk every month - """ - # TODO: add config parameter to recompute past data - t = msm["measurement_start_time"] - month = date(t.year, t.month, 1) - if month == last_snapshot_date: - return last_snapshot_date - - log.info("Saving %s monthly snapshot", month) - save_means(means, month) - return month - - -def process_historical_data(ro_conn, rw_conn, start_date, means): - """Process past data - """ - assert start_date - log.info("Running process_historical_data from %s", start_date) - t = metrics.timer("process_historical_data").start() - cnt = 0 - # fetch_past_data returns measurements ordered by measurement_start_time - last_snap = None - for past_msm in fetch_past_data(ro_conn, start_date): - backfill_scores(past_msm) - prevent_future_date(past_msm) - last_snap = snapshot_means(past_msm, last_snap, means) - change = detect_blocking_changes(means, past_msm, warmup=True) - cnt += 1 - if change is not None: - upsert_change(change) - - metrics.incr("processed_msmt") - - t.stop() - for m in means.values(): - assert isinstance(m[2], bool), m - - blk_cnt = sum(m[2] for m in means.values()) # count blocked - p = 100 * blk_cnt / len(means) - log.info("%d tracked items, %d blocked (%.3f%%)", len(means), blk_cnt, p) - log.info("Processed %d measurements. Speed: %d K-items per second", cnt, cnt / t.ms) - - -def create_url(change): - return f"{WEBAPP_URL}/chart?cc={change.probe_cc}&test_name={change.test_name}&input={change.input}&start_date=" - - -def basefn(cc, test_name, inp): - """Generate opaque filesystem-safe filename - inp can be "" or None (returning different hashes) - """ - d = f"{cc}:{test_name}:{inp}" - h = hashlib.shake_128(d.encode()).hexdigest(16) - return h - - -# TODO rename changes to events? - - -def explorer_url(c: Change) -> str: - return f"https://explorer.ooni.org/measurement/{c.report_id}?input={c.input}" - - -# TODO: regenerate RSS feeds (only) once after the warmup terminates + return urlunsplit(("https", "explorer.ooni.org", "/chart/mat", urlencode(p), "")) @metrics.timer("write_feed") def write_feed(feed, p: Path) -> None: """Write out RSS feed atomically""" tmp_p = p.with_suffix(".tmp") - with tmp_p.open("w") as f: - feed.write(f, "utf-8") - + tmp_p.write_text(feed) tmp_p.rename(p) -global_feed_cache = deque(maxlen=1000) - - -@metrics.timer("update_rss_feed_global") -def update_rss_feed_global(change: Change) -> None: - """Generate RSS feed for global events and write it in - /var/lib/detector/rss/global.xml +@metrics.timer("generate_rss_feed") +def generate_rss_feed(events: pd.DataFrame, update_time: datetime) -> Tuple[str, Path]: """ - # The files are served by Nginx - global global_feed_cache - if not change.input: - return - global_feed_cache.append(change) + Generate RSS feed for a single TCAI into /var/lib/detector/rss/.xml + The files are then served by Nginx + """ + x = events.iloc[0] + minp = x.input.replace("://", "_").replace("/", "_") + fname = f"{x.test_name}-{x.probe_cc}-AS{x.probe_asn}-{minp}" + log.info(f"Generating feed for {fname}. {len(events)} events.") + feed = feedgenerator.Rss201rev2Feed( title="OONI events", link="https://explorer.ooni.org", description="Blocked services and websites detected by OONI", language="en", ) - for c in global_feed_cache: - un = "" if c.blocked else "un" - country = cc_to_country_name.get(c.probe_cc.upper(), c.probe_cc) + for e in events.itertuples(): + cc = e.probe_cc.upper() + # TODO use country + country = cc_to_country_name.get(cc, cc) + status2 = "unblocked" if e.status == "OK" else "blocked" + link = explorer_mat_url(e.test_name, e.input, e.probe_cc, e.probe_asn, e.time) feed.add_item( - title=f"{c.input} {un}blocked in {country}", - link=explorer_url(c), - description=f"Change detected on {c.measurement_start_time}", - pubdate=c.measurement_start_time, - updateddate=datetime.utcnow(), + title=f"{e.input} {status2} in {e.probe_cc} AS{e.probe_asn}", + link=link, + description=f"Change detected on {e.time}", + pubdate=e.time, + updateddate=update_time, ) - write_feed(feed, conf.rssdir / "global.xml") - -by_cc_feed_cache = {} + path = conf.rssdir / f"{fname}.xml" + return feed.writeString("utf-8"), path -@metrics.timer("update_rss_feed_by_country") -def update_rss_feed_by_country(change: Change) -> None: - """Generate RSS feed for events grouped by country and write it in - /var/lib/detector/rss/by-country/.xml +@metrics.timer("rebuild_feeds") +def rebuild_feeds(events: pd.DataFrame) -> int: + """Rebuild whole feeds for each TCAI""" + # When generated in real time "events" only contains the recent events for + # each TCAI. We need the full history. + # Changes are rare enough that running a query on blocking_events for each + # change is not too heavy + cnt = 0 + sql = """SELECT test_name, probe_cc, probe_asn, input, time, status + FROM blocking_events + WHERE test_name = %(test_name)s AND input = %(inp)s + AND probe_cc = %(cc)s AND probe_asn = %(asn)s + ORDER BY time """ - # The files are served by Nginx - global by_cc_feed_cache - if not change.input: - return - cc = change.probe_cc - if cc not in by_cc_feed_cache: - by_cc_feed_cache[cc] = deque(maxlen=1000) - by_cc_feed_cache[cc].append(change) - feed = feedgenerator.Rss201rev2Feed( - title=f"OONI events in {cc}", - link="https://explorer.ooni.org", - description="Blocked services and websites detected by OONI", - language="en", - ) - for c in by_cc_feed_cache[cc]: - un = "" if c.blocked else "un" - country = cc_to_country_name.get(c.probe_cc.upper(), c.probe_cc) - feed.add_item( - title=f"{c.input} {un}blocked in {country}", - link=explorer_url(c), - description=f"Change detected on {c.measurement_start_time}", - pubdate=c.measurement_start_time, - updateddate=datetime.utcnow(), - ) - write_feed(feed, conf.rssdir_by_cc / f"{cc}.xml") + events = events.reset_index() + unique_tcais = events[TCAI].drop_duplicates() + update_time = datetime.utcnow() + for x in unique_tcais.itertuples(): + d = dict(test_name=x.test_name, inp=x.input, cc=x.probe_cc, asn=x.probe_asn) + history = click.query_dataframe(sql, d) + if len(history): + feed_data, path = generate_rss_feed(history, update_time) + write_feed(feed_data, path) + cnt += len(history) + + log.info(f"[re]created {cnt} feeds") + return cnt + + +# # Initialization -@metrics.timer("update_rss_feeds_by_cc_tname_inp") -def update_rss_feeds_by_cc_tname_inp(events, hashfname): - """Generate RSS feed by cc / test_name / input and write - /var/lib/detector/rss/cc-type-inp/.xml +def load_country_name_map(devel: bool) -> dict: + """Loads country-list.json and creates a lookup dictionary""" + try: + fi = "detector/data/country-list.json" + log.info("Loading %s", fi) + with open(fi) as f: + clist = ujson.load(f) + except FileNotFoundError: + pkgdir = getsitepackages()[-1] + fi = f"{pkgdir}/detector/data/country-list.json" + log.info("Loading %s", fi) + with open(fi) as f: + clist = ujson.load(f) + + # The file is deployed with the detector: crash out if it's broken + d = {} + for c in clist: + cc = c["iso3166_alpha2"] + name = c["name"] + assert cc and (len(cc) == 2) and name + d[cc.upper()] = name + + log.info("Loaded %d country names", len(d)) + return d + + +def create_tables() -> None: + # Requires admin privileges + sql = """ + CREATE TABLE IF NOT EXISTS blocking_status + ( + `test_name` String, + `input` String, + `probe_cc` String, + `probe_asn` Int32, + `confirmed_perc` Float32, + `pure_anomaly_perc` Float32, + `accessible_perc` Float32, + `cnt` Float32, + `status` String, + `old_status` String, + `change` Float32, + `stability` Float32, + `update_time` DateTime64(0) MATERIALIZED now64() + ) + ENGINE = ReplacingMergeTree + ORDER BY (test_name, input, probe_cc, probe_asn) + SETTINGS index_granularity = 4 """ - # The files are served by Nginx - feed = feedgenerator.Rss201rev2Feed( - title="OONI events", - link="https://explorer.ooni.org", - description="Blocked services and websites detected by OONI", - language="en", + query(sql) + sql = """ + CREATE TABLE IF NOT EXISTS blocking_events + ( + `test_name` String, + `input` String, + `probe_cc` String, + `probe_asn` Int32, + `status` String, + `time` DateTime64(3), + `detection_time` DateTime64(0) MATERIALIZED now64() ) - # TODO: render date properly and add blocked/unblocked - # TODO: put only recent events in the feed (based on the latest event time) - for e in events: - if not e["input"]: - continue + ENGINE = ReplacingMergeTree + ORDER BY (test_name, input, probe_cc, probe_asn, time) + SETTINGS index_granularity = 4 + """ + query(sql) + sql = "CREATE USER IF NOT EXISTS detector IDENTIFIED WITH plaintext_password BY 'detector'" + query(sql) + query("GRANT SELECT,INSERT,OPTIMIZE,SHOW ON blocking_status TO detector") + query("GRANT SELECT,INSERT,OPTIMIZE,SHOW ON blocking_events TO detector") + query("GRANT SELECT ON * TO detector") + + +def create_empty_status_df() -> pd.DataFrame: + status = pd.DataFrame( + columns=[ + "status", + "old_status", + "change", + "stability", + "test_name", + "probe_cc", + "probe_asn", + "input", + "accessible_perc", + "cnt", + "confirmed_perc", + "pure_anomaly_perc", + ] + ) + status.set_index(TCAI, inplace=True) + return status + + +def reprocess_inner( + gen, time_slots_cnt: int, collect_hist=False +) -> Tuple[pd.DataFrame, pd.DataFrame, Optional[pd.DataFrame]]: + # df = pd.DataFrame({'Courses': pd.Series(dtype='str'), + # 'Fee': pd.Series(dtype='int'), + # 'Duration': pd.Series(dtype='str'), + # 'Discount': pd.Series(dtype='float')}) + status = create_empty_status_df() + events_tmp = [] + status_history_tmp = [] + + log.info(f"Processing {time_slots_cnt} time slots") + for new in tqdm(gen, total=time_slots_cnt): + assert "Unnamed: 0" not in sorted(new.columns), sorted(new.columns) + # assert new.index.names == TCAI, new.index.names + status, events = process_data(status, new) + if events is not None and len(events): + events_tmp.append(events) + if collect_hist: + status_history_tmp.append(status) + + if events_tmp: + events = pd.concat(events_tmp) + else: + events = None + status_history = pd.concat(status_history_tmp) if collect_hist else None + return events, status, status_history + + +@metrics.timer("process_historical_data") +def process_historical_data( + start_date: datetime, + end_date: datetime, + interval: timedelta, + services: dict, + probe_cc=None, + collect_hist=False, +) -> Tuple[pd.DataFrame, pd.DataFrame, Optional[pd.DataFrame]]: + """Processes past data. Rebuilds blocking_status table and events + Keeps blocking_status and blocking_events in memory during the run. + """ + log.info(f"Running process_historical_data from {start_date} to {end_date}") + urls = sorted(set(u for urls in services.values() for u in urls)) + time_slots_cnt = int((end_date - interval - start_date) / interval) - country = cc_to_country_name.get(c.probe_cc.upper(), c.probe_cc) - feed.add_item( - title=f"{c.input} {un}blocked in {country}", - link=explorer_url(c), - description=f"Change detected on {c.measurement_start_time}", - pubdate=c.measurement_start_time, - updateddate=datetime.utcnow(), - ) + gen = gen_input(click, start_date, end_date, interval, urls) + events, status, status_history = reprocess_inner(gen, time_slots_cnt) - write_feed(feed, conf.rssdir / f"{hashfname}.xml") + log.debug("Replacing blocking_status table") + click.execute("TRUNCATE TABLE blocking_status SYNC") + tmp_s = status.reset_index() # .convert_dtypes() + click.insert_dataframe("INSERT INTO blocking_status VALUES", tmp_s) -def update_status_files(blocking_events): - # The files are served by Nginx - return # FIXME + log.debug("Replacing blocking_events table") + click.execute("TRUNCATE TABLE blocking_events SYNC") + if events is not None and len(events): + sql = "INSERT INTO blocking_events VALUES" + click.insert_dataframe(sql, events.reset_index(drop=True)) - # This contains the last status change for every cc/test_name/input - # that ever had a block/unblock event - status = {k: v[-1] for k, v in blocking_events.items()} + log.info("Done") + return events, status, status_history - statusfile = conf.statusdir / f"status.json" - d = dict(format=1, status=status) - with statusfile.open("w") as f: - ujson.dump(d, f) - log.debug("Wrote %s", statusfile) +@metrics.timer("process_fresh_data") +def process_fresh_data( + start_date: datetime, + end_date: datetime, + interval: timedelta, + services: dict, + probe_cc=None, + collect_hist=False, +) -> Tuple[pd.DataFrame, pd.DataFrame]: + """Processes current data.""" + log.info(f"Running process_fresh_data from {start_date} to {end_date}") + urls = sorted(set(u for urls in services.values() for u in urls)) + status = load_blocking_status() + metrics.gauge("blocking_status_tblsize", len(status)) -@metrics.timer("upsert_change") -def upsert_change(change): - """Create / update RSS and JSON files with a new change - """ - # Create DB tables in future if needed - debug_url = create_url(change) - log.info("Change! %r %r", change, debug_url) - if not change.report_id: - log.error("Empty report_id") - return + gen = gen_input(click, start_date, end_date, interval, urls) + new = None + for x in gen: + new = x + pass - try: - update_rss_feed_global(change) - update_rss_feed_by_country(change) - except Exception as e: - log.error(e, exc_info=1) - - # TODO: currently unused - return - - # Append change to a list in a JSON file - # It contains all the block/unblock events for a given cc/test_name/input - hashfname = basefn(change.probe_cc, change.test_name, change.input) - events_f = conf.eventdir / f"{hashfname}.json" - if events_f.is_file(): - with events_f.open() as f: - ecache = ujson.load(f) - else: - ecache = dict(format=1, blocking_events=[]) + if new is None or len(new) == 0: + log.error("Empty measurament batch received") + sys.exit(1) - ecache["blocking_events"].append(change._asdict()) - log.info("Saving %s", events_f) - with events_f.open("w") as f: - ujson.dump(ecache, f) + log.info(f"New rows: {len(new)} Status rows: {len(status)}") + status, events = process_data(status, new) - update_rss_feeds_by_cc_tname_inp(ecache["blocking_events"], hashfname) + assert len(status) - update_status_files(ecache["blocking_events"]) + log.debug("Updating blocking_status table") + wanted_cols = [ + "test_name", + "input", + "probe_cc", + "probe_asn", + "confirmed_perc", + "pure_anomaly_perc", + "accessible_perc", + "cnt", + "status", + "old_status", + "change", + "stability", + ] + status.status.fillna("UNKNOWN", inplace=True) + status.old_status.fillna("UNKNOWN", inplace=True) + status.change.fillna(0, inplace=True) + tmp_s = status.reset_index()[wanted_cols].convert_dtypes() + click.execute("TRUNCATE TABLE blocking_status SYNC") + click.insert_dataframe("INSERT INTO blocking_status VALUES", tmp_s) + + if events is not None and len(events): + log.debug(f"Appending {len(events)} events to blocking_events table") + ev = events.reset_index() + ev = ev.drop(columns=["old_status"]) + ev["time"] = end_date # event detection time + log.info(ev) + assert ev.columns.values.tolist() == [ + "test_name", + "probe_cc", + "probe_asn", + "input", + "status", + "time", + ] + sql = "INSERT INTO blocking_events (test_name, probe_cc, probe_asn, input, status, time) VALUES" + click.insert_dataframe(sql, ev) + + log.info("Done") + return events, status + + +def gen_stats(): + """Generates gauge metrics showing the table sizes""" + sql = "SELECT count() FROM blocking_status FINAL" + bss = query(sql)[0][0] + metrics.gauge("blocking_status_tblsize", bss) + sql = "SELECT count() FROM blocking_events" + bes = query(sql)[0][0] + metrics.gauge("blocking_events_tblsize", bes) + + +def process_data( + blocking_status: pd.DataFrame, new: pd.DataFrame +) -> Tuple[pd.DataFrame, pd.DataFrame]: + """Detects blocking. The inputs are the current blocking status and a df + with new data from a single timeslice. + Returns an updated blocking status and a df with new blocking events. + """ + if len(blocking_status) == 0 and len(new) == 0: + return blocking_status, [] + + m = blocking_status.merge(new, how="outer", on=TCAI, suffixes=("_BS", "")) + assert "index" not in m.columns + m = m.reset_index().set_index(TCAI) # performance improvement? + assert m.index.names == TCAI + + m["input_cnt"] = m.cnt + m = m.fillna(value=dict(cnt=0, cnt_BS=0, accessible_perc_BS=m.accessible_perc)) + tau = 0.9 + + mavg_cnt = m.cnt * (1 - tau) + m.cnt_BS * tau + # totcnt = m.cnt + m.cnt_BS + # cp = (new.confirmed_perc * new.cnt * mu + blocking_status.confirmed_perc * blocking_status.cnt * tau / totcnt #AS confirmed_perc, + # ap = (new.pure_anomaly_perc * new.cnt * mu + blocking_status.pure_anomaly_perc * blocking_status.cnt * tau) / totcnt #AS pure_anomaly_perc, + # NOTE: using fillna(0) on percentages looks like a bug but the value is going to be ignored due to the cnt set to 0 + + m["input_ap"] = m.accessible_perc + tmp_ap = m.accessible_perc.fillna(m.accessible_perc_BS) + delta = (tmp_ap - m.accessible_perc_BS) / 100 + + # Weight the amount of datapoints in the current timeslot with + nu = m.cnt / (m.cnt + m.cnt_BS) + nu = nu * tau + + m.accessible_perc = ( + m.accessible_perc.fillna(m.accessible_perc_BS) * (1 - nu) + + m.accessible_perc_BS * nu + ) -def load_means(): - """Load means from a pkl file - The file is safely owned by the detector. + # Stability moves slowly towards 1 when accessible_perc is constant over + # time but drop quickly towards 0 when accessible_perc changes a lot. + # It is later on used to decide when we are confident enough to make + # statements on BLOCKED/OK status. It is also immediately set to 0 when we + # detect a blocking change event to mitigate flapping. + s_def = 0.7 # default stability + stability_thr = 0.8 # threshold to consider a TCAI stable + gtau = 0.99 # moving average tau for + btau = 0.7 + + stability = np.cos(3.14 / 2 * delta) + m["stab_insta"] = stability # used for charting + good = stability * (1 - gtau) + m.stability.fillna(s_def) * gtau + gstable = stability >= stability_thr + m.loc[gstable, "stability"] = good[gstable] + + bad = stability * (1 - btau) + m.stability.fillna(s_def) * btau + bstable = stability < stability_thr + m.loc[bstable, "stability"] = bad[bstable] + + m.status = m.status.fillna("UNKNOWN") + m.old_status = m.status.fillna("UNKNOWN") + + # Use different stability thresholds for OK vs BLOCKED? + stable = (m.stability > 0.98) & (stability > 0.98) + m.loc[(m.accessible_perc < 80) & stable, "status"] = "BLOCKED" + m.loc[(m.accessible_perc > 95) & stable, "status"] = "OK" + + # Detect status changes AKA events + # Always use braces on both expressions + sel = (m.status != m.old_status) & (m.old_status != "UNKNOWN") + ww = m[sel] + if len(ww): + ww = ww[["status", "old_status"]] + # Drop stability to 0 after an event to prevent noisy detection + m.loc[sel, "stability"] = 0 + + events = m[sel][["status", "old_status"]] + if conf.devel: + exp_cols = [ + "old_status", + "status", + ] + assert sorted(events.columns) == exp_cols, sorted(events.columns) + + m.confirmed_perc.fillna(m.confirmed_perc_BS, inplace=True) + m.pure_anomaly_perc.fillna(m.pure_anomaly_perc_BS, inplace=True) + # m.accessible_perc.fillna(m.accessible_perc_BS, inplace=True) + m = m.drop( + [ + "confirmed_perc_BS", + "pure_anomaly_perc_BS", + "accessible_perc_BS", + "cnt", + "cnt_BS", + ], + axis=1, + ) + + # moving average on cnt + m["cnt"] = mavg_cnt + assert m.index.names == TCAI + + # m.reset_index(inplace=True) + # if "index" in m.columns: + # m.drop(["index"], axis=1, inplace=True) + + # if conf.devel: + # exp_cols = [ + # "accessible_perc", + # "change", + # "cnt", + # "confirmed_perc", + # "input", + # "input_ap", + # "input_cnt", + # "old_status", + # "probe_asn", + # "probe_cc", + # "pure_anomaly_perc", + # "stab_insta", + # "stability", + # "status", + # "test_name", + # ] + # assert exp_cols == sorted(m.columns), sorted(m.columns) + + return m, events + + +def gen_input( + click, + start_date: datetime, + end_date: datetime, + interval: timedelta, + urls: list[str], +) -> Generator[pd.DataFrame, None, None]: + """Queries the fastpath table for measurament counts grouped by TCAI. + Yields a dataframe for each time interval. Use read-ahead where needed + to speed up reprocessing. """ - pf = conf.pickledir / "means.pkl" - log.info("Loading means from %s", pf) - if pf.is_file(): - perms = pf.stat().st_mode - assert (perms & 2) == 0, "Insecure pickle permissions %s" % oct(perms) - with pf.open("rb") as f: - means = pickle.load(f) - - assert means - earliest = min(m.measurement_start_time for m in means.values()) - latest = max(m.measurement_start_time for m in means.values()) - # Cleanup - # t = datetime.utcnow() - # for k, m in means.items(): - # if m.measurement_start_time > t: - # log.info("Fixing time") - # means[k] = MeanStatus(t, m.val, m.blocked) - - latest = max(m[0] for m in means.values()) - log.info("Earliest mean: %s", earliest) - return means, latest - - log.info("Creating new means file") - return {}, None - - -def save_means(means, date): - """Save means atomically. Protocol 4 + assert start_date < end_date + assert interval == timedelta(minutes=60) + read_ahead = interval * 6 * 24 + sql = """ + SELECT test_name, probe_cc, probe_asn, input, + countIf(confirmed = 't') * 100 / cnt AS confirmed_perc, + countIf(anomaly = 't') * 100 / cnt - confirmed_perc AS pure_anomaly_perc, + countIf(anomaly = 'f') * 100 / cnt AS accessible_perc, + count() AS cnt, + toStartOfHour(measurement_start_time) AS t + FROM fastpath + WHERE test_name IN ['web_connectivity'] + AND msm_failure = 'f' + AND measurement_start_time >= %(start_date)s + AND measurement_start_time < %(end_date)s + AND input IN %(urls)s + GROUP BY test_name, probe_cc, probe_asn, input, t + ORDER BY t """ - tstamp = date.strftime(".%Y-%m-%d") if date else "" - pf = conf.pickledir / f"means{tstamp}.pkl" - pft = pf.with_suffix(".tmp") - if not means: - log.error("No means to save") - return - log.info("Saving %d means to %s", len(means), pf) - latest = max(m[0] for m in means.values()) - log.info("Latest mean: %s", latest) - with pft.open("wb") as f: - pickle.dump(means, f, protocol=4) - pft.rename(pf) - log.info("Saving completed") + cache = None + t = start_date + while t < end_date: + # Load chunk from DB doing read-ahead (if needed) + partial_end_date = min(end_date, t + read_ahead) + d = dict(start_date=t, end_date=partial_end_date, urls=urls) + log.info(f"Querying fastpath from {t} to {partial_end_date}") + cache = click.query_dataframe(sql, d) + if len(cache) == 0: + t = partial_end_date + log.info("No data") + continue + while t < partial_end_date: + out = cache[cache.t == t] + if len(out): + out = out.drop(["t"], axis=1) + log.info(f"Returning {len(out)} rows {t}") + yield out + t += interval + + +def load_blocking_status() -> pd.DataFrame: + """Loads the current blocking status into a dataframe.""" + log.debug("Loading blocking status") + sql = """SELECT status, old_status, change, stability, + test_name, probe_cc, probe_asn, input, + accessible_perc, cnt, confirmed_perc, pure_anomaly_perc + FROM blocking_status FINAL + """ + blocking_status = click.query_dataframe(sql) + if len(blocking_status) == 0: + log.info("Starting with empty blocking_status") + blocking_status = create_empty_status_df() + return blocking_status -# FIXME: for performance reasons we want to minimize heavy DB queries. -# Means are cached in a pickle file to allow restarts and we pick up where -# we stopped based on measurement_start_time. Yet the timestamp are untrusted -# as they are generated by the probes. +def reprocess_data_from_df(idf, debug=False): + """Reprocess data using Pandas. Used for testing.""" + assert len(idf.index.names) == 5 + assert "Unnamed: 0" not in sorted(idf.columns), sorted(idf.columns) + timeslots = idf.reset_index().t.unique() -def load_country_name_map(): - """Load country-list.json and create a lookup dictionary - """ - fi = f"{PKGDIR}/detector/data/country-list.json" - log.info("Loading %s", fi) - with open(fi) as f: - clist = ujson.load(f) + def gen(): + for tslot in timeslots: + new = idf[idf.index.get_level_values(0) == tslot] + assert len(new.index.names) == 5, new.index.names + assert "Unnamed: 0" not in sorted(new.columns), sorted(new.columns) + yield new - # The file is deployed with the detector: crash out if it's broken - d = {} - for c in clist: - cc = c["iso3166_alpha2"] - name = c["name"] - assert cc and (len(cc) == 2) and name - d[cc.upper()] = name + events, status, status_history = reprocess_inner(gen(), len(timeslots), True) + assert status.index.names == TCAI + return events, status, status_history + + +def process(start, end, interval, services) -> None: + events, status = process_fresh_data(start, end, interval, services) + log.info(f"Events: {len(events)}") + if events is not None and len(events): + log.info("Rebuilding feeds") + rebuild_feeds(events) + # TODO: create an index of available RSS feeds - log.info("Loaded %d country names", len(d)) - return d +def reprocess(conf, services) -> None: + click.execute("TRUNCATE TABLE blocking_status SYNC") + click.execute("TRUNCATE TABLE blocking_events SYNC") -# TODO: handle input = None both in terms of filename collision and RSS feed -# and add functional tests + t = conf.start_date + while t < conf.end_date: + te = t + conf.interval + process(t, te, conf.interval, services) + t += conf.interval def main(): + global click setup() log.info("Starting") + cc_to_country_name = load_country_name_map(conf.devel) + click = Clickhouse.from_url(conf.db_uri) + if "use_numpy" not in conf.db_uri: + log.error("Add use_numpy to db_uri") + return - global cc_to_country_name - cc_to_country_name = load_country_name_map() - - ro_conn = connect_to_db(conf.ro_db_host, RO_DB_USER, DB_NAME, RO_DB_PASSWORD) - - if conf.webapp: - import detector.detector_webapp as wa - - wa.db_conn = ro_conn - wa.asn_db = load_asn_db() - log.info("Starting webapp") - wa.bottle.TEMPLATE_PATH.insert(0, f"{PKGDIR}/detector/views") - wa.bottle.run(port=8880, debug=conf.devel) - log.info("Exiting webapp") + # create_tables() + # TODO: configure services + services = { + "Facebook": ["https://www.facebook.com/"], + "Twitter": ["https://twitter.com/"], + "YouTube": ["https://www.youtube.com/"], + "Instagram": ["https://www.instagram.com/"], + } + if conf.reprocess: + # Destructing reprocess + assert conf.start_date and conf.end_date, "Dates not set" + reprocess(conf, services) return + # assert conf.start_date and conf.end_date, "Dates not set" + # events, status, _ = process_historical_data( + # conf.start_date, conf.end_date, conf.interval, services + # ) + # s = status.reset_index() + # log.info((s.accessible_perc, s.cnt, s.status)) - means, latest_mean = load_means() - log.info("Latest mean: %s", latest_mean) - - # Register exit handlers - def save_means_on_exit(*a): - log.info("Received SIGINT / SIGTERM") - save_means(means, None) - log.info("Exiting") - sys.exit() - - signal.signal(signal.SIGINT, save_means_on_exit) - signal.signal(signal.SIGTERM, save_means_on_exit) - - rw_conn = connect_to_db(conf.db_host, DB_USER, DB_NAME, DB_PASSWORD) - - td = timedelta(weeks=6) - start_date = latest_mean - td if latest_mean else DEFAULT_STARTTIME - process_historical_data(ro_conn, rw_conn, start_date, means) - save_means(means, None) - - log.info("Starting real-time processing") - with rw_conn.cursor() as cur: - cur.execute("LISTEN fastpath;") - - while True: - if select.select([rw_conn], [], [], 60) == ([], [], []): - continue # timeout - - rw_conn.poll() - while rw_conn.notifies: - msg = rw_conn.notifies.pop(0) - try: - handle_new_msg(msg, means, rw_conn) - except Exception as e: - log.exception(e) + else: + # Process fresh data + if conf.end_date is None: + # Beginning of current UTC hour + conf.end_date = datetime(*datetime.utcnow().timetuple()[:4]) + conf.start_date = conf.end_date - conf.interval + process(conf.start_date, conf.end_date, conf.interval, services) + + gen_stats() + log.info("Done") if __name__ == "__main__": diff --git a/detector/detector/detector_webapp.py b/detector/detector/detector_webapp.py deleted file mode 100644 index 23ab5f78..00000000 --- a/detector/detector/detector_webapp.py +++ /dev/null @@ -1,170 +0,0 @@ -""" -Detector web application - -Currently used only for tuning the detector, it runs event detection for one -cc / test_name / input independently from the event detector daemon. -The output are only displayed in charts and not used to generate RSS feeds -or other. - -""" - -# TODO: cleanup - -from datetime import datetime, timedelta -import logging -import json - -from bottle import request -import bottle - -from detector.detector import ( - detect_blocking_changes_asn_one_stream, -) - -from detector.metrics import setup_metrics - -log = logging.getLogger("detector") -metrics = setup_metrics(name="detector") - -db_conn = None # Set by detector.py or during functional testing - -asn_db = None # Set by detector.py - -def _datetime_handler(x): - if isinstance(x, datetime): - return x.isoformat() - raise TypeError("unknown type") - - -bottle.install( - bottle.JSONPlugin(json_dumps=lambda o: json.dumps(o, default=_datetime_handler)) -) - - -def generate_chart(start_d, end_d, msmts, changes, title): - """Render measurements and changes into a SVG chart - :returns: dict - """ - assert isinstance(msmts[0][0], datetime) - x1 = 100 - x2 = 1100 - y1 = 50 - y2 = 300 - # scale x - delta = (end_d - start_d).total_seconds() - assert delta != 0 - x_scale = (x2 - x1) / delta - - return dict( - msmts=msmts, - changes=changes, - x_scale=x_scale, - start_d=start_d, - end_d=end_d, - x1=x1, - x2=x2, - y1=y1, - y2=y2, - title=title, - ) - - -@bottle.route("/") -@bottle.view("form") -def index(): - log.debug("Serving index") - return {} - - -def plot_series(conn, ccs, test_names, inputs, start_date, split_asn): - """Generates time-series for CC / test_name / input - to be rendered as SVG charts - :returns: list of charts - """ - log.error(repr(split_asn)) - charts = [] - for cc in ccs: - for test_name in test_names: - for inp in inputs: - log.info("Generating chart for %r %r %r", cc, test_name, inp) - # TODO: merge inputs here and in event detection? - - (msmts, changes, asn_breakdown) = detect_blocking_changes_asn_one_stream( - conn, cc, test_name, inp, start_date - ) - if len(msmts) < 2: - log.debug("Not enough data") - continue - - # Time range - assert isinstance(msmts[0][0], datetime) - start_d = min(e[0] for e in msmts) - end_d = max(e[0] for e in msmts) - delta = (end_d - start_d).total_seconds() - assert delta > 0 - log.debug(delta) - - title = f"{cc} {test_name} {inp} {start_d} - {end_d}" - country_chart = generate_chart(start_d, end_d, msmts, changes, title) - - charts.append(country_chart) - - if split_asn: - # Most popular ASNs - popular = sorted( - asn_breakdown, key=lambda asn: len(asn_breakdown[asn]["msmts"]), reverse=True - ) - popular = popular[:20] - for asn in popular: - title = "AS{} {}".format(asn, asn_db.get(asn, "")) - a = asn_breakdown[asn] - try: - c = generate_chart(start_d, end_d, a["msmts"], a["changes"], title) - charts.append(c) - except: - log.error(a) - - return charts - - - -@bottle.route("/chart") -@bottle.view("page") -@metrics.timer("generate_charts") -def genchart(): - params = ("ccs", "test_names", "inputs", "start_date", "split_asn") - q = {k: (request.query.get(k, "").strip() or None) for k in params} - assert q["ccs"], "missing ccs query param" - - ccs = [i.strip() for i in q["ccs"].split(",") if i.strip()] - for cc in ccs: - assert len(cc) == 2, "CC must be 2 letters" - - test_names = q["test_names"].split(",") or ["web_connectivity",] - inputs = q["inputs"] - assert inputs, "Inputs are required" - inputs = [i.strip() for i in inputs.split(",") if i.strip()] - split_asn = q["split_asn"] is not None - start_date = q["start_date"] - if start_date: - start_date = datetime.strptime(start_date, "%Y-%m-%d") - else: - start_date = datetime.now() - timedelta(days=10) - - log.debug("Serving query %s %s %s %s", ccs, test_names, inputs, start_date) - - charts = plot_series(db_conn, ccs, test_names, inputs, start_date, split_asn) - form = dict( - inputs=",".join(inputs), - test_names=",".join(test_names), - ccs=",".join(ccs), - start_date=start_date.strftime("%Y-%m-%d"), - split_asn=split_asn, - ) - return dict(charts=charts, title="Detector", form=form) - - -@bottle.error(500) -def error_handler_500(error): - log.error(error.exception) - return repr(error.exception) diff --git a/detector/detector/tests/test_functional_pandas.py b/detector/detector/tests/test_functional_pandas.py new file mode 100644 index 00000000..6e932b72 --- /dev/null +++ b/detector/detector/tests/test_functional_pandas.py @@ -0,0 +1,378 @@ +""" +Event detector functional tests + +Run as: +pytest tests/test_functional_pandas.py -s + +""" + +from argparse import Namespace +from pathlib import Path + +from clickhouse_driver import Client as Clickhouse +from numpy import nan +import altair as alt +import pandas as pd +import pytest + +try: + from tqdm import tqdm +except ImportError: + + def tqdm(x): + return x + + +import detector.detector as dt +from detector.detector import TCAI + + +@pytest.fixture(scope="session", autouse=True) +def setup(): + dt.conf = Namespace() + dt.conf.reprocess = False + dt.conf.devel = True + DBURI = "clickhouse://localhost/default" + dt.click = Clickhouse.from_url(DBURI) + dt.conf.rssdir = Path("tests/out_rss") + dt.setup_dirs(Path("devel")) + + +S3BASEURL = "https://ooni-data-eu-fra-test.s3.eu-central-1.amazonaws.com/ci/" + +# # utils and fixtures # # + +# input dataframe # + + +def load_cached_input_df(fn): + """Creates the input dataframe from CSV from S3. + The CSV was generated with a query on the fastpath. + Use locally cached file if found. + """ + try: + return pd.read_csv(fn) + except IOError: + print("Dloading ", S3BASEURL + fn) + df = pd.read_csv(S3BASEURL + fn) + print("Saving dowloaded file to ", fn) + df.to_csv(fn) + return df + + +def load_idf_1(): + fn = "event_detector_20221013_20221213_input.csv.gz" + idf = load_cached_input_df(fn) + if "Unnamed: 0" in idf.columns: + idf.drop(["Unnamed: 0"], axis=1, inplace=True) + + assert sorted(idf.columns) == [ + "accessible_perc", + "cnt", + "confirmed_perc", + "input", + "probe_asn", + "probe_cc", + "pure_anomaly_perc", + "t", + "test_name", + ] + # granularity: 10 minutes + assert idf.t.nunique() == 24 * 6 * 61 # every 10m, 61 days + assert idf.shape == (274524, 9) + assert idf.t.min() == "2022-10-13 00:00:00" + assert idf.t.max() == "2022-12-12 23:50:00" + x = idf.groupby(["input"]).t.nunique() + assert x["https://twitter.com/"] == 24 * 6 * 61 + assert x["https://www.whatsapp.com/"] == 5697 + return idf + + +@pytest.fixture(scope="session") +def idf_1(): + return load_idf_1() + + +def filter_idf(idf, days=1, skipdays=0, probe_cc=None, probe_asn=None, inp=""): + # Filter input dataframe by time, cc, asn, input + timeslots = idf.t.unique() + start = 6 * 24 * skipdays + end = 6 * 24 * (skipdays + days) + timeslots = timeslots[start:end] + # The data processing assumes that we do a run for each timeslot without + # skipping any interval. + assert timeslots[0].endswith("00:00:00"), "Missing time slots" + # We should reach the last timeslot of the day: + assert timeslots[-1].endswith("23:50:00"), "Missing time slots" + idf = idf[idf.t.isin(timeslots)] + idf = idf.set_index(["t", "test_name", "probe_cc", "probe_asn", "input"]) + if probe_cc is not None: + idf = idf[idf.index.get_level_values(2) == probe_cc] + + if isinstance(probe_asn, int): + idf = idf[idf.index.get_level_values(3) == probe_asn] + elif isinstance(probe_asn, list): + idf = idf[idf.index.get_level_values(3).isin(probe_asn)] + else: + assert 0, type(probe_asn) + + if inp: + idf = idf[idf.index.get_level_values(4) == inp] + + return idf + + +# end of input dataframe # + + +def plot(h, tcai, exp): + """Generate status_history plot into an HTML file""" + tn, cc, asn, inp = tcai + scale = alt.Scale( + domain=["BLOCKED", "OK", "unknown"], + range=["red", "green", "blue"], + ) + col = alt.Color("status", scale=scale) + h = h.reset_index() + h = h.loc[(h.probe_cc == cc) & (h.probe_asn == asn) & (h.input == inp)] + assert len(h.groupby(TCAI).count()) == 1 + + # Add exp col + h["exp"] = "na" + for tstamp, st in exp: + if tstamp == "last": + # hh.iloc[-1].status == st + continue + + if " " not in tstamp: + tstamp += " 00:00:00" + # h.loc[h.t == tstamp, "exp"] = h[h.t == tstamp].iloc[0].status == st + h.loc[h.t == tstamp, "exp"] = st + + c = alt.Chart(h).properties(width=1000, height=60) + x = alt.X("t:T", axis=alt.Axis(labels=True)) + cnt = c.mark_line().encode(x=x, y="cnt", color="status") + ap = c.mark_line().encode(x=x, y="accessible_perc", color="status") + stability = c.mark_circle().encode( + x=x, y=alt.Y("stability", scale=alt.Scale(zero=False)), color=col + ) + rap = c.mark_circle().encode(x=x, y="input_ap") + rcnt = c.mark_circle(opacity=0.4).encode(x=x, y="input_cnt", color=col) + # Plot expected values as big dots + expchart = c.mark_circle(size=200).encode( + x=x, + y="exp", + color=col, + opacity=alt.condition("datum.exp == 'na'", alt.value(0), alt.value(1)), + ) + stack = rcnt & rap & cnt & ap & stability & expchart + stack.save(f"history_{asn}_{cc}.html") + + +def check(status_history, tcai, exp): + """Validate detection but also generate plots""" + h = filter_history(status_history, tcai) + plot(h, tcai, exp) + for tstamp, st in exp: + expect_status(h, tstamp, st) + + +def expect_status(h, tstamp, st): + if tstamp == "last": + assert h.iloc[-1].status == st + return + + if " " not in tstamp: + tstamp += " 00:00:00" + assert h[h.t == tstamp].iloc[0].status == st + + +def filter_history(status_history, tcai): + h = status_history.reset_index() + tn, cc, asn, inp = tcai + if asn: + h = h[h.probe_asn == asn] + if cc: + h = h[h.probe_cc == cc] + if inp: + h = h[h.input == inp] + return h + + +@pytest.fixture(scope="session") +def ru_twitter(): + # Process data for Twitter in RU for a set of ASNs + cc = "RU" + inp = "https://twitter.com/" + days = 90 + inp2 = inp.replace("/", "_").replace(":", "_") + cache_fn = f"_cache_processed_{cc}_{inp2}_{days}.csv.gz" + kinds = ("events", "status", "history") + try: + # Use cached data + out = tuple(pd.read_csv(k + cache_fn) for k in kinds) + return out + except FileNotFoundError: + print("Cached ru_twitter output not found, processing now") + + idf = load_idf_1() + asns = [8331, 12668, 13335, 44020, 44493, 48642, 49392, 51659] + idf = filter_idf(idf, days=days, probe_cc=cc, inp=inp, probe_asn=asns) + # idf is indexed on t+TCAI + assert len(idf.index.names) == 5 + # Reprocess now and save to cache + out = dt.reprocess_data_from_df(idf, debug=True) + for item, k in zip(out, kinds): + print("Writing out ", k, cache_fn) + if item is None and k == "events": + # fixme columns + item = pd.DataFrame(["probe_asn", "probe_cc", "time", "input", "test_name"]) + item.to_csv(k + cache_fn) + + return out + + +# # tests # # + + +def NO_test_1(idf_1): + idf = idf_1 + events, status, status_history = dt.reprocess_data( + idf, + days=10, + probe_asn=12389, + inp="https://twitter.com/", + debug=True, + ) + status_history.to_csv("status_history_1.csv.zstd") + assert status_history.shape[0] == 142 + assert status.to_dict() == { + "status": {0: nan}, + "old_status": {0: nan}, + "change": {0: nan}, + "stability": {0: 0.950923061161454}, + "test_name": {0: "web_connectivity"}, + "probe_cc": {0: "RU"}, + "probe_asn": {0: 12389}, + "input": {0: "https://twitter.com/"}, + "accessible_perc": {0: 0.0}, + "cnt": {0: 0.5823395977809029}, + "confirmed_perc": {0: 0.0}, + "pure_anomaly_perc": {0: 100.0}, + } + assert events.to_dict() == { + 0: { + 0: "test_name", + 1: "input", + 2: "probe_cc", + 3: "probe_asn", + 4: "status", + 5: "time", + } + } + + +def NO_test_2(idf_1): + idf = idf_1 + events, status, status_history = dt.reprocess_data( + idf, + days=10, + inp="https://twitter.com/", + debug=True, + ) + status_history.to_csv("status_history_2.csv.zstd") + assert status_history.shape[0] == 9336 + assert status.shape[0] == 88 + assert events.shape[0] == 0 + + +# Test specific ASNs (in numerical order) + + +def test_ru_asn_8331(ru_twitter): + events, status, status_history = ru_twitter + assert len(status) > 10, status + tcai = ("web_connectivity", "RU", 8331, "https://twitter.com/") + exp = [("2022-11-02", "BLOCKED"), ("last", "BLOCKED")] + check(status_history, tcai, exp) + + +def test_ru_asn_12668(ru_twitter): + events, status, status_history = ru_twitter + assert len(status) > 10, status + tcai = ("web_connectivity", "RU", 12668, "https://twitter.com/") + exp = [ + ("2022-10-20", "UNKNOWN"), + ("2022-10-23", "BLOCKED"), + ("2022-10-25", "BLOCKED"), + ("2022-11-06", "BLOCKED"), + ("last", "BLOCKED"), + ] + check(status_history, tcai, exp) + h = status_history.reset_index() + h = h[h.probe_asn == tcai[2]] + + +def test_ru_asn_13335(ru_twitter): + events, status, status_history = ru_twitter + assert len(status) > 10, status + tcai = ("web_connectivity", "RU", 13335, "https://twitter.com/") + exp = [("2022-10-20", "OK"), ("2022-10-23", "OK"), ("last", "OK")] + check(status_history, tcai, exp) + + +def test_ru_asn_44020(ru_twitter): + events, status, status_history = ru_twitter + assert len(status) > 10, status + tcai = ("web_connectivity", "RU", 44020, "https://twitter.com/") + exp = [("2022-10-20", "OK"), ("2022-10-23", "OK"), ("last", "BLOCKED")] + check(status_history, tcai, exp) + + +def test_ru_asn_44493(ru_twitter): + events, status, status_history = ru_twitter + assert len(status) > 10, status + tcai = ("web_connectivity", "RU", 44493, "https://twitter.com/") + exp = [("2022-10-20", "OK"), ("2022-10-23", "OK")] + check(status_history, tcai, exp) + # ("last", "OK") + + +def test_ru_asn_48642(ru_twitter): + events, status, status_history = ru_twitter + assert len(status) > 10, status + tcai = ("web_connectivity", "RU", 48642, "https://twitter.com/") + exp = [("2022-10-20", "OK"), ("2022-10-28", "BLOCKED"), ("last", "BLOCKED")] + check(status_history, tcai, exp) + + +def test_ru_asn_49392(ru_twitter): + events, status, status_history = ru_twitter + assert len(status) > 10, status + tcai = ("web_connectivity", "RU", 49392, "https://twitter.com/") + exp = [("2022-10-30", "OK"), ("2022-11-20", "BLOCKED"), ("last", "BLOCKED")] + check(status_history, tcai, exp) + + +def test_ru_asn_51659(ru_twitter): + events, status, status_history = ru_twitter + assert len(status) > 10, status + tcai = ("web_connectivity", "RU", 51659, "https://twitter.com/") + exp = [] + check(status_history, tcai, exp) + + +def test_summarize_changes(ru_twitter): + events, status, status_history = ru_twitter + assert events.shape == (7, 6) + print(events) + + # status_history = status_history[status_history.probe_asn == 51659] + # print("") + # print(status_history.shape) + # assert 0 + # assert status_history.shape == (1259383, 16) + # s = status_history.groupby(TCAI).count() + # assert s.shape == (281, 12) + # print(s) + # assert 0 diff --git a/detector/detector/tests/test_functional_pandas_feeds.py b/detector/detector/tests/test_functional_pandas_feeds.py new file mode 100644 index 00000000..3bc5d5a2 --- /dev/null +++ b/detector/detector/tests/test_functional_pandas_feeds.py @@ -0,0 +1,95 @@ +""" +Event detector functional tests: feed generation + +Run as: +pytest tests/test_functional_pandas.py -s + +""" +from argparse import Namespace +from datetime import datetime +from pathlib import Path + +import pandas as pd +import pytest +from clickhouse_driver import Client as Clickhouse + +import detector.detector as dt + + +@pytest.fixture(scope="session", autouse=True) +def setup(): + dt.conf = Namespace() + dt.conf.reprocess = False + dt.conf.devel = True + DBURI = "clickhouse://localhost/default" + dt.click = Clickhouse.from_url(DBURI) + dt.conf.rssdir = Path("tests/out_rss") + dt.setup_dirs(Path("devel")) + + +def test_generate_rss_feeds2(setup): + be = pd.read_csv("tests/data/be.csv") # blocking events + be.drop(["Unnamed: 0"], axis=1, inplace=True) + be.time = pd.to_datetime(be.time) + be2 = be[ + (be.probe_cc == "US") + & (be.test_name == "web_connectivity") + & (be.probe_asn == 11427) + & (be.input == "https://twitter.com/") + ] + assert len(be2) == 2 + assert sorted(be2.columns) == [ + "input", + "probe_asn", + "probe_cc", + "status", + "test_name", + "time", + ] + s, p = dt.generate_rss_feed(be2, datetime(2023, 5, 22)) + assert p == Path("devel/var/lib/detector/output/rss/web_connectivity-US-AS11427-https_twitter.com_.xml") + s = s.replace("\n", "") + exp = """ + + + + OONI events + https://explorer.ooni.org + Blocked services and websites detected by OONI + en + Mon, 22 May 2023 00:00:00 -0000 + + https://twitter.com/ unblocked in US AS11427 + https://explorer.ooni.org/chart/mat?test_name=web_connectivity&axis_x=measurement_start_day&since=2022-12-03+14%3A20%3A00&until=2022-12-17+14%3A20%3A00&probe_asn=AS11427&probe_cc=US&input=https%3A%2F%2Ftwitter.com%2F + Change detected on 2022-12-10 14:20:00 + Sat, 10 Dec 2022 14:20:00 -0000 + + + https://twitter.com/ blocked in US AS11427 + https://explorer.ooni.org/chart/mat?test_name=web_connectivity&axis_x=measurement_start_day&since=2022-12-12+16%3A50%3A00&until=2022-12-26+16%3A50%3A00&probe_asn=AS11427&probe_cc=US&input=https%3A%2F%2Ftwitter.com%2F + Change detected on 2022-12-19 16:50:00 + Mon, 19 Dec 2022 16:50:00 -0000 + + + +""" + exp = "".join(line.strip() for line in exp.splitlines()) + assert s == exp + + +def test_rebuild_feeds(setup): + d = { + "test_name": "web_connectivity", + "input": "https://twitter.com/", + "probe_cc": "IT", + "probe_asn": "123", + "status": "BLOCKED", + "time": "", + } + d = {k: [v] for k, v in d.items()} + events = pd.DataFrame.from_dict(d) + dt.conf.reprocess = False + cnt = dt.rebuild_feeds(events) + assert cnt == 0 + + diff --git a/detector/detector/tests/test_unit.py b/detector/detector/tests/test_unit.py index 9c8b3f0c..9e8a01e4 100644 --- a/detector/detector/tests/test_unit.py +++ b/detector/detector/tests/test_unit.py @@ -1,221 +1,60 @@ # -# Fastpath - event detector unit tests +# Event detector unit tests # from datetime import datetime from pathlib import Path -import json - - -import bottle -import pytest +import re import detector.detector as dt -import detector.detector_webapp as webapp -bottle.TEMPLATE_PATH.insert(0, "detector/views") data = Path("detector/tests/data") -def datadir(p): - return data / p - - -def save(o, p): - data.joinpath(p).write_text(json.dumps(o, sort_keys=True, default=lambda x: str(x))) - - -def load(p): - with data.joinpath(p).open() as f: - return json.load(f) - - -def jd(o): - return json.dumps(o, indent=2, sort_keys=True, default=lambda x: str(x)) - - -def trim(chart): - [chart.pop(k) for k in list(chart) if k not in ("msmts", "changes")] - - -@pytest.fixture(scope="session") -def dbconn(): - # Used manually to access the database to gather and save test data - # when writing new tests. Not used during unit testing. - conn = dt.connect_to_db("127.0.0.1", "readonly", dt.DB_NAME, "") - webapp.db_conn = conn - return conn - - -def notest_chart_ww_BR_1(dbconn): - cc = "BR" - inp = "https://www.womenonwaves.org/" - start_date = datetime(2018, 11, 1) - test_name = "web_connectivity" - chart = webapp.plot_series(webapp.db_conn, cc, test_name, inp, start_date) - [chart.pop(k) for k in chart if k not in ("events", "changes")] - expected = datadir("detector_chart_1.svg").read_text() - assert chart == expected - - -def dbgenerator(fname): - ## mock DB query - for row in load(fname): - st = row["measurement_start_time"] - if isinstance(st, int): - row["measurement_start_time"] = datetime.utcfromtimestamp(st) - else: - row["measurement_start_time"] = datetime.strptime(st, "%Y-%m-%d %H:%M:%S") - - yield row - - -def urgh(o): - # FIXME: temporary hack to transform datetimes and truncate float numbers - # to allow comparison - return json.loads(jd(o)) - - -def notest_chart_ww_BR_2(): - cc = "BR" - inp = "https://www.womenonwaves.org/" - start_date = datetime(2019, 1, 1) - test_name = "web_connectivity" - - g = dbgenerator("detector_query_ww_BR_2.json") - - (msmts, changes) = dt.detect_blocking_changes_1s_g( - g, cc, test_name, inp, start_date - ) - # save(dict(msmts=msmts, changes=changes) , "detector_ww_BR_2_output.json") - - expected = load("detector_ww_BR_2_output.json") - assert len(msmts) == 809 - assert urgh(msmts)[0] == expected["msmts"][0] - assert urgh(msmts) == expected["msmts"] - assert urgh(changes) == expected["changes"] - - -def test_chart_ww_BR_2019_generate_chart(): - cc = "BR" - inp = "https://www.womenonwaves.org/" - start_date = datetime(2019, 1, 1) - test_name = "web_connectivity" - - g = dbgenerator("detector_query_ww_BR_2.json") - - (msmts, changes) = dt.detect_blocking_changes_1s_g( - g, cc, test_name, inp, start_date - ) - assert isinstance(msmts[0][0], datetime) - # save(dict(msmts=msmts, changes=changes) , "detector_ww_BR_2019_output.json") - expected = load("detector_ww_BR_2019_output.json") - assert len(msmts) == 809 - u = urgh(msmts) - for i in range(len(msmts)): - assert u[i] == expected["msmts"][i] - - assert len(changes) == len(expected["changes"]) - for e, c in zip(expected["changes"], changes): - ts, m, oldcode = e - assert ts == str(c.measurement_start_time) - assert m == c.mean - - with Path("detector/views/chart_alone.tpl").open() as f: - tpl = webapp.bottle.SimpleTemplate(f.read()) - - cd = webapp.generate_chart(msmts, changes, cc, test_name, inp) - chart = tpl.render(**cd) - assert chart - data.joinpath("output/chart_ww_BR_2019.html").write_text(chart) - - -def test_chart_ww_BR_2018_generate_chart(): - cc = "BR" - inp = "https://www.womenonwaves.org/" - start_date = datetime(2018, 11, 1) - test_name = "web_connectivity" - - # g = dt.fetch_past_data_selective(dbconn, start_date, cc, test_name, inp) - # save(list(g), "detector_query_ww_BR_2018.json") - g = dbgenerator("detector_query_ww_BR_2018.json") - - (msmts, changes) = dt.detect_blocking_changes_1s_g( - g, cc, test_name, inp, start_date - ) - assert isinstance(msmts[0][0], datetime) - # save(dict(msmts=msmts, changes=changes) , "detector_ww_BR_2018_output.json") - expected = load("detector_ww_BR_2018_output.json") - assert len(msmts) == 911 - u = urgh(msmts) - for i in range(len(msmts)): - assert u[i] == expected["msmts"][i] - - assert len(changes) == len(expected["changes"]) - for e, c in zip(expected["changes"], changes): - ts, m, oldcode = e - assert ts == str(c.measurement_start_time) - assert m == c.mean - - with Path("detector/views/chart_alone.tpl").open() as f: - tpl = webapp.bottle.SimpleTemplate(f.read()) - - cd = webapp.generate_chart(msmts, changes, cc, test_name, inp) - chart = tpl.render(**cd) - assert chart - data.joinpath("output/chart_ww_BR_2018.html").write_text(chart) - - -def bench_detect_blocking_changes_1s_g(g): - cc = "BR" - inp = "foo" - start_date = datetime(2019, 1, 1) - test_name = "web_connectivity" - return dt.detect_blocking_changes_1s_g( - g, cc, test_name, inp, start_date - ) - - -def test_bench_detect_blocking_changes(benchmark): - # debdeps: python3-pytest-benchmark - g = [] - for x in range(1020): - v = { - "anomaly": None if (int(x / 100) % 2 == 0) else True, - "confirmed": None, - "input": "foo", - "measurement_start_time": datetime.utcfromtimestamp(x + 1234567890), - "probe_cc": "BR", - "scores": "", - "test_name": "web_connectivity", - "tid": "", - } - g.append(v) - (msmts, changes) = benchmark(bench_detect_blocking_changes_1s_g, g) - - with Path("detector/views/chart_alone.tpl").open() as f: - tpl = webapp.bottle.SimpleTemplate(f.read()) - - cc = "BR" - inp = "foo" - test_name = "web_connectivity" - cd = webapp.generate_chart(msmts, changes, cc, test_name, inp) - chart = tpl.render(**cd) - assert chart - data.joinpath("output/chart_ww_BR_bench.html").write_text(chart) - last_mean = msmts[-1][-1] - assert pytest.approx(0.415287511652) == last_mean - - -def notest_chart_ww_BR(): - cc = "BR" - inp = "https://www.womenonwaves.org/" - start_date = datetime(2018, 11, 1) - test_name = "web_connectivity" - chart1 = webapp.plot_series(webapp.db_conn, cc, test_name, inp, start_date) - start_date = datetime(2019, 1, 1) - chart2 = webapp.plot_series(webapp.db_conn, cc, test_name, inp, start_date) - chart = chart1 + chart2 - # data.joinpath("detector_chart_ww_BR.svg").write_text(chart) - # expected = data.joinpath("detector_chart_2.svg").read_text() - # assert chart == expected +def test_generate_rss_feed_empty(): + feed = dt.generate_rss_feed([]) + feed = re.sub(r".*", "", feed) + exp = """ + + +OONI events +https://explorer.ooni.org +Blocked services and websites detected by OONI +en + + +""" + assert feed.replace("\n", "") == exp.replace("\n", "") + + +def test_generate_rss_feed_one(): + # test_name, inp, probe_cc, probe_asn, status, time + e = [ + ( + "web_connectivity", + "https://twitter.com/", + "IE", + "1234", + "OK", + datetime(2022, 1, 2), + ) + ] + feed = dt.generate_rss_feed(e) + feed = re.sub(r".*", "", feed) + exp = """ + + +OONI events +https://explorer.ooni.org +Blocked services and websites detected by OONI +en + + +https://twitter.com/ unblocked in IE AS1234 +https://explorer.ooni.org/chart/mat?test_name=web_connectivity&axis_x=measurement_start_day&since=2021-12-26+00%3A00%3A00&until=2022-01-09+00%3A00%3A00&probe_asn=AS1234&probe_cc=IE&input=https%3A%2F%2Ftwitter.com%2F +Change detected on 2022-01-02 00:00:00 +Sun, 02 Jan 2022 00:00:00 -0000 + + +""" + assert feed.replace("\n", "") == exp.replace("\n", "") diff --git a/detector/detector/views/chart.tpl b/detector/detector/views/chart.tpl deleted file mode 100644 index 33ac494f..00000000 --- a/detector/detector/views/chart.tpl +++ /dev/null @@ -1,60 +0,0 @@ - - - {{title}} - - % pcx = pcy = None - % for d, val, mean in msmts: - % cx = (d - start_d).total_seconds() * x_scale + x1 - % cy = y2 - min(max(val, 0) * 200, 300) - % #r = "{:02x}".format(min(int(max(val, 0) * 170), 255)) - % col = "d60000" if val > .5 else "00d600" - - - % # moving average - % cy = y2 - min(max(mean, 0) * 200, 300) - % if pcy is not None: - - % end - % pcx, pcy = cx, cy - % end - - % # changes in blocking - % for c in changes: - % cx = (c.measurement_start_time - start_d).total_seconds() * x_scale + x1 - % col = "ff3333" if c.blocked else "33ff33" - - % end - - - % # start/end date labels - {{start_d}} - {{end_d}} - - - - - % for val in (0.0, 1.0, 2.0): - % cy = y2 - min(max(val, 0) * 100, (y2 - y1)) - - % end - diff --git a/detector/detector/views/chart_alone.tpl b/detector/detector/views/chart_alone.tpl deleted file mode 100644 index aed6bcad..00000000 --- a/detector/detector/views/chart_alone.tpl +++ /dev/null @@ -1,58 +0,0 @@ - - - {{cc}} {{test_name}} {{inp}} - - % pcx = pcy = None - % for d, val, mean in msmts: - % cx = (d - start_d).total_seconds() * x_scale + x1 - % cy = y2 - min(max(val, 0) * 100, 300) - % r = "{:02x}".format(min(int(max(val, 0) * 170), 255)) - - - % # moving average - % cy = y2 - min(max(mean, 0) * 100, 300) - % if pcy is not None: - - % end - % pcx, pcy = cx, cy - % end - - % # changes in blocking - % for c in changes: - % cx = (c.measurement_start_time - start_d).total_seconds() * x_scale + x1 - % col = "ff3333" if c.blocked else "33ff33" - - % end - - - {{start_d}} - {{d}} - - - - - % for val in (0.0, 1.0, 2.0): - % cy = y2 - min(max(val, 0) * 100, (y2 - y1)) - - % end - diff --git a/detector/detector/views/form.tpl b/detector/detector/views/form.tpl deleted file mode 100644 index 0e613d16..00000000 --- a/detector/detector/views/form.tpl +++ /dev/null @@ -1,27 +0,0 @@ - - - -
-
- - -
-
- - -
-
- - -
-
- - -
- -
- diff --git a/detector/detector/views/page.tpl b/detector/detector/views/page.tpl deleted file mode 100644 index ac32ea4b..00000000 --- a/detector/detector/views/page.tpl +++ /dev/null @@ -1,136 +0,0 @@ - - - - {{title}} - - - -
- - - - - - - - - - - - - - - -
- % for c in charts: - % msmts = c["msmts"] - % changes = c["changes"] - % x_scale = c["x_scale"] - % start_d = c["start_d"] - % end_d = c["end_d"] - % x1 = c["x1"] - % x2 = c["x2"] - % y1 = c["y1"] - % y2 = c["y2"] - % title = c["title"] - - - {{title}} - - % pcx = pcy = None - % for d, val, mean in msmts: - % cx = (d - start_d).total_seconds() * x_scale + x1 - % cy = y2 - min(max(val, 0) * 200, 300) - % #r = "{:02x}".format(min(int(max(val, 0) * 170), 255)) - % col = "d60000" if val > .5 else "00d600" - - - % # moving average - % cy = y2 - min(max(mean, 0) * 200, 300) - % if pcy is not None: - - % end - % pcx, pcy = cx, cy - % end - - % # changes in blocking - % for c in changes: - % cx = (c.measurement_start_time - start_d).total_seconds() * x_scale + x1 - % col = "ff3333" if c.blocked else "33ff33" - - % end - - - % # start/end date labels - {{start_d}} - {{end_d}} - - - - - % for val in (0.0, 1.0, 2.0): - % cy = y2 - min(max(val, 0) * 100, (y2 - y1)) - - % end - - % end - - diff --git a/detector/docker-compose.yml b/detector/docker-compose.yml new file mode 100644 index 00000000..0090903a --- /dev/null +++ b/detector/docker-compose.yml @@ -0,0 +1,13 @@ +# Usage: docker-compose run --rm detector ./detector/detector.py --devel + +version: '3.9' + +services: + detector: + restart: always + build: + context: . + command: detector + volumes: + - ./:/opt/detector + - ../:/repo diff --git a/detector/setup.py b/detector/setup.py index 3572e5bf..b99eb7d8 100644 --- a/detector/setup.py +++ b/detector/setup.py @@ -12,9 +12,13 @@ name=NAME, python_requires=">=3.6.0", packages=["detector", "detector.tests"], - entry_points={"console_scripts": ["detector=detector.detector:main",]}, + entry_points={ + "console_scripts": [ + "detector=detector.detector:main", + ] + }, install_requires=REQUIRED, include_package_data=True, zip_safe=False, - package_data={"detector": ["views/*.tpl", "static/*", "data/country-list.json"]}, + package_data={"detector": ["data/country-list.json"]}, )