Skip to content

Commit

Permalink
Implement disk quota management
Browse files Browse the repository at this point in the history
This is related to the feature described here: TheTorProject/lepidopter#53
  • Loading branch information
hellais committed Aug 30, 2016
1 parent 196fc17 commit def4c92
Show file tree
Hide file tree
Showing 6 changed files with 166 additions and 7 deletions.
67 changes: 67 additions & 0 deletions ooni/agent/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import os
import errno

from datetime import datetime

from twisted.application import service
Expand All @@ -8,6 +11,7 @@
from ooni.scripts import oonireport
from ooni import resources
from ooni.utils import log, SHORT_DATE
from ooni.utils.files import human_size_to_bytes, directory_usage
from ooni.deck.store import input_store, deck_store
from ooni.settings import config
from ooni.contrib import croniter
Expand Down Expand Up @@ -146,6 +150,66 @@ def task(self):
measurement_path.child(measurement['id']).remove()


class CheckMeasurementQuota(ScheduledTask):
    """
    This task is run to ensure we don't run out of disk space and deletes
    older reports to avoid filling the quota.

    When usage reaches ``_warn_when`` of the quota, a ``quota_warning`` file
    containing "<used bytes> <maximum bytes>" is written into
    ``config.running_path``; it is removed again once usage drops below
    that threshold.
    """
    identifier = 'check-measurement-quota'
    schedule = '@hourly'
    # Fraction of the quota at which the warning file is written.
    _warn_when = 0.8

    def task(self):
        """
        Refresh the quota warning file and, when the measurements directory
        is larger than the configured quota, delete measurements until the
        excess has been freed.

        Does nothing when ``config.basic.measurement_quota`` is None.
        """
        if config.basic.measurement_quota is None:
            return
        maximum_bytes = human_size_to_bytes(config.basic.measurement_quota)
        used_bytes = directory_usage(config.measurements_directory)
        warning_path = os.path.join(config.running_path, 'quota_warning')

        # Compare by multiplication rather than division so that a quota of
        # zero cannot raise ZeroDivisionError.
        if used_bytes >= self._warn_when * maximum_bytes:
            log.warn("You are about to reach the maximum allowed quota. Be careful")
            with open(warning_path, "w") as out_file:
                out_file.write("{0} {1}".format(used_bytes, maximum_bytes))
        else:
            try:
                os.remove(warning_path)
            except OSError as ose:
                # A missing warning file just means there was no warning.
                if ose.errno != errno.ENOENT:
                    raise

        if used_bytes <= maximum_bytes:
            # We are within the allowed quota, nothing to delete.
            return

        # We should begin to delete old reports: free at least the number
        # of bytes by which we exceed the quota.
        amount_to_delete = used_bytes - maximum_bytes
        amount_deleted = 0
        measurement_path = FilePath(config.measurements_directory)

        kept_measurements = []
        stale_measurements = []
        remaining_measurements = []
        measurements_by_date = sorted(list_measurements(compute_size=True),
                                      key=lambda k: k['test_start_time'])
        for measurement in measurements_by_date:
            if measurement['keep'] is True:
                kept_measurements.append(measurement)
            elif measurement['stale'] is True:
                stale_measurements.append(measurement)
            else:
                remaining_measurements.append(measurement)

        # This is the order in which we should begin deleting measurements:
        # stale ones first, measurements marked "keep" only as a last
        # resort, oldest first inside every group.
        ordered_measurements = (stale_measurements +
                                remaining_measurements +
                                kept_measurements)
        # Iterate instead of pop(0) so that running out of measurements
        # before enough space is freed ends the loop instead of raising
        # IndexError.
        for measurement in ordered_measurements:
            if amount_deleted >= amount_to_delete:
                break
            log.warn("Deleting report {0}".format(measurement["id"]))
            measurement_path.child(measurement['id']).remove()
            amount_deleted += measurement['size']


class RunDeck(ScheduledTask):
"""
This will run the decks that have been configured on the system as the
Expand Down Expand Up @@ -196,6 +260,7 @@ def task(self):
UpdateInputsAndResources
]


@defer.inlineCallbacks
def run_system_tasks(no_input_store=False):
task_classes = SYSTEM_TASKS[:]
Expand All @@ -215,6 +280,7 @@ def run_system_tasks(no_input_store=False):
log.err("Failed to run task {0}".format(task.identifier))
log.exception(exc)


class SchedulerService(service.MultiService):
"""
This service is responsible for running the periodic tasks.
Expand Down Expand Up @@ -271,6 +337,7 @@ def startService(self):
self.schedule(UpdateInputsAndResources())
self.schedule(UploadReports())
self.schedule(DeleteOldReports())
self.schedule(CheckMeasurementQuota())
self.schedule(RefreshDeckList(self))

self._looping_call.start(self.interval)
Expand Down
20 changes: 15 additions & 5 deletions ooni/measurements.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from twisted.python.filepath import FilePath
from ooni.utils import log
from ooni.utils.files import directory_usage
from ooni.settings import config

class MeasurementInProgress(Exception):
Expand Down Expand Up @@ -61,7 +62,8 @@ def generate_summary(input_file, output_file):
class MeasurementNotFound(Exception):
    """Error type for a measurement that could not be found.

    NOTE(review): the raise sites are outside the visible part of this
    file -- confirm the exact conditions against the callers.
    """

def get_measurement(measurement_id):
def get_measurement(measurement_id, compute_size=False):
size = -1
measurement_path = FilePath(config.measurements_directory)
measurement = measurement_path.child(measurement_id)
if not measurement.exists():
Expand All @@ -70,6 +72,7 @@ def get_measurement(measurement_id):
running = False
completed = True
keep = False
stale = False
if measurement.child("measurements.njson.progress").exists():
completed = False
# XXX this is done quite often around the code, probably should
Expand All @@ -80,10 +83,14 @@ def get_measurement(measurement_id):
os.kill(pid, signal.SIG_DFL)
running = True
except OSError:
pass
stale = True

if measurement.child("keep").exists():
keep = True

if compute_size is True:
size = directory_usage(measurement.path)

test_start_time, country_code, asn, test_name = \
measurement_id.split("-")[:4]
return {
Expand All @@ -94,7 +101,9 @@ def get_measurement(measurement_id):
"id": measurement_id,
"completed": completed,
"keep": keep,
"running": running
"running": running,
"stale": stale,
"size": size
}


Expand All @@ -115,12 +124,13 @@ def get_summary(measurement_id):
with summary.open("r") as f:
return json.load(f)

def list_measurements(compute_size=False):
    """
    Return the metadata of every entry in the measurements directory.

    :param compute_size: forwarded to get_measurement(); when True the
        on-disk size of each measurement is computed as well.
    :returns: a list with one get_measurement() result per directory entry.
        Entries whose metadata cannot be obtained are logged and skipped.
    """
    measurements = []
    measurement_path = FilePath(config.measurements_directory)
    for measurement_id in measurement_path.listdir():
        try:
            measurements.append(
                get_measurement(measurement_id, compute_size)
            )
        except Exception:
            # Best effort: one broken measurement directory must not hide
            # all the others. Exception (not a bare except) so that
            # KeyboardInterrupt and SystemExit still propagate.
            log.err("Failed to get metadata for measurement {0}".format(measurement_id))
    return measurements
Expand Down
6 changes: 5 additions & 1 deletion ooni/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@
# Where OONIProbe should be writing it's log file
logfile: {logfile}
loglevel: WARNING
# The maximum amount of data to store on disk. Once the quota is reached,
# we will start deleting older reports.
# measurement_quota: 1G
privacy:
# Should we include the IP address of the probe in the report?
includeip: {include_ip}
Expand Down Expand Up @@ -99,7 +102,8 @@
defaults = {
"basic": {
"loglevel": "WARNING",
"logfile": "ooniprobe.log"
"logfile": "ooniprobe.log",
"measurement_quota": "1G"
},
"privacy": {
"includeip": False,
Expand Down
34 changes: 34 additions & 0 deletions ooni/tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import os
import tempfile

from twisted.trial import unittest

from ooni.utils import log, generate_filename, net
from ooni.utils.files import human_size_to_bytes, directory_usage


class TestUtils(unittest.TestCase):
Expand Down Expand Up @@ -43,3 +46,34 @@ def test_generate_filename_with_extension_and_prefix(self):
def test_get_addresses(self):
    """net.getAddresses() is expected to hand back a list."""
    assert isinstance(net.getAddresses(), list)

def test_human_size(self):
    """Check human_size_to_bytes() for G, M, K and unit-less inputs."""
    # (input string, expected number of bytes), checked in this order.
    expectations = [
        ("1G", 1024**3),
        ("1.3M", 1.3 * 1024**2),
        ("1.2K", 1.2 * 1024),
        ("1", 1.0),
        ("100.2", 100.2),
    ]
    for human_size, expected_bytes in expectations:
        self.assertEqual(
            human_size_to_bytes(human_size),
            expected_bytes
        )

def test_directory_usage(self):
    """directory_usage() sums file sizes across nested directories."""
    # Imported locally: shutil is only needed for this test's cleanup.
    import shutil

    tmp_dir = tempfile.mkdtemp()
    # mkdtemp() never removes the directory itself; register the cleanup
    # right away so it also runs when an assertion below fails.
    self.addCleanup(shutil.rmtree, tmp_dir)

    with open(os.path.join(tmp_dir, "something.txt"), "w") as out_file:
        out_file.write("A"*1000)
    os.mkdir(os.path.join(tmp_dir, "subdir"))
    with open(os.path.join(tmp_dir, "subdir", "something.txt"), "w") as out_file:
        out_file.write("A"*1000)
    self.assertEqual(directory_usage(tmp_dir), 1000*2)
12 changes: 11 additions & 1 deletion ooni/ui/web/server.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
from __future__ import print_function

import os
import json
import errno
import string
from functools import wraps
from random import SystemRandom
Expand Down Expand Up @@ -186,13 +188,21 @@ def start_director(self):

@property
def status(self):
    """
    Build the status dictionary for this instance.

    Besides the software, probe and director fields it contains
    "quota_warning": the content of the ``quota_warning`` file in
    ``config.running_path``, or None when that file does not exist.

    :raises IOError: when the warning file exists but cannot be read.
    """
    quota_warning = None
    try:
        # Must be the same location the scheduler's quota check writes to
        # (it uses config.running_path).
        # NOTE(review): this previously read config.running_dir -- confirm
        # that running_path is the attribute defined on config.
        with open(os.path.join(config.running_path,
                               "quota_warning")) as in_file:
            quota_warning = in_file.read()
    except IOError as ioe:
        # No warning file simply means there is no quota warning.
        if ioe.errno != errno.ENOENT:
            raise
    return {
        "software_version": ooniprobe_version,
        "software_name": "ooniprobe",
        "asn": probe_ip.geodata['asn'],
        "country_code": probe_ip.geodata['countrycode'],
        "director_started": self._director_started,
        "initialized": self._is_initialized,
        "quota_warning": quota_warning
    }

def handle_director_event(self, event):
Expand Down
34 changes: 34 additions & 0 deletions ooni/utils/files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import os
import re

# A decimal number optionally followed by a G, M or K unit suffix. One
# capture group per unit (G, M, K, plain bytes); the suffix is part of the
# captured text for the first three groups.
HUMAN_SIZE = re.compile(
    r"(\d+\.?\d*G)|(\d+\.?\d*M)|(\d+\.?\d*K)|(\d+\.?\d*)"
)


class InvalidFormat(Exception):
    """Raised by human_size_to_bytes() when a size cannot be parsed."""


def human_size_to_bytes(human_size):
    """
    Converts a size specified in a human friendly way (for example 1G, 10M,
    30K) into bytes.

    Units are powers of 1024 and a value without a unit is taken as bytes.
    Only the start of the string is parsed: anything after the number and
    its optional unit is ignored.

    :param human_size: the size string to convert.
    :returns: the size in bytes, as a float.
    :raises InvalidFormat: when the string does not start with a number.
    """
    match = HUMAN_SIZE.match(human_size)
    if match is None:
        # Without this check unparsable input would surface as an
        # AttributeError on None instead of the documented exception.
        raise InvalidFormat(
            "Invalid size specification: {0!r}".format(human_size)
        )

    gb, mb, kb, b = match.groups()
    # The pattern's alternatives are exclusive, so exactly one group is set.
    if gb is not None:
        return float(gb[:-1]) * (1024 ** 3)
    if mb is not None:
        return float(mb[:-1]) * (1024 ** 2)
    if kb is not None:
        return float(kb[:-1]) * 1024
    return float(b)


def directory_usage(path):
    """
    Return the total size in bytes of all the files found under path.

    The tree is traversed with os.walk() and the sizes reported by
    os.path.getsize() are summed. Files whose size cannot be read are
    skipped, and a path that does not exist yields 0.

    :param path: the directory to measure.
    :returns: the summed file sizes, in bytes.
    """
    total_usage = 0
    for root, _dirs, filenames in os.walk(path):
        for filename in filenames:
            try:
                total_usage += os.path.getsize(os.path.join(root, filename))
            except OSError:
                # The file vanished between listing and stat, or it is a
                # dangling symlink. os.walk() already ignores directories
                # it cannot list, so skip instead of aborting the total.
                continue
    return total_usage

0 comments on commit def4c92

Please sign in to comment.