diff --git a/ooni/agent/scheduler.py b/ooni/agent/scheduler.py index 3389db115..54551f616 100644 --- a/ooni/agent/scheduler.py +++ b/ooni/agent/scheduler.py @@ -1,3 +1,6 @@ +import os +import errno + from datetime import datetime from twisted.application import service @@ -8,6 +11,7 @@ from ooni.scripts import oonireport from ooni import resources from ooni.utils import log, SHORT_DATE +from ooni.utils.files import human_size_to_bytes, directory_usage from ooni.deck.store import input_store, deck_store from ooni.settings import config from ooni.contrib import croniter @@ -146,6 +150,66 @@ def task(self): measurement_path.child(measurement['id']).remove() +class CheckMeasurementQuota(ScheduledTask): + """ + This task is run to ensure we don't run out of disk space and deletes + older reports to avoid filling the quota. + """ + identifier = 'check-measurement-quota' + schedule = '@hourly' + _warn_when = 0.8 + + def task(self): + if config.basic.measurement_quota is None: + return + maximum_bytes = human_size_to_bytes(config.basic.measurement_quota) + available_bytes = directory_usage(config.measurements_directory) + warning_path = os.path.join(config.running_path, 'quota_warning') + + if (float(available_bytes) / float(maximum_bytes)) >= self._warn_when: + log.warn("You are about to reach the maximum allowed quota. Be careful") + with open(warning_path, "w") as out_file: + out_file.write("{0} {1}".split(available_bytes, maximum_bytes)) + else: + try: + os.remove(warning_path) + except OSError as ose: + if ose.errno != errno.ENOENT: + raise + + if float(available_bytes) < float(maximum_bytes): + # We are within the allow quota exit. + return + + # We should begin to delete old reports + amount_to_delete = float(maximum_bytes) - float(available_bytes) + amount_deleted = 0 + measurement_path = FilePath(config.measurements_directory) + + kept_measurements = [] + stale_measurements = [] + remaining_measurements = [] + measurements_by_date = sorted(list_measurements(compute_size=True), + key=lambda k: k['test_start_time']) + for measurement in measurements_by_date: + if measurement['keep'] is True: + kept_measurements.append(measurement) + elif measurement['stale'] is True: + stale_measurements.append(measurement) + else: + remaining_measurements.append(measurement) + + # This is the order in which we should begin deleting measurements. + ordered_measurements = (stale_measurements + + remaining_measurements + + kept_measurements) + while amount_deleted < amount_to_delete: + measurement = ordered_measurements.pop(0) + log.warn("Deleting report {0}".format(measurement["id"])) + measurement_path.child(measurement['id']).remove() + amount_deleted += measurement['size'] + + class RunDeck(ScheduledTask): """ This will run the decks that have been configured on the system as the @@ -196,6 +260,7 @@ def task(self): UpdateInputsAndResources ] + @defer.inlineCallbacks def run_system_tasks(no_input_store=False): task_classes = SYSTEM_TASKS[:] @@ -215,6 +280,7 @@ def run_system_tasks(no_input_store=False): log.err("Failed to run task {0}".format(task.identifier)) log.exception(exc) + class SchedulerService(service.MultiService): """ This service is responsible for running the periodic tasks. @@ -271,6 +337,7 @@ def startService(self): self.schedule(UpdateInputsAndResources()) self.schedule(UploadReports()) self.schedule(DeleteOldReports()) + self.schedule(CheckMeasurementQuota()) self.schedule(RefreshDeckList(self)) self._looping_call.start(self.interval) diff --git a/ooni/measurements.py b/ooni/measurements.py index 6d90c1b41..fd722eee4 100644 --- a/ooni/measurements.py +++ b/ooni/measurements.py @@ -4,6 +4,7 @@ from twisted.python.filepath import FilePath from ooni.utils import log +from ooni.utils.files import directory_usage from ooni.settings import config class MeasurementInProgress(Exception): @@ -61,7 +62,8 @@ def generate_summary(input_file, output_file): class MeasurementNotFound(Exception): pass -def get_measurement(measurement_id): +def get_measurement(measurement_id, compute_size=False): + size = -1 measurement_path = FilePath(config.measurements_directory) measurement = measurement_path.child(measurement_id) if not measurement.exists(): @@ -70,6 +72,7 @@ def get_measurement(measurement_id): running = False completed = True keep = False + stale = False if measurement.child("measurements.njson.progress").exists(): completed = False # XXX this is done quite often around the code, probably should @@ -80,10 +83,14 @@ def get_measurement(measurement_id): os.kill(pid, signal.SIG_DFL) running = True except OSError: - pass + stale = True if measurement.child("keep").exists(): keep = True + + if compute_size is True: + size = directory_usage(measurement.path) + test_start_time, country_code, asn, test_name = \ measurement_id.split("-")[:4] return { @@ -94,7 +101,9 @@ def get_measurement(measurement_id): "id": measurement_id, "completed": completed, "keep": keep, - "running": running + "running": running, + "stale": stale, + "size": size } @@ -115,12 +124,13 @@ def get_summary(measurement_id): with summary.open("r") as f: return json.load(f) -def list_measurements(): + +def list_measurements(compute_size=False): measurements = [] measurement_path = FilePath(config.measurements_directory) for measurement_id in measurement_path.listdir(): try: - measurements.append(get_measurement(measurement_id)) + measurements.append(get_measurement(measurement_id, compute_size)) except: log.err("Failed to get metadata for measurement {0}".format(measurement_id)) return measurements diff --git a/ooni/settings.py b/ooni/settings.py index 2bacd57d0..c36f3f777 100644 --- a/ooni/settings.py +++ b/ooni/settings.py @@ -20,6 +20,9 @@ # Where OONIProbe should be writing it's log file logfile: {logfile} loglevel: WARNING + # The maximum amount of data to store on disk. Once the quota is reached, + # we will start deleting older reports. + # measurement_quota: 1G privacy: # Should we include the IP address of the probe in the report? includeip: {include_ip} @@ -99,7 +102,8 @@ defaults = { "basic": { "loglevel": "WARNING", - "logfile": "ooniprobe.log" + "logfile": "ooniprobe.log", + "measurement_quota": "1G" }, "privacy": { "includeip": False, diff --git a/ooni/tests/test_utils.py b/ooni/tests/test_utils.py index ea1d858f8..4ef2926b2 100644 --- a/ooni/tests/test_utils.py +++ b/ooni/tests/test_utils.py @@ -1,7 +1,10 @@ import os +import tempfile + from twisted.trial import unittest from ooni.utils import log, generate_filename, net +from ooni.utils.files import human_size_to_bytes, directory_usage class TestUtils(unittest.TestCase): @@ -43,3 +46,34 @@ def test_generate_filename_with_extension_and_prefix(self): def test_get_addresses(self): addresses = net.getAddresses() assert isinstance(addresses, list) + + def test_human_size(self): + self.assertEqual( + human_size_to_bytes("1G"), + 1024**3 + ) + self.assertEqual( + human_size_to_bytes("1.3M"), + 1.3 * 1024**2 + ) + self.assertEqual( + human_size_to_bytes("1.2K"), + 1.2 * 1024 + ) + self.assertEqual( + human_size_to_bytes("1"), + 1.0 + ) + self.assertEqual( + human_size_to_bytes("100.2"), + 100.2 + ) + + def test_directory_usage(self): + tmp_dir = tempfile.mkdtemp() + with open(os.path.join(tmp_dir, "something.txt"), "w") as out_file: + out_file.write("A"*1000) + os.mkdir(os.path.join(tmp_dir, "subdir")) + with open(os.path.join(tmp_dir, "subdir", "something.txt"), "w") as out_file: + out_file.write("A"*1000) + self.assertEqual(directory_usage(tmp_dir), 1000*2) diff --git a/ooni/ui/web/server.py b/ooni/ui/web/server.py index ee33fa2b2..6f9ef69f4 100644 --- a/ooni/ui/web/server.py +++ b/ooni/ui/web/server.py @@ -1,6 +1,8 @@ from __future__ import print_function +import os import json +import errno import string from functools import wraps from random import SystemRandom @@ -186,13 +188,21 @@ def start_director(self): @property def status(self): + quota_warning = None + try: + with open(os.path.join(config.running_dir, "quota_warning")) as in_file: + quota_warning = in_file.read() + except IOError as ioe: + if ioe.errno != errno.ENOENT: + raise return { "software_version": ooniprobe_version, "software_name": "ooniprobe", "asn": probe_ip.geodata['asn'], "country_code": probe_ip.geodata['countrycode'], "director_started": self._director_started, - "initialized": self._is_initialized + "initialized": self._is_initialized, + "quota_warning": quota_warning } def handle_director_event(self, event): diff --git a/ooni/utils/files.py b/ooni/utils/files.py new file mode 100644 index 000000000..aefb831bb --- /dev/null +++ b/ooni/utils/files.py @@ -0,0 +1,34 @@ +import os +import re + +HUMAN_SIZE = re.compile("(\d+\.?\d*G)|(\d+\.?\d*M)|(\d+\.?\d*K)|(\d+\.?\d*)") + +class InvalidFormat(Exception): + pass + +def human_size_to_bytes(human_size): + """ + Converts a size specified in a human friendly way (for example 1G, 10M, + 30K) into bytes. + """ + gb, mb, kb, b = HUMAN_SIZE.match(human_size).groups() + if gb is not None: + b = float(gb[:-1]) * (1024 ** 3) + elif mb is not None: + b = float(mb[:-1]) * (1024 ** 2) + elif kb is not None: + b = float(kb[:-1]) * 1024 + elif b is not None: + b = float(b) + else: + raise InvalidFormat + return b + + +def directory_usage(path): + total_usage = 0 + for root, dirs, filenames in os.walk(path): + for filename in filenames: + fp = os.path.join(root, filename) + total_usage += os.path.getsize(fp) + return total_usage