From babf7a1c892b4fd1e9c33d9255da8872a25be7c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Drei=C3=9Fig?= Date: Sun, 31 May 2020 14:43:14 +0200 Subject: [PATCH] Major rewrite of everything related to Checkers * New API and library for Checker Scripts * Build foundation for Checker Scripts in other languages (IPC protocol) * Checker Scripts can be executed locally (during development) without a dedicated Runner * No more Checker Slaves * Launch processes using Python multiprocessing instead of asyncio * Scheduling of checks within a tick is different, should lead to more even distribution * Run Checker Scripts as different users than the Checker Master * Lots of test cases Closes: https://github.com/fausecteam/ctf-gameserver/issues/1 Closes: https://github.com/fausecteam/ctf-gameserver/issues/20 Closes: https://github.com/fausecteam/ctf-gameserver/issues/33 Closes: https://github.com/fausecteam/ctf-gameserver/issues/38 Improves: https://github.com/fausecteam/ctf-gameserver/issues/17 Improves: https://github.com/fausecteam/ctf-gameserver/issues/27 Improves: https://github.com/fausecteam/ctf-gameserver/issues/29 Improves: https://github.com/fausecteam/ctf-gameserver/issues/32 Improves: https://github.com/fausecteam/ctf-gameserver/issues/35 --- conf/checker/checkermaster.env | 4 +- conf/checker/ctf-checkermaster@.service | 30 +- debian/install | 1 + debian/postinst | 7 + doc/source/setup.rst | 2 +- examples/checker/example_checker.env | 5 + examples/checker/example_checker.py | 88 ++++ examples/checker/example_service.py | 55 +++ examples/checker/examplechecker.conf | 12 - examples/checker/examples/dummy.py | 11 - examples/checker/faust2048.conf | 25 - examples/checker/faust2048.py | 68 --- examples/checker/sudoers.d/ctf-checker | 6 + scripts/checker/ctf-checkermaster | 336 +------------ scripts/checker/ctf-checkerslave | 140 ------ scripts/checker/ctf-testrunner | 76 --- setup.py | 2 - src/ctf_gameserver/checker/__init__.py | 9 +- src/ctf_gameserver/checker/abstract.py | 148 ------ src/ctf_gameserver/checker/constants.py | 13 - src/ctf_gameserver/checker/contest.py | 29 -- src/ctf_gameserver/checker/database.py | 138 ++++++ src/ctf_gameserver/checker/helpers.py | 54 --- src/ctf_gameserver/checker/local.py | 46 -- src/ctf_gameserver/checker/master.py | 342 +++++++++++++ src/ctf_gameserver/checker/supervisor.py | 321 +++++++++++++ src/ctf_gameserver/checkerlib/__init__.py | 1 + src/ctf_gameserver/checkerlib/lib.py | 355 ++++++++++++++ src/ctf_gameserver/lib/checkresult.py | 17 + src/ctf_gameserver/lib/database.py | 61 +++ tests/checker/fixtures/integration.json | 88 ++++ tests/checker/fixtures/master.json | 87 ++++ .../integration_basic_checkerscript.py | 32 ++ .../integration_exception_checkerscript.py | 20 + .../integration_multi_checkerscript.py | 43 ++ .../integration_state_checkerscript.py | 57 +++ .../checker/integration_sudo_checkerscript.py | 32 ++ .../integration_timeout_checkerscript.py | 24 + .../integration_unfinished_checkerscript.py | 18 + tests/checker/test_integration.py | 449 ++++++++++++++++++ tests/checker/test_master.py | 130 +++++ tests/checkerlib/test_local.py | 215 +++++++++ 42 files changed, 2618 insertions(+), 979 deletions(-) create mode 100644 examples/checker/example_checker.env create mode 100755 examples/checker/example_checker.py create mode 100755 examples/checker/example_service.py delete mode 100644 examples/checker/examplechecker.conf delete mode 100755 examples/checker/examples/dummy.py delete mode 100644 examples/checker/faust2048.conf delete mode 100755 examples/checker/faust2048.py create mode 100644 examples/checker/sudoers.d/ctf-checker delete mode 100755 scripts/checker/ctf-checkerslave delete mode 100755 scripts/checker/ctf-testrunner delete mode 100644 src/ctf_gameserver/checker/abstract.py delete mode 100644 src/ctf_gameserver/checker/constants.py delete mode 100644 src/ctf_gameserver/checker/contest.py create mode 100644 src/ctf_gameserver/checker/database.py delete mode 100644 src/ctf_gameserver/checker/helpers.py delete mode 100644 src/ctf_gameserver/checker/local.py create mode 100644 src/ctf_gameserver/checker/master.py create mode 100644 src/ctf_gameserver/checker/supervisor.py create mode 100644 src/ctf_gameserver/checkerlib/__init__.py create mode 100644 src/ctf_gameserver/checkerlib/lib.py create mode 100644 src/ctf_gameserver/lib/checkresult.py create mode 100644 tests/checker/fixtures/integration.json create mode 100644 tests/checker/fixtures/master.json create mode 100755 tests/checker/integration_basic_checkerscript.py create mode 100755 tests/checker/integration_exception_checkerscript.py create mode 100755 tests/checker/integration_multi_checkerscript.py create mode 100755 tests/checker/integration_state_checkerscript.py create mode 100755 tests/checker/integration_sudo_checkerscript.py create mode 100755 tests/checker/integration_timeout_checkerscript.py create mode 100755 tests/checker/integration_unfinished_checkerscript.py create mode 100644 tests/checker/test_integration.py create mode 100644 tests/checker/test_master.py create mode 100644 tests/checkerlib/test_local.py diff --git a/conf/checker/checkermaster.env b/conf/checker/checkermaster.env index 189978d..9b836cf 100644 --- a/conf/checker/checkermaster.env +++ b/conf/checker/checkermaster.env @@ -2,4 +2,6 @@ CTF_DBNAME="DUMMY" CTF_DBUSER="DUMMY" CTF_STATEDBNAME="DUMMY" CTF_STATEDBUSER="DUMMY" -CTF_SECRET="DUMMY" + +CTF_IPPATTERN="0.0.%s.2" +CTF_FLAGSECRET="RFVNTVlTRUNSRVQ=" diff --git a/conf/checker/ctf-checkermaster@.service b/conf/checker/ctf-checkermaster@.service index 5844ec2..152de99 100644 --- a/conf/checker/ctf-checkermaster@.service +++ b/conf/checker/ctf-checkermaster@.service @@ -1,20 +1,28 @@ [Unit] -Description=CTF Checker Runner -After=network.target postgresql.service +Description=CTF Gameserver Checker Master +After=postgresql.service [Service] Type=notify -ExecStart=/usr/bin/ctf-checkermaster -ExecStop=/bin/kill -s usr1 $MAINPID -WatchdogSec=20 -Restart=on-failure -TimeoutStopSec=65 +User=ctf-checkermaster EnvironmentFile=/etc/ctf-gameserver/checkermaster.env EnvironmentFile=-/etc/ctf-gameserver/checker/%i.env -Environment=PYTHONPATH=/etc/ctf-gameserver/checker/modules/ -User=nobody -Group=nogroup -SyslogIdentifier=ctf-checkermaster@%i +ExecStart=/usr/bin/ctf-checkermaster +# Allow waiting for Checker Scripts to finish +TimeoutStopSec=90 +Restart=on-failure +RestartSec=5 + +# Security options, less strict to accommodate for Checker Script oddities +PrivateDevices=yes +PrivateTmp=yes +ProtectControlGroups=yes +ProtectHome=yes +ProtectKernelModules=yes +ProtectKernelTunables=yes +ProtectSystem=strict +RestrictNamespaces=yes +RestrictRealtime=yes [Install] WantedBy=multi-user.target diff --git a/debian/install b/debian/install index ca5bbf0..8876423 100644 --- a/debian/install +++ b/debian/install @@ -1,5 +1,6 @@ conf/checker/checkermaster.env etc/ctf-gameserver conf/checker/ctf-checkermaster@.service lib/systemd/system +examples/checker/sudoers.d/ctf-checker etc/sudoers.d conf/controller/controller.env etc/ctf-gameserver conf/controller/flagid.env etc/ctf-gameserver diff --git a/debian/postinst b/debian/postinst index 87e652e..f8914ce 100755 --- a/debian/postinst +++ b/debian/postinst @@ -1,5 +1,12 @@ #!/bin/sh +if ! getent passwd ctf-checkermaster >/dev/null; then + adduser --system --group --home /var/lib/ctf-checkermaster --gecos 'CTF Gameserver Checker Master user,,,' ctf-checkermaster +fi +if ! getent passwd ctf-checkerrunner >/dev/null; then + adduser --system --group --home /var/lib/ctf-checkerrunner --gecos 'CTF Gameserver Checker Script user,,,' ctf-checkerrunner +fi + # No dh-systemd because we don't want to enable/start any services if test -x /bin/systemctl; then systemctl daemon-reload diff --git a/doc/source/setup.rst b/doc/source/setup.rst index ddcafcc..37eb833 100644 --- a/doc/source/setup.rst +++ b/doc/source/setup.rst @@ -65,7 +65,7 @@ should contain exactly one table: team_id INTEGER, service_id INTEGER, identifier CHARACTER VARYING (128), - data BYTEA + data TEXT ) PRIMARY KEY (team_id, service_id, identifier); Checker diff --git a/examples/checker/example_checker.env b/examples/checker/example_checker.env new file mode 100644 index 0000000..655897e --- /dev/null +++ b/examples/checker/example_checker.env @@ -0,0 +1,5 @@ +CTF_SERVICE="example_slug" +CTF_CHECKERSCRIPT="/path/to/example_checker.py" +CTF_MAXCHECKDURATION="90" +CTF_CHECKERCOUNT="1" +CTF_INTERVAL="10" diff --git a/examples/checker/example_checker.py b/examples/checker/example_checker.py new file mode 100755 index 0000000..e6745c7 --- /dev/null +++ b/examples/checker/example_checker.py @@ -0,0 +1,88 @@ +#!/usr/bin/env python3 + +import logging +import socket + +from ctf_gameserver import checkerlib + + +class ExampleChecker(checkerlib.BaseChecker): + + def place_flag(self, tick): + conn = connect(self.ip) + flag = checkerlib.get_flag(tick) + conn.sendall('SET {} {}\n'.format(tick, flag).encode('utf-8')) + logging.info('Sent SET command') + + try: + resp = recv_line(conn) + logging.info('Received response to SET command') + except UnicodeDecodeError: + logging.warning('Received non-UTF-8 data') + return checkerlib.CheckResult.FAULTY + if resp != 'OK': + logging.warning('Received wrong response to SET command') + return checkerlib.CheckResult.FAULTY + + conn.close() + return checkerlib.CheckResult.OK + + def check_service(self): + conn = connect(self.ip) + conn.sendall(b'XXX\n') + logging.info('Sent dummy command') + + try: + recv_line(conn) + logging.info('Received response to dummy command') + except UnicodeDecodeError: + logging.warning('Received non-UTF-8 data') + return checkerlib.CheckResult.FAULTY + + conn.close() + return checkerlib.CheckResult.OK + + def check_flag(self, tick): + flag = checkerlib.get_flag(tick) + + conn = connect(self.ip) + conn.sendall('GET {}\n'.format(tick).encode('utf-8')) + logging.info('Sent GET command') + + try: + resp = recv_line(conn) + logging.info('Received response to GET command') + except UnicodeDecodeError: + logging.warning('Received non-UTF-8 data') + return checkerlib.CheckResult.FAULTY + if resp != flag: + logging.warning('Received wrong response to GET command') + return checkerlib.CheckResult.FLAG_NOT_FOUND + + conn.close() + return checkerlib.CheckResult.OK + + +def connect(ip): + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.connect((ip, 9999)) + return sock + + +def recv_line(conn): + + received = b'' + while not received.endswith(b'\n'): + new = conn.recv(1024) + if len(new) == 0: + if not received.endswith(b'\n'): + raise EOFError('Unexpected EOF') + break + received += new + return received.decode('utf-8').rstrip() + + +if __name__ == '__main__': + + checkerlib.run_check(ExampleChecker) diff --git a/examples/checker/example_service.py b/examples/checker/example_service.py new file mode 100755 index 0000000..511bc64 --- /dev/null +++ b/examples/checker/example_service.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python3 + +import socketserver + + +_STORE = {} + + +class RequestHandler(socketserver.BaseRequestHandler): + + def handle(self): + client_data = self._recv_line() + print('Received:', client_data) + + try: + operation, client_data = client_data.split(' ', 1) + except ValueError: + response = 'INVALID' + else: + if operation == 'GET': + key = client_data + try: + response = _STORE[key] + except KeyError: + response = 'NODATA' + elif operation == 'SET': + try: + key, value = client_data.split(' ', 1) + except ValueError: + response = 'INVALID' + else: + _STORE[key] = value + response = 'OK' + else: + response = 'INVALID' + + print('Response:', response) + self.request.sendall(response.encode('utf-8') + b'\n') + + def _recv_line(self): + received = b'' + while not received.endswith(b'\n'): + new = self.request.recv(1024) + if len(new) == 0: + if not received.endswith(b'\n'): + raise EOFError('Unexpected EOF') + break + received += new + return received.decode('utf-8').rstrip() + + +if __name__ == "__main__": + + with socketserver.TCPServer(('localhost', 9999), RequestHandler) as server: + server.serve_forever() diff --git a/examples/checker/examplechecker.conf b/examples/checker/examplechecker.conf deleted file mode 100644 index bf60e1f..0000000 --- a/examples/checker/examplechecker.conf +++ /dev/null @@ -1,12 +0,0 @@ -# maximum number of checkers to run in parallel ---maxtasks 40 -# minimum checkers to start per iteration ---minstart 5 -# maximum number of checkers to start per iteration ---maxstart 8 -# time between checking the database for new jobs in seconds ---refresh 3 -# module / class of the checker ---checker dummy:DummyChecker -# id of the service ---serviceid 2 diff --git a/examples/checker/examples/dummy.py b/examples/checker/examples/dummy.py deleted file mode 100755 index e839cec..0000000 --- a/examples/checker/examples/dummy.py +++ /dev/null @@ -1,11 +0,0 @@ -from checker import BaseChecker, OK - -class DummyChecker(BaseChecker): - def place_flag(self): - return OK - - def check_flag(self, tick): - return OK - - def check_service(self): - return OK diff --git a/examples/checker/faust2048.conf b/examples/checker/faust2048.conf deleted file mode 100644 index 0828d12..0000000 --- a/examples/checker/faust2048.conf +++ /dev/null @@ -1,25 +0,0 @@ -[CTF] -# maximum number of checkers to run in parallel -maxtasks = 40 -# minimum checkers to start per iteration -minstart = 5 -# maximum number of checkers to start per iteration -maxstart = 8 -# time between checking the database for new jobs in seconds -refresh = 5 -# module / class of the checker -checker = faust2048:Faust2048Checker -# id of the service -serviceid = 2 - -[gamedatabase] -user = gameserver_checker -password = changethepassword -database = ctf_gameserver -host = 10.0.0.3 - -[checkerdatabase] -user = gameserver_checker -password = changethepassword -database = ctf_checkerstate -host = 10.0.0.3 diff --git a/examples/checker/faust2048.py b/examples/checker/faust2048.py deleted file mode 100755 index a45d54a..0000000 --- a/examples/checker/faust2048.py +++ /dev/null @@ -1,68 +0,0 @@ -from ctf_gameserver.checker import BaseChecker, OK, NOTFOUND - -import time -import telnetlib -import re -import random -import logging - -class Faust2048Checker(BaseChecker): - def __init__(self, tick, team, service, ip): - BaseChecker.__init__(self, tick, team, service, ip) - self._connection = telnetlib.Telnet(ip, 9000, 5) - self._played = False - - def _moves(self): - while True: - yield random.choice([b'a', b's', b'a', b's', b'd']) - - def _fetch_fortune(self): - self._connection.write(b"*") - self._connection.write(b'X') - self._connection.read_until(b'wasd q:') - - def check_service(self): - if self._played: - return 0 - - counter = 1 - self._fetch_fortune() - for move in self._moves(): - index, _, data = self._connection.expect([b"wasd q:", b"yn:", b"a q:"], 5) - - if index == 0: - if random.random() < 0.05: - self._fetch_fortune() - - self._connection.write(move) - elif index == 1: - self._connection.write(b"y") - counter = counter + 1 - elif index == 2: - match = re.search(b'Your Score: ([0-9]+)', data) - logging.info("Finished playing after %d tries with %d points\n" % - (counter, int(match.group(1)))) - self._played = True - return 0 - - def place_flag(self): - flag = self.get_flag(self._tick) - self._connection.read_until(b"Please enter your name:") - self._connection.write(flag.encode('latin-1')) - self._connection.write(b'\n') - return 0 - - def check_flag(self, tick): - self.check_service() - - flag = self.get_flag(tick) - - self._connection.write(b'a') - self._connection.write(flag.encode('latin-1')) - self._connection.write(b'\n') - logging.debug("Waiting for data") - data = self._connection.read_until(b'a q:') - if flag.encode('latin-1') in data: - return OK - else: - return NOTFOUND diff --git a/examples/checker/sudoers.d/ctf-checker b/examples/checker/sudoers.d/ctf-checker new file mode 100644 index 0000000..babe9e4 --- /dev/null +++ b/examples/checker/sudoers.d/ctf-checker @@ -0,0 +1,6 @@ +# Allow user "ctf-checkermaster" to use the `--close-from` argument when running sudo +Defaults:ctf-checkermaster closefrom_override + +# Allow user "ctf-checkermaster" to run any command as user "ctf-checkerrunner" without being prompted for a +# password +ctf-checkermaster ALL = (ctf-checkerrunner) NOPASSWD: ALL diff --git a/scripts/checker/ctf-checkermaster b/scripts/checker/ctf-checkermaster index d72dc0a..d05a1da 100755 --- a/scripts/checker/ctf-checkermaster +++ b/scripts/checker/ctf-checkermaster @@ -1,341 +1,9 @@ #!/usr/bin/env python3 -import argparse -import asyncio -import asyncio.subprocess -import base64 -import codecs -import datetime -import logging -import os -import signal import sys -import psycopg2 +from ctf_gameserver.checker import main -from ctf_gameserver.lib import daemon, flag -from ctf_gameserver.lib.args import get_arg_parser_with_db -from ctf_gameserver.checker import string_to_result - -MAXRUNTIME = datetime.timedelta(minutes=3) -conteststart = None -flagvalidity = None -tickduration = None -open_tasks = dict() -stopping = False -serviceid = None - -#pylint: disable=unused-argument -def handle_stop(signum, frame): - global stopping - stopping = True - logging.warning("Received signal, stopping the checkermaster gracefully") - - -def do_store(line, dbconnection, service, team): - with dbconnection: - with dbconnection.cursor() as cursor: - ident = line[1] - data = psycopg2.Binary(base64.b64decode(line[2])) - - # TODO: think about proper on conflict replace - cursor.execute("""DELETE FROM checkerstate - WHERE service_id = %s AND team_id = %s AND identifier = %s""", - (service, team, ident)) - cursor.execute("""INSERT INTO checkerstate - (service_id,team_id,identifier,data) - VALUES - (%s, %s, %s, %s)""", - (service, team, ident, data)) - - -def do_retrieve(line, dbconnection, service, team): - with dbconnection: - with dbconnection.cursor() as cursor: - ident = line[1] - cursor.execute("""SELECT data FROM checkerstate - WHERE team_id = %s - AND service_id = %s - AND identifier = %s""", - (team, service, ident)) - data = cursor.fetchone() - if data != None: - data, = data - data = base64.b64encode(data) - return data - return b"" - - -def do_flag(line, service, team, secret): - tick = int(line[1]) - timestamp = conteststart.timestamp() + (flagvalidity + tick) * tickduration - payload = None - if len(line) > 2: - payload = codecs.decode(line[2], 'hex') - newflag = flag.generate(team=team, service=service, secret=secret, - payload=payload, timestamp=timestamp) - return newflag - - -@asyncio.coroutine -def run_checker(service, team, tick, checker, ippattern, loglevel, dbconnection, secret, journald_arg, - gelf_server_arg): - logger = logging.getLogger("service%02d-team%03d-tick%03d" % (service, team, tick)) - logger.info("running checker") - result = None - subenv = dict(os.environ) - subenv['CHECKER_CONTEST'] = '' - - checkerslave_args = [ - "--service", str(service), - "--team", str(team), - "--tick", str(tick), - "--loglevel", logging.getLevelName(loglevel), - "--ip", ippattern % team - ] - if journald_arg: - checkerslave_args.append('--journald') - if gelf_server_arg is not None: - checkerslave_args.extend(['--gelf-server', gelf_server_arg]) - - process = yield from asyncio.create_subprocess_exec("ctf-checkerslave", - *checkerslave_args, - checker, - env=subenv, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - stdin=asyncio.subprocess.PIPE) - - logger.debug(repr(process)) - while True: - try: - line = yield from process.stdout.readline() - if process.returncode is not None: - logger.warning(repr(process.returncode)) - - if line == b'': - err = yield from process.stderr.read() - logger.warning(err.decode("utf-8")) - return - - logger.debug(repr(line)) - line = line.decode('us-ascii').split() - logging.debug(repr(line)) - - if "FLAG" == line[0]: - newflag = do_flag(line, service, team, secret) - process.stdin.write(newflag.encode('us-ascii')) - process.stdin.write(b"\n") - - elif "STORE" == line[0]: - try: - do_store(line, dbconnection, service, team) - process.stdin.write(b"OK\n") - except Exception: - process.stdin.write(b"FAIL\n") - logger.exception("Database store failed") - - elif "RETRIEVE" == line[0]: - try: - data = do_retrieve(line, dbconnection, service, team) - process.stdin.write(data + b"\n") - - except Exception: - process.stdin.write(b"\n") - logger.exception("Database get failed") - - elif line[0] in ["OK", "TIMEOUT", "NOTWORKING", "NOTFOUND", "RECOVERING"]: - result = line[0] - break - except ValueError: - logging.exception("Exception while communicating with checkerslave") - break - - try: - process.terminate() - except ProcessLookupError: - pass - - logger.info("checker finished with result %s", result) - return result - - -def get_fresh_jobs(gamedb, statedb, args): - global stopping - - daemon.notify("WATCHDOG=1") - loop = asyncio.get_event_loop() - loop.call_later(args.refresh, get_fresh_jobs, gamedb, statedb, args) - - tasks = list(open_tasks.keys()) - for task in tasks: - service, team, tick, starttime = open_tasks[task] - if task.cancelled(): - del open_tasks[task] - - elif task.done(): - try: - if task.result() is None: - logging.warning("Checkerscript failed for Team %d Tick %d", team, tick) - else: - result = string_to_result(task.result()) - with gamedb: - with gamedb.cursor() as cursor: - cursor.execute("""INSERT INTO scoring_statuscheck - (service_id, team_id, tick, status, timestamp) - VALUES - (%s, %s, %s, %s, now())""", - (service, team, tick, result)) - cursor.execute("""UPDATE scoring_flag - SET placement_end = now() - WHERE service_id = %s - AND protecting_team_id = %s - AND tick = %s""", - (service, team, tick)) - del open_tasks[task] - except psycopg2.InterfaceError: - stopping = True - logging.warning("Database connection closed, restarting checkermaster") - - except Exception: - logging.exception("Exception while trying to update service status in database") - - elif starttime + MAXRUNTIME < datetime.datetime.now(): - logging.warning("aborting job(service=%d, team=%d, tick=%d) due to timeout", - service, team, tick) - task.cancel() - - current_tasks = len(open_tasks) - - if stopping: - if current_tasks == 0: - sys.exit(0) - else: - return - - new_tasks = args.maxtasks - current_tasks - if new_tasks < args.minstart: - new_tasks = args.minstart - elif new_tasks > args.maxstart: - new_tasks = args.maxstart - - logging.info("Currently running %d tasks, starting up to %d fresh ones", - current_tasks, new_tasks) - - try: - jobs = [] - with gamedb: - with gamedb.cursor() as cursor: - cursor.execute("""SELECT - sc.id, protecting_team_id, sc.tick - FROM scoring_flag sc, scoring_gamecontrol gc - WHERE placement_start is NULL - AND sc.tick = gc.current_tick - AND sc.service_id = %s - ORDER BY RANDOM() - LIMIT %s - FOR UPDATE OF sc """, - (serviceid, new_tasks)) - jobs = cursor.fetchall() - cursor.executemany("""UPDATE scoring_flag - SET placement_start = now() - WHERE id = %s""", - [ (job[0],) for job in jobs ]) - - for _, team, tick in jobs: - service = serviceid - checker = args.checker - ippattern = args.ippattern - task = loop.create_task(run_checker(service, team, tick, checker, ippattern, - logging.getLogger().level, statedb, - base64.b64decode(args.secret), args.journald, - args.gelf_server)) - open_tasks[task] = service, team, tick, datetime.datetime.now() - logging.info("started %d (of %d possible) new jobs", len(jobs), new_tasks) - - except psycopg2.InterfaceError: - stopping = True - logging.warning("Database connection closed, restarting checkermaster") - - except Exception: - logging.exception("Exception while trying to get new jobs") - - -def main(): - logging.basicConfig() - - parser = get_arg_parser_with_db('CTF Gameserver Checker Master') - parser.add_argument('--ippattern', type=str, required=True, - help="python formatstring to build IP to connect to") - - group = parser.add_argument_group('statedb', 'Checker state database') - group.add_argument('--statedbname', type=str, required=True, - help='Name of the used database') - group.add_argument('--statedbuser', type=str, required=True, - help='username for database access') - group.add_argument('--statedbpassword', type=str, - help='password for database access if needed') - group.add_argument('--statedbhost', type=str, - help='hostname of the database. If unspecified ' - 'ctf-submission will connect via default UNIX socket') - - group = parser.add_argument_group("checker") - group.add_argument("--maxtasks", type=int, required=True, - help="maximum number of checkers to run in parallel") - group.add_argument("--minstart", type=int, required=True, - help="minimum number of checkers to start per iteration") - group.add_argument("--maxstart", type=int, required=True, - help="maximum number of checkers to start per iteration") - group.add_argument("--refresh", type=float, required=True, - help="time between checking the database for new jobs in seconds") - group.add_argument("--checker", type=str, required=True, - help="module:Class of the checker") - group.add_argument("--service", type=str, required=True, - help="name of the service") - parser.add_argument('--secret', type=str, required=True, - help="base64 random string consistent with checkers") - - group = parser.add_argument_group('logging', 'Checker script logging') - group.add_argument('--journald', action='store_true', help='Let checker scripts log to journald with an ' - 'identifier per service, team and tick') - group.add_argument('--gelf-server', help='Let checker scripts log to the specified GELF (Graylog) ' - 'server (":")') - - args = parser.parse_args() - - numeric_level = getattr(logging, args.loglevel.upper()) - logging.getLogger().setLevel(numeric_level) - - logging.debug("connecting to game database") - gamedb = psycopg2.connect(host=args.dbhost, - database=args.dbname, - user=args.dbuser, - password=args.dbpassword) - - logging.debug("connecting to cache database") - statedb = psycopg2.connect(host=args.statedbhost, - database=args.statedbname, - user=args.statedbuser, - password=args.statedbpassword) - - with gamedb: - with gamedb.cursor() as cursor: - global conteststart, flagvalidity, tickduration, serviceid - cursor.execute("""SELECT start, valid_ticks, tick_duration - FROM scoring_gamecontrol""") - conteststart, flagvalidity, tickduration = cursor.fetchone() - cursor.execute("""SELECT id FROM scoring_service WHERE name = %s""", - (args.service,)) - serviceid, = cursor.fetchone() - - signal.signal(signal.SIGUSR1, handle_stop) - - loop = asyncio.get_event_loop() - loop.call_soon(get_fresh_jobs, gamedb, statedb, args) - - daemon.notify("READY=1") - - loop.run_forever() if __name__ == '__main__': - main() + sys.exit(main()) diff --git a/scripts/checker/ctf-checkerslave b/scripts/checker/ctf-checkerslave deleted file mode 100755 index 5abf7e9..0000000 --- a/scripts/checker/ctf-checkerslave +++ /dev/null @@ -1,140 +0,0 @@ -#!/usr/bin/env python3 - -import logging -import argparse -import importlib -import sys -import socket -import requests -import resource - -from requests.adapters import TimeoutSauce -class MyTimeout(TimeoutSauce): - def __init__(self, *args, **kwargs): - connect = kwargs.get('connect', 10.0) - if connect is None: - connect = 10.0 - read = kwargs.get('read', connect) - if read is None: - read = 10.0 - total = kwargs.get('total', connect) - if total is None: - total = 10.0 - super(MyTimeout, self).__init__(connect=connect, read=read, total=total) - - -def run(args): - socket.setdefaulttimeout(10.0) - requests.adapters.TimeoutSauce = MyTimeout - - if args.benchmark: - res_baseline = resource.getrusage(resource.RUSAGE_SELF) - - logging.debug("Importing checker") - checkermod, checkerclass = args.checker.split(":") - checkermod = importlib.import_module(checkermod) - checkerclass = getattr(checkermod, checkerclass) - - logging.debug("Initializing checker") - checker = checkerclass(args.tick, args.team, args.service, args.ip) - result = checker.run() - - if args.benchmark: - res_finished = resource.getrusage(resource.RUSAGE_SELF) - res_children = resource.getrusage(resource.RUSAGE_CHILDREN) - logging.info("Resource usage during checker run:") - logging.info(" utime: %d", res_finished.ru_utime - res_baseline.ru_utime + res_children.ru_utime) - logging.info(" stime: %d", res_finished.ru_stime - res_baseline.ru_stime + res_children.ru_stime) - logging.info(" additional maxrss: %d", res_finished.ru_maxrss - res_baseline.ru_maxrss + res_children.ru_maxrss) - logging.info(" nvcsw: %d", res_finished.ru_nvcsw - res_baseline.ru_nvcsw + res_children.ru_nvcsw) - logging.info(" ru_oublock: %d", res_finished.ru_oublock - res_baseline.ru_oublock + res_children.ru_oublock) - logging.info(" ru_inblock: %d", res_finished.ru_inblock - res_baseline.ru_inblock + res_children.ru_inblock) - - if 0 == result: - print("OK") - elif 1 == result: - print("TIMEOUT") - elif 2 == result: - print("NOTWORKING") - elif 3 == result: - print("NOTFOUND") - elif 4 == result: - print("RECOVERING") - else: - print("UNKNOWN RESULT: {}".format(result)) - sys.stdout.flush() - - -class CheckMetadataFilter(logging.Filter): - - def __init__(self, checker, team, tick): - super().__init__() - self.checker = checker - self.team = team - self.tick = tick - - def filter(self, record): - record.checker = self.checker - record.team = self.team - record.tick = self.tick - return True - - -def main(): - logging.basicConfig() - parser = argparse.ArgumentParser(description="CTF checker runner") - parser.add_argument('checker', type=str, - help="module:classname of checker") - parser.add_argument('--service', type=int, required=True) - parser.add_argument('--ip', type=str, required=True) - parser.add_argument('--tick', type=int, required=True) - parser.add_argument('--team', type=int, required=True) - parser.add_argument('--benchmark', action='store_true') - parser.add_argument('-v', '--loglevel', default='WARNING', type=str, - choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], - help='Loglevel') - parser.add_argument('--journald', action='store_true') - parser.add_argument('--gelf-server') - - args = parser.parse_args() - - numeric_level = getattr(logging, args.loglevel.upper()) - logging.getLogger().setLevel(numeric_level) - - if args.journald: - try: - from systemd.journal import JournalHandler - except ImportError: - logging.error('systemd module is required for journald logging') - sys.exit(1) - - identifier = "%s-team%03d-tick%03d" % (args.checker, args.team, args.tick) - logging.root.addHandler(JournalHandler(SYSLOG_IDENTIFIER=identifier, - CTF_TEAM_ID=args.team, - CTF_TICK=args.tick)) - if args.gelf_server is not None: - try: - import graypy - except ImportError: - logging.error('graypy module is required for GELF logging') - sys.exit(1) - - try: - gelf_host, gelf_port = args.gelf_server.rsplit(':', 1) - gelf_port = int(gelf_port) - except ValueError: - logging.error('GELF server needs to be specified as ":"') - sys.exit(1) - - gelf_handler = graypy.GELFHandler(gelf_host, gelf_port) - gelf_handler.addFilter(CheckMetadataFilter(args.checker, args.team, args.tick)) - logging.getLogger().addHandler(gelf_handler) - - try: - run(args) - except Exception as e: - logging.getLogger().exception("Checker script failed with unhandled exception") - raise e - -if __name__ == '__main__': - main() diff --git a/scripts/checker/ctf-testrunner b/scripts/checker/ctf-testrunner deleted file mode 100755 index a4c49a5..0000000 --- a/scripts/checker/ctf-testrunner +++ /dev/null @@ -1,76 +0,0 @@ -#!/usr/bin/env python3 -# -# set $PYTHONPATH to your local ctf-gameserver/src checkout -# -# If you want to test subsequent runs, you really want to set -# --first. An perfectly fine value for that is `date +"%s"` -# -# for i in {1..10} -# do -# ./ctf-testrunner --first 1437258032 --backend `mktemp -d` --tick $i --ip $someip --team 1 --service 1 dummy:DummyChecker -# done - -import time -import argparse -import importlib -import logging -from base64 import b64decode - -def run_job(args): - logging.debug("Importing checker") - checkermod, checkerclass = args.checker.split(":") - checkermod = importlib.import_module(checkermod) - checkerclass = getattr(checkermod, checkerclass) - - logging.debug("Initializing checker") - checker = checkerclass(args.tick, args.team, args.service, args.ip) - checker.set_starttime(args.first) - checker.set_backend(args.backend) - checker.set_secret(b64decode(args.secret)) - - logging.debug("Running checker") - result = checker.run() - - if 0 == result: - print("OK") - elif 1 == result: - print("TIMEOUT") - elif 2 == result: - print("NOTWORKING") - elif 3 == result: - print("NOTFOUND") - elif 4 == result: - print("RECOVERING") - - -def main(): - logging.basicConfig() - parser = argparse.ArgumentParser(description="CTF checker runner") - parser.add_argument('checker', type=str, - help="module:classname of checker") - parser.add_argument('--verbose', action="store_true") - parser.add_argument('--ip', type=str, required=True) - parser.add_argument('--tick', type=int, required=True) - parser.add_argument('--team', type=int, required=True) - parser.add_argument('--service', type=int, required=True) - parser.add_argument('--secret', type=str, default='dGVzdHRlc3R0ZQ==') - parser.add_argument('--first', type=int, default=int(time.time()) // 60 * 60, - help="timestamp of first tick") - parser.add_argument('--backend', type=str, default='/tmp', - help='location to store persistent data') - parser.add_argument('-v', '--loglevel', default='WARNING', type=str, - choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'], - help='Loglevel') - - args = parser.parse_args() - - numeric_level = getattr(logging, args.loglevel.upper()) - logging.getLogger().setLevel(numeric_level) - - logging.debug(repr(args)) - starttime = time.time() - run_job(args) - logging.info("Processing took %.2f seconds", (time.time() - starttime,)) - -if __name__ == '__main__': - main() diff --git a/setup.py b/setup.py index c05c951..9e3412a 100755 --- a/setup.py +++ b/setup.py @@ -42,9 +42,7 @@ packages = find_packages('src'), scripts = [ 'scripts/checker/ctf-checkermaster', - 'scripts/checker/ctf-checkerslave', 'scripts/checker/ctf-logviewer', - 'scripts/checker/ctf-testrunner', 'scripts/controller/ctf-controller', 'scripts/controller/ctf-flagid', 'scripts/submission/ctf-submission' diff --git a/src/ctf_gameserver/checker/__init__.py b/src/ctf_gameserver/checker/__init__.py index c05cb89..bd0ebad 100644 --- a/src/ctf_gameserver/checker/__init__.py +++ b/src/ctf_gameserver/checker/__init__.py @@ -1,8 +1 @@ -import os - -if 'CHECKER_CONTEST' in os.environ: - from .contest import ContestChecker as BaseChecker -else: - from .local import LocalChecker as BaseChecker - -from .constants import * +from .master import main diff --git a/src/ctf_gameserver/checker/abstract.py b/src/ctf_gameserver/checker/abstract.py deleted file mode 100644 index f17e919..0000000 --- a/src/ctf_gameserver/checker/abstract.py +++ /dev/null @@ -1,148 +0,0 @@ -from abc import ABCMeta, abstractmethod - -import logging -import json -import ssl - -from .constants import * -from .helpers import is_timeout - -class AbstractChecker(metaclass=ABCMeta): - """Base class for custom checker scripts - - Individual checkers should import `BaseChecker` which does the - right thing in terms of backend depending on whether you test - locally or the checker is run during the Contest. They must - implement the place_flag and check_flag methods and may add - individual __init__ code. You may override the run method but need - to keep it morally the same - - ident used for store/retrieve are supposed to match - [A-Za-z0-9_-]+. It must be uniq for every (service,target) - """ - def __init__(self, tick, team, service, ip): - self._team = team - self._ip = ip - self._tick = tick - self._service = service - self._tickduration = 300 - self._lookback = 5 - self._logger = logging.getLogger(self.__class__.__name__) - self._checker_action = None - - @property - def tick(self): - """Accessor for the current tick""" - return self._tick - - @property - def logger(self): - """Accessor for the logger to use""" - if self._checker_action: - extra = {'checker_action' : self._checker_action} - return logging.LoggerAdapter(self._logger, extra) - return self._logger - - def check_service(self): - """ Check if the service is running as expected""" - return 0 - - @abstractmethod - def check_flag(self, tick): - """Check for the flag from tick on the tested team's server - - To be reimplemented by users - """ - pass - - @abstractmethod - def place_flag(self): - """Place flag on the tested team's server - - To be reimplemented by users""" - pass - - def store_yaml(self, ident, yaml): - return self.store_blob(ident, json.dumps(yaml).encode('utf-8')) - - @abstractmethod - def store_blob(self, ident, blob): - "store binary blob on persistent storage" - pass - - def retrieve_yaml(self, ident): - blob = self.retrieve_blob(ident) - if blob is None: - return None - - try: - blob = self.retrieve_blob(ident) - if blob is None: - return None - return json.loads(blob.decode('utf-8')) - except ValueError: - return None - - @abstractmethod - def retrieve_blob(self, ident): - "return binary blob from persistent storage" - pass - - @abstractmethod - def get_flag(self, tick, payload=None): - "returns the flag for tick possibly including payload" - pass - - def _validate_result(self, result): - try: - text = result_to_string(result) - except KeyError: - self.logger.error("Checker returned unexpected return value '%s'", result) - raise Exception("broken checker: invalid return value") - self.logger.debug("checker result: %s", text) - - def run(self): - try: - self._checker_action = 'place_flag' - self.logger.debug("Placing flag") - result = self.place_flag() - self._validate_result(result) - if result != OK: - return result - - self._checker_action = 'check_service' - self.logger.debug("General Service Checks") - result = self.check_service() - self._validate_result(result) - if result != OK: - return result - - oldesttick = max(self._tick - self._lookback, -1) - recovering = False - self._checker_action = 'check_flag' - for tick in range(self._tick, oldesttick, -1): - self.logger.debug("Checking for flag of tick %d", tick) - result = self.check_flag(tick) - self._validate_result(result) - - if result != OK: - self.logger.info("Got %d for flag of tick %d", result, tick) - if tick != self._tick and result == NOTFOUND: - recovering = True - else: - return result - - return RECOVERING if recovering else OK - - except Exception as e: - if is_timeout(e): - self.logger.info("Timeout caught by BaseLogger: %s", e) - return TIMEOUT - elif isinstance(e, ssl.SSLError): - self.logger.info("generic SSLError: %s", e) - return NOTWORKING - else: - self.logger.exception("Checker script failed with unhandled exception") - raise e - finally: - self._checker_action = None diff --git a/src/ctf_gameserver/checker/constants.py b/src/ctf_gameserver/checker/constants.py deleted file mode 100644 index 74595e9..0000000 --- a/src/ctf_gameserver/checker/constants.py +++ /dev/null @@ -1,13 +0,0 @@ -OK = 0 -TIMEOUT = 1 -NOTWORKING = 2 -NOTFOUND = 3 -RECOVERING = 4 - -_mapping = ["OK", "TIMEOUT", "NOTWORKING", "NOTFOUND", "RECOVERING"] - -def string_to_result(strresult): - return _mapping.index(strresult) - -def result_to_string(result): - return _mapping[result] diff --git a/src/ctf_gameserver/checker/contest.py b/src/ctf_gameserver/checker/contest.py deleted file mode 100644 index 0c83d4e..0000000 --- a/src/ctf_gameserver/checker/contest.py +++ /dev/null @@ -1,29 +0,0 @@ -from .abstract import AbstractChecker - -import base64 -import sys -import codecs - -class ContestChecker(AbstractChecker): - def __init__(self, tick, team, service, ip): - AbstractChecker.__init__(self, tick, team, service, ip) - - def _rpc(self, function, *args): - sys.stdout.write("%s %s\n" % (function, " ".join(args))) - sys.stdout.flush() - return sys.stdin.readline().strip() - - def get_flag(self, tick, payload=None): - if payload is None: - return self._rpc("FLAG", str(tick)) - else: - payload = codecs.encode(payload, 'hex').decode('latin-1') - return self._rpc("FLAG", str(tick), payload) - - def store_blob(self, ident, blob): - data = base64.b64encode(blob) - return self._rpc("STORE", ident, data.decode('latin-1')) - - def retrieve_blob(self, ident): - data = self._rpc("RETRIEVE", ident) - return base64.b64decode(data) diff --git a/src/ctf_gameserver/checker/database.py b/src/ctf_gameserver/checker/database.py new file mode 100644 index 0000000..274240f --- /dev/null +++ b/src/ctf_gameserver/checker/database.py @@ -0,0 +1,138 @@ +from ctf_gameserver.lib.database import transaction_cursor +from ctf_gameserver.lib.exceptions import DBDataError + + +def get_control_info(db_conn, prohibit_changes=False): + """ + Returns a dictionary containing relevant information about the competion, as stored in the game database. + """ + + with transaction_cursor(db_conn, prohibit_changes) as cursor: + cursor.execute('SELECT valid_ticks, tick_duration FROM scoring_gamecontrol') + result = cursor.fetchone() + + if result is None: + raise DBDataError('Game control information has not been configured') + + return { + 'valid_ticks': result[0], + 'tick_duration': result[1] + } + + +def get_service_attributes(db_conn, service_slug, prohibit_changes=False): + """ + Returns ID and name of a service for a given slug. + """ + + with transaction_cursor(db_conn, prohibit_changes) as cursor: + cursor.execute('SELECT id, name FROM scoring_service WHERE slug = %s', (service_slug,)) + result = cursor.fetchone() + + return { + 'id': result[0], + 'name': result[1] + } + + +def get_current_tick(db_conn, prohibit_changes=False): + """ + Reads the current tick from the game database. + """ + + with transaction_cursor(db_conn, prohibit_changes) as cursor: + cursor.execute('SELECT current_tick FROM scoring_gamecontrol') + result = cursor.fetchone() + + return result[0] + + +def get_task_count(db_conn, service_id, prohibit_changes=False): + """ + Returns the total number of tasks for the given service in the current tick. + With our current Controller implementation, this should always be equal to the number of teams. + """ + + with transaction_cursor(db_conn, prohibit_changes) as cursor: + cursor.execute('SELECT COUNT(*)' + ' FROM scoring_flag flag, scoring_gamecontrol control' + ' WHERE flag.tick = control.current_tick' + ' AND flag.service_id = %s', (service_id,)) + result = cursor.fetchone() + + return result[0] + + +def get_new_tasks(db_conn, service_id, task_count, prohibit_changes=False): + """ + Retrieves the given number of random open check tasks and marks them as in progress. + """ + + with transaction_cursor(db_conn, prohibit_changes) as cursor: + cursor.execute('SELECT flag.id, flag.protecting_team_id, flag.tick' + ' FROM scoring_flag flag, scoring_gamecontrol control' + ' WHERE flag.placement_start is NULL' + ' AND flag.tick = control.current_tick' + ' AND flag.service_id = %s' + ' ORDER BY RANDOM()' + ' LIMIT %s' + ' FOR UPDATE OF flag', (service_id, task_count)) + tasks = cursor.fetchall() + + # Mark placement as in progress + cursor.executemany('UPDATE scoring_flag' + ' SET placement_start = NOW()' + ' WHERE id = %s', [(task[0],) for task in tasks]) + + return [{ + 'team_id': task[1], + 'tick': task[2] + } for task in tasks] + + +def commit_result(db_conn, service_id, team_id, tick, result, prohibit_changes=False): + """ + Saves the result from a Checker run to game database. + """ + + with transaction_cursor(db_conn, prohibit_changes) as cursor: + cursor.execute('INSERT INTO scoring_statuscheck' + ' (service_id, team_id, tick, status, timestamp)' + ' VALUES (%s, %s, %s, %s, NOW())', (service_id, team_id, tick, result)) + # (In case of `prohibit_changes`,) PostgreSQL checks the database grants even if nothing is matched + # by `WHERE` + cursor.execute('UPDATE scoring_flag' + ' SET placement_end = NOW()' + ' WHERE service_id = %s AND protecting_team_id = %s AND tick = %s', (service_id, + team_id, + tick)) + + +def load_state(db_conn, service_id, team_id, identifier, prohibit_changes=False): + """ + Loads Checker data from state database. + """ + + with transaction_cursor(db_conn, prohibit_changes) as cursor: + cursor.execute('SELECT data FROM checkerstate' + ' WHERE service_id = %s' + ' AND team_id = %s' + ' AND identifier = %s', (service_id, team_id, identifier)) + data = cursor.fetchone() + + if data is None: + return None + return data[0] + + +def store_state(db_conn, service_id, team_id, identifier, data, prohibit_changes=False): + """ + Stores Checker data in state database. + """ + + with transaction_cursor(db_conn, prohibit_changes) as cursor: + # (In case of `prohibit_changes`,) PostgreSQL checks the database grants even if no CONFLICT occurs + cursor.execute('INSERT INTO checkerstate (service_id, team_id, identifier, data)' + ' VALUES (%s, %s, %s, %s)' + ' ON CONFLICT (service_id, team_id, identifier)' + ' DO UPDATE SET data = EXCLUDED.data', (service_id, team_id, identifier, data)) diff --git a/src/ctf_gameserver/checker/helpers.py b/src/ctf_gameserver/checker/helpers.py deleted file mode 100644 index 2cb4dea..0000000 --- a/src/ctf_gameserver/checker/helpers.py +++ /dev/null @@ -1,54 +0,0 @@ -""" Useful functions for checkers. """ - -import socket -import errno -import ssl -import urllib3 -import requests - -def is_timeout(ex): - """ Is this exception due to a timeout/connection error? """ - - exception_types = ( - socket.timeout, - requests.exceptions.Timeout, - requests.exceptions.ConnectTimeout, - requests.packages.urllib3.exceptions.ConnectionError, - urllib3.exceptions.ConnectionError, - urllib3.exceptions.ReadTimeoutError, - EOFError, # telnetlib - ssl.SSLEOFError, - ssl.SSLZeroReturnError, - ssl.SSLWantReadError, - ssl.SSLWantWriteError, - ) - - try: - import nclib - exception_types += (nclib.NetcatError,) - except ImportError: - pass - - # these only exist in recent urllib3 versions: - if hasattr(requests.packages.urllib3.exceptions, 'NewConnectionError'): - exception_types += (requests.packages.urllib3.exceptions.NewConnectionError,) - if hasattr(urllib3.exceptions, 'NewConnectionError'): - exception_types += (urllib3.exceptions.NewConnectionError,) - - if isinstance(ex, exception_types): - return True - - if isinstance(ex, requests.exceptions.ConnectionError): - return len(ex.args) == 1 and is_timeout(ex.args[0]) - if isinstance(ex, (urllib3.exceptions.MaxRetryError, - requests.packages.urllib3.exceptions.MaxRetryError)): - return is_timeout(ex.reason) - if isinstance(ex, (urllib3.exceptions.ProtocolError, - requests.packages.urllib3.exceptions.ProtocolError)): - return len(ex.args) == 2 and is_timeout(ex.args[1]) - if isinstance(ex, OSError): - return ex.errno in (errno.ETIMEDOUT, errno.ECONNREFUSED, errno.EHOSTDOWN, - errno.EHOSTUNREACH, errno.ENETUNREACH, errno.ENETDOWN, - errno.ENETRESET, errno.ECONNRESET, errno.ECONNABORTED, - errno.EPIPE) - return False diff --git a/src/ctf_gameserver/checker/local.py b/src/ctf_gameserver/checker/local.py deleted file mode 100644 index 8d95271..0000000 --- a/src/ctf_gameserver/checker/local.py +++ /dev/null @@ -1,46 +0,0 @@ -from .abstract import AbstractChecker -from ctf_gameserver.lib import flag - -import os -import os.path -import logging -import sys - -class LocalChecker(AbstractChecker): - def __init__(self, tick, team, service, ip): - AbstractChecker.__init__(self, tick, team, service, ip) - self._starttime = 0 - self._backend = '/tmp' - self._secret = b'testtestte' - self.logger.setLevel(logging.DEBUG) - self.logger.addHandler(logging.StreamHandler(sys.stderr)) - - def store_blob(self, ident, blob): - filename = os.path.join(self._backend, "%s.blob" % ident) - try: - with open(filename, "wb") as handle: - return handle.write(blob) - except FileNotFoundError: - return None - - def retrieve_blob(self, ident): - filename = os.path.join(self._backend, "%s.blob" % ident) - try: - with open(filename, "rb") as handle: - return handle.read() - except FileNotFoundError: - return None - - def get_flag(self, tick, payload=None): - generatedflag = flag.generate(self._team, self._service, self._secret, payload, - self._starttime + self._tickduration * tick) - return generatedflag - - def set_backend(self, backend): - self._backend = backend - - def set_starttime(self, starttime): - self._starttime = starttime - - def set_secret(self, secret): - self._secret = secret diff --git a/src/ctf_gameserver/checker/master.py b/src/ctf_gameserver/checker/master.py new file mode 100644 index 0000000..ec9b8b1 --- /dev/null +++ b/src/ctf_gameserver/checker/master.py @@ -0,0 +1,342 @@ +import base64 +import datetime +import logging +import math +import os +import signal +import time + +import psycopg2 +from psycopg2 import errorcodes as postgres_errors + +from ctf_gameserver.lib.args import get_arg_parser_with_db +from ctf_gameserver.lib import daemon +from ctf_gameserver.lib.database import transaction_cursor +from ctf_gameserver.lib.checkresult import CheckResult +from ctf_gameserver.lib.exceptions import DBDataError +import ctf_gameserver.lib.flag as flag_lib + +from . import database +from .supervisor import RunnerSupervisor +from .supervisor import ACTION_FLAG, ACTION_LOAD, ACTION_STORE, ACTION_RESULT + + +def main(): + + arg_parser = get_arg_parser_with_db('CTF Gameserver Checker Master') + arg_parser.add_argument('--ippattern', type=str, required=True, + help='(Old-style) Python formatstring for building the IP to connect to') + arg_parser.add_argument('--flagsecret', type=str, required=True, + help='Base64 string used as secret in flag generation') + + group = arg_parser.add_argument_group('statedb', 'Checker state database') + group.add_argument('--statedbhost', type=str, help='Hostname of the database. If unspecified, the ' + 'default Unix socket will be used.') + group.add_argument('--statedbname', type=str, required=True, + help='Name of the used database') + group.add_argument('--statedbuser', type=str, required=True, + help='User name for database access') + group.add_argument('--statedbpassword', type=str, + help='Password for database access if needed') + + group = arg_parser.add_argument_group('check', 'Check parameters') + group.add_argument('--service', type=str, required=True, + help='Slug of the service') + group.add_argument('--checkerscript', type=str, required=True, + help='Path of the Checker Script') + group.add_argument('--sudouser', type=str, help=' User to excute the Checker Scripts as, will be passed ' + 'to `sudo -u`') + group.add_argument('--maxcheckduration', type=float, required=True, + help='Maximum duration of a Checker Script run in seconds') + group.add_argument('--checkercount', type=int, required=True, + help='Number of Checker Masters running for this service') + group.add_argument('--interval', type=float, required=True, + help='Time between launching batches of Checker Scripts in seconds') + + group = arg_parser.add_argument_group('logging', 'Checker Script logging') + group.add_argument('--journald', action='store_true', help='Log Checker Script messages to journald') + group.add_argument('--gelf-server', help='Log Checker Script messages to the specified GELF (Graylog) ' + 'server (":")') + + args = arg_parser.parse_args() + + logging.basicConfig(format='[%(levelname)s] %(message)s [%(name)s]') + numeric_loglevel = getattr(logging, args.loglevel.upper()) + logging.getLogger().setLevel(numeric_loglevel) + + if args.interval < 3: + logging.error('`--interval` must be at least 3 seconds') + return os.EX_USAGE + + logging_params = {} + + if args.journald: + try: + # pylint: disable=import-outside-toplevel,unused-import,import-error + from systemd.journal import JournalHandler + except ImportError: + logging.error('systemd module is required for journald logging') + return os.EX_USAGE + logging_params['journald'] = True + + if args.gelf_server is not None: + try: + # pylint: disable=import-outside-toplevel,unused-import,import-error + import graypy + except ImportError: + logging.error('graypy module is required for GELF logging') + return os.EX_USAGE + try: + gelf_host, gelf_port = args.gelf_server.rsplit(':', 1) + gelf_port = int(gelf_port) + except ValueError: + logging.error('GELF server needs to be specified as ":"') + return os.EX_USAGE + logging_params['gelf'] = {'host': gelf_host, 'port': gelf_port} + + try: + game_db_conn = psycopg2.connect(host=args.dbhost, database=args.dbname, user=args.dbuser, + password=args.dbpassword) + except psycopg2.OperationalError as e: + logging.error('Could not establish connection to game database: %s', e) + return os.EX_UNAVAILABLE + logging.info('Established connection to game database') + + try: + state_db_conn = psycopg2.connect(host=args.statedbhost, database=args.statedbname, + user=args.statedbuser, password=args.statedbpassword) + except psycopg2.OperationalError as e: + logging.error('Could not establish connection to state database: %s', e) + return os.EX_UNAVAILABLE + logging.info('Established connection to state database') + + # Keep our mental model easy by always using (timezone-aware) UTC for dates and times + with transaction_cursor(game_db_conn) as cursor: + cursor.execute('SET TIME ZONE "UTC"') + with transaction_cursor(state_db_conn) as cursor: + cursor.execute('SET TIME ZONE "UTC"') + + # Check database grants + try: + try: + database.get_control_info(game_db_conn, prohibit_changes=True) + except DBDataError as e: + logging.warning('Invalid database state: %s', e) + + service_id = database.get_service_attributes(game_db_conn, args.service, prohibit_changes=True)['id'] + database.get_current_tick(game_db_conn, prohibit_changes=True) + database.get_task_count(game_db_conn, service_id, prohibit_changes=True) + database.get_new_tasks(game_db_conn, service_id, 1, prohibit_changes=True) + database.commit_result(game_db_conn, service_id, 1, -1, 0, prohibit_changes=True) + database.load_state(state_db_conn, service_id, 1, 'identifier', prohibit_changes=True) + database.store_state(state_db_conn, service_id, 1, 'identifier', 'data', prohibit_changes=True) + except psycopg2.ProgrammingError as e: + if e.pgcode == postgres_errors.INSUFFICIENT_PRIVILEGE: + # Log full exception because only the backtrace will tell which kind of permission is missing + logging.exception('Missing database permissions:') + return os.EX_NOPERM + else: + raise + + daemon.notify('READY=1') + + while True: + try: + master_loop = MasterLoop(game_db_conn, state_db_conn, args.service, args.checkerscript, + args.sudouser, args.maxcheckduration, args.checkercount, args.interval, + args.ippattern, args.flagsecret, logging_params) + break + except DBDataError as e: + logging.warning('Waiting for valid database state: %s', e) + time.sleep(60) + + # Graceful shutdown to prevent loss of check results + def sigterm_handler(_, __): + logging.info('Shutting down, waiting for %d Checker Scripts to finish', + master_loop.get_running_script_count()) + master_loop.shutting_down = True + signal.signal(signal.SIGTERM, sigterm_handler) + + while True: + try: + master_loop.step() + if master_loop.shutting_down and master_loop.get_running_script_count() == 0: + break + except: # noqa, pylint: disable=bare-except + logging.exception('Error in main loop:') + + return os.EX_OK + + +class MasterLoop: + + def __init__(self, game_db_conn, state_db_conn, service_slug, checker_script, sudo_user, + max_check_duration, checker_count, interval, ip_pattern, flag_secret, logging_params): + self.game_db_conn = game_db_conn + self.state_db_conn = state_db_conn + self.checker_script = checker_script + self.sudo_user = sudo_user + self.max_check_duration = max_check_duration + self.checker_count = checker_count + self.interval = interval + self.ip_pattern = ip_pattern + self.flag_secret = flag_secret + self.logging_params = logging_params + + control_info = database.get_control_info(self.game_db_conn) + self.tick_duration = datetime.timedelta(seconds=control_info['tick_duration']) + self.flag_valid_ticks = control_info['valid_ticks'] + self.service = database.get_service_attributes(self.game_db_conn, service_slug) + self.service['slug'] = service_slug + + self.supervisor = RunnerSupervisor() + self.known_tick = -1 + # Trigger launch of tasks in first step() + self.last_launch = get_monotonic_time() - self.interval + self.tasks_per_launch = None + self.shutting_down = False + + def step(self): + """ + Handles a request from the supervisor, kills overdue tasks and launches new ones. + Only processes one request at a time to make sure that launch_tasks() gets called regularly and + long-running tasks get killed, at the cost of accumulating a backlog of messages. + + Returns: + A boolean indicating whether a request was handled. + """ + req = self.supervisor.get_request() + if req is not None: + resp = None + send_resp = True + + try: + if req['action'] == ACTION_FLAG: + resp = self.handle_flag_request(req['info'], req['param']) + elif req['action'] == ACTION_LOAD: + resp = self.handle_load_request(req['info'], req['param']) + elif req['action'] == ACTION_STORE: + self.handle_store_request(req['info'], req['param']) + elif req['action'] == ACTION_RESULT: + self.handle_result_request(req['info'], req['param']) + else: + logging.error('Unknown action received from Checker Script for team %d in tick %d: %s', + req['info']['team'], req['info']['tick'], req['action']) + # We can't signal an error to the Checker Script (which might be waiting for a response), + # so our only option is to kill it + self.supervisor.terminate_runner(req['runner_id']) + send_resp = False + except: # noqa, pylint: disable=bare-except + logging.exception('Checker Script communication error for team %d in tick %d:', + req['info']['team'], req['info']['tick']) + self.supervisor.terminate_runner(req['runner_id']) + else: + if send_resp: + req['send'].send(resp) + + if not self.shutting_down: + # Launch new tasks and catch up missed intervals + while get_monotonic_time() - self.last_launch >= self.interval: + self.last_launch += self.interval + self.launch_tasks() + + return req is not None + + def handle_flag_request(self, task_info, params): + try: + payload = base64.b64decode(params['payload']) + except KeyError: + payload = None + + if payload == b'': + payload = None + expiration = datetime.datetime.utcnow() + (self.tick_duration * self.flag_valid_ticks) + + return flag_lib.generate(task_info['team'], self.service['id'], self.flag_secret, payload, + expiration.timestamp()) + + def handle_load_request(self, task_info, param): + return database.load_state(self.state_db_conn, self.service['id'], task_info['team'], param) + + def handle_store_request(self, task_info, params): + database.store_state(self.state_db_conn, self.service['id'], task_info['team'], params['key'], + params['data']) + + def handle_result_request(self, task_info, param): + try: + result = int(param) + except ValueError: + logging.error('Invalid result from Checker Script for team %d in tick %d: %s', + task_info['team'], task_info['tick'], param) + return + + try: + check_result = CheckResult(result) + except ValueError: + logging.error('Invalid result from Checker Script for team %d in tick %d: %d', + task_info['team'], task_info['tick'], result) + return + + logging.info('Result from Checker Script for team %d in tick %d: %d', task_info['team'], + task_info['tick'], check_result.value) + database.commit_result(self.game_db_conn, self.service['id'], task_info['team'], task_info['tick'], + result) + + def launch_tasks(self): + current_tick = database.get_current_tick(self.game_db_conn) + + if current_tick < 0: + # Competition not running yet + return + + if current_tick != self.known_tick: + self.supervisor.terminate_runners() + self.update_launch_params() + self.known_tick = current_tick + + tasks = database.get_new_tasks(self.game_db_conn, self.service['id'], self.tasks_per_launch) + for task in tasks: + ip = self.ip_pattern % task['team_id'] + runner_args = [self.checker_script, ip, str(task['team_id']), str(task['tick'])] + if self.sudo_user is not None: + runner_args = ['sudo', '--user='+self.sudo_user, '--preserve-env=PATH,CTF_CHECKERSCRIPT', + '--close-from=5', '--'] + runner_args + + # Information in task_info should be somewhat human-readable, because it also ends up in Checker + # Script logs + task_info = {'service': self.service['name'], + 'team': task['team_id'], + 'tick': current_tick} + logging.info('Starting Checker Script for team %d in tick %d', task['team_id'], current_tick) + self.supervisor.start_runner(runner_args, task_info, self.logging_params) + + def update_launch_params(self): + """ + Determines the number of Checker tasks to start per launch. + Our goal here is to balance the load over a tick with some smearing (to make Checker fingerprinting + more difficult), while also ensuring that all teams get checked in every tick. + This simple implementation distributes the start of tasks evenly across the available time with some + safety margin at the end. + """ + total_tasks = database.get_task_count(self.game_db_conn, self.service['id']) + local_tasks = math.ceil(total_tasks / self.checker_count) + + margin_seconds = self.tick_duration.total_seconds() / 6 + launch_timeframe = self.tick_duration.total_seconds() - self.max_check_duration - margin_seconds + if launch_timeframe <= 0: + raise ValueError('Maximum Checker Script duration too long for tick') + + intervals_per_timeframe = math.floor(launch_timeframe / self.interval) + self.tasks_per_launch = math.ceil(local_tasks / intervals_per_timeframe) + + def get_running_script_count(self): + return len(self.supervisor.processes) + + +def get_monotonic_time(): + """ + Wrapper around time.monotonic() to enables mocking in test cases. Globally mocking time.monotonic() + breaks library code (e.g. multiprocessing in RunnerSupervisor). + """ + + return time.monotonic() diff --git a/src/ctf_gameserver/checker/supervisor.py b/src/ctf_gameserver/checker/supervisor.py new file mode 100644 index 0000000..d59262e --- /dev/null +++ b/src/ctf_gameserver/checker/supervisor.py @@ -0,0 +1,321 @@ +import json +import logging +import multiprocessing +import os +import queue +import select +import signal +import subprocess +import sys + +from ctf_gameserver.lib.checkresult import CheckResult + + +ACTION_FLAG = 'FLAG' +ACTION_LOAD = 'LOAD' +ACTION_STORE = 'STORE' +ACTION_LOG = 'LOG' +ACTION_RESULT = 'RESULT' +ACTION_RUNNER_EXIT = 'RUNNER_EXIT' + +ACTIONS = [ + ACTION_FLAG, + ACTION_LOAD, + ACTION_STORE, + ACTION_LOG, + ACTION_RESULT, + ACTION_RUNNER_EXIT +] + + +class RunnerSupervisor: + """ + Launches Checker Script Runners as individual processes and takes care of communicating with them. + """ + + def __init__(self): + # Timeout if there are no requests when all Runners are done or blocking + self.queue_timeout = 1 + self._reset() + + def _reset(self): + self.work_queue = multiprocessing.Queue() + self.processes = {} + self.next_identifier = 0 + + def start_runner(self, args, info, logging_params): + logging.info('Starting Runner process, args: %s, info: %s', args, info) + receive, send = multiprocessing.Pipe(False) + proc = multiprocessing.Process(target=run_checker_script, args=(args, info, logging_params, + self.next_identifier, + self.work_queue, receive)) + self.processes[self.next_identifier] = (proc, send, info) + + proc.start() + self.next_identifier += 1 + + def terminate_runner(self, runner_id): + logging.info('Terminating Runner process, info: %s', self.processes[runner_id][2]) + self.processes[runner_id][0].terminate() + # Afterwards, get_request() will join the child and remove it from `self.processes` + + def terminate_runners(self): + if len(self.processes) > 0: + logging.warning('Terminating all %d Runner processes', len(self.processes)) + for runner_id in self.processes: + self.terminate_runner(runner_id) + + # Prevent memory leaks + self._reset() + + def get_request(self): + # Use a loop to not leak our implementation detail for ACTION_RUNNER_EXIT: Only return None when the + # queue is really empty (barring non-critical race conditions) + while True: + try: + request = self.work_queue.get(True, self.queue_timeout) + except queue.Empty: + return None + runner_id = request[0] + action = request[1] + + # Join all terminated child processes + if action == ACTION_RUNNER_EXIT: + proc = self.processes[runner_id][0] + proc.join() + del self.processes[runner_id] + if self.work_queue.empty(): + return None + else: + break + + return { + 'action': action, + 'param': request[2], + 'runner_id': runner_id, + 'send': self.processes[runner_id][1], + 'info': self.processes[runner_id][2] + } + + +def run_checker_script(args, info, logging_params, runner_id, queue_to_master, pipe_from_master): + """ + Checker Script Runner, which is supposed to already be executed in an individual process. The actual + Checker Script is then launched as another child process (one per Runner). + We're also taking care of saving logs from the Checker Script here, since passing them to the Master + process would put a lot of load on it. + """ + + runner_logger = logging.getLogger('Checker Runner: {}'.format(args)) + + class InfoFilter(logging.Filter): + """ + Log Filter which adds all metadata from an "info" dict as attributes to Log Records. + """ + def __init__(self, info): + super().__init__() + self.info = info + + def filter(self, record): + for key, value in self.info.items(): + if hasattr(record, key): + runner_logger.warning('Discarding log metadata "%s" due to a naming conflict', key) + else: + setattr(record, key, value) + return True + + script_logger = logging.getLogger('Checker Script') + script_logger.setLevel(logging.INFO) + script_logger.propagate = False + if info is not None: + script_logger.addFilter(InfoFilter(info)) + + if 'journald' in logging_params: + from systemd.journal import JournalHandler # pylint: disable=import-outside-toplevel,import-error + syslog_identifier = 'checker_{}-team{:03d}-tick{:03d}'.format(info['service'], info['team'], + info['tick']) + journal_handler = JournalHandler(SYSLOG_IDENTIFIER=syslog_identifier) + script_logger.addHandler(journal_handler) + if 'gelf' in logging_params: + import graypy # pylint: disable=import-outside-toplevel,import-error + gelf_handler = graypy.GELFHandler(logging_params['gelf']['host'], logging_params['gelf']['port']) + script_logger.addHandler(gelf_handler) + + stdout_read, stdout_write = os.pipe() + stderr_read, stderr_write = os.pipe() + ctrlin_read, ctrlin_write = os.pipe() + ctrlout_read, ctrlout_write = os.pipe() + + # File descriptor numbers after dup_ctrl_fds() + CTRLIN_FD = 3 # pylint: disable=invalid-name + CTRLOUT_FD = 4 # pylint: disable=invalid-name + + def dup_ctrl_fds(): + """ + preexec_fn for subprocess.Popen() which forces specific numbers for file descriptors within the + child. We need this because otherwise the child won't know the numbers of additional FDs. + """ + os.dup2(ctrlin_read, CTRLIN_FD) + os.close(ctrlin_read) + os.dup2(ctrlout_write, CTRLOUT_FD) + os.close(ctrlout_write) + + env = {**os.environ, 'CTF_CHECKERSCRIPT': '1'} + # Python doesn't specify if preexec_fn gets executed before or after closing file descriptors, thus we + # specify both variants as pass_fds + try: + proc = subprocess.Popen(args, env=env, shell=False, # pylint: disable=subprocess-popen-preexec-fn + stdout=stdout_write, stderr=stderr_write, + pass_fds=(CTRLIN_FD, CTRLOUT_FD, ctrlin_read, ctrlout_write), + preexec_fn=dup_ctrl_fds, start_new_session=True) + except OSError: + runner_logger.exception('Executing Checker Script failed:') + # Tell the Supervisor that we are safe to be joined + queue_to_master.put((runner_id, ACTION_RUNNER_EXIT, None)) + return + # Close the child's ends of the pipes on the parent's side + os.close(stdout_write) + os.close(stderr_write) + os.close(ctrlin_read) + os.close(ctrlout_write) + + # Kill all children when this process gets terminated (requires `start_new_session=True` above) + def sigterm_handler(_, __): + os.killpg(proc.pid, signal.SIGKILL) + sys.exit(1) + signal.signal(signal.SIGTERM, sigterm_handler) + + poll = select.poll() + poll.register(stdout_read, select.POLLIN | select.POLLERR | select.POLLHUP) + poll.register(stderr_read, select.POLLIN | select.POLLERR | select.POLLHUP) + poll.register(ctrlout_read, select.POLLIN | select.POLLERR | select.POLLHUP) + + # Loop until the child has terminated + while proc.poll() is None: + events = poll.poll() + + for event in events: + if not (event[1] & select.POLLIN): # pylint: disable=superfluous-parens + # We only care if we can read, POLLERR and POLLHUP will terminate the child anyway + continue + + fd = event[0] + try: + data = os.read(fd, 4096) + except OSError: + runner_logger.exception('Read from child pipe failed:') + continue + if len(data) == 0: + # EOF (on this file descriptor) + continue + + # Save everything the Checker Scripts writes to stdout or stderr as log message + if fd in (stdout_read, stderr_read): + script_output = data.decode('ascii', errors='backslashreplace').rstrip('\n') + script_logger.warning('[SCRIPT OUTPUT] %s', script_output) + # Communication with the Checker Script via single-line JSON objects + else: + buf = b'' + while True: + buf += data + lines = buf.split(b'\n') + for line in lines[:-1]: + try: + message = json.loads(line) + except json.JSONDecodeError: + runner_logger.error('Could not decode message from Script as JSON: %s', line) + else: + handle_script_message(message, ctrlin_write, runner_id, queue_to_master, + pipe_from_master, runner_logger, script_logger) + + if len(lines[-1]) == 0: + # We have not read a partial line + break + + buf = lines[-1] + try: + data = os.read(fd, 4096) + except OSError: + runner_logger.exception('Read from child pipe failed:') + break + + os.close(stdout_read) + os.close(stderr_read) + os.close(ctrlin_write) + os.close(ctrlout_read) + + runner_logger.info('Checker Script exited with code %d', proc.returncode) + # Tell the Supervisor that our child has exited and we are safe to be joined without blocking + queue_to_master.put((runner_id, ACTION_RUNNER_EXIT, None)) + + +def handle_script_message(message, ctrlin_fd, runner_id, queue_to_master, pipe_from_master, runner_logger, + script_logger): + """ + Processes a single message from communication with a Checker Script. Communication is always initiated + by the Checker Script, we (as the Runner) only respond. + """ + + try: + action = message['action'] + param = message['param'] + except KeyError: + runner_logger.error('Message must have "action" and "param" keys: %s', message) + return + + if action not in ACTIONS: + runner_logger.error('Message has invalid "action" key: %s', message) + return + if action == ACTION_RUNNER_EXIT: + runner_logger.error('RUNNER_EXIT messages must not be generated by the Script: %s', message) + return + + if action == ACTION_LOG: + record = make_script_log_record(param) + if record is None: + runner_logger.error('Malformed log message from the Script: %s', param) + else: + script_logger.handle(record) + return + + if action == ACTION_RESULT: + try: + result = CheckResult(int(param)) + except ValueError: + # Ignore malformed message from the Checker Script, will be logged by the Master + pass + else: + script_logger.info('CHECKER SCRIPT RESULT: %s', result.name, extra={'result': result.value}) + + queue_to_master.put((runner_id, action, param)) + response = pipe_from_master.recv() + + try: + # Make sure that our JSON consists of just a single line + response_json = json.dumps({'response': response}).replace('\n', '') + '\n' + os.write(ctrlin_fd, response_json.encode()) + except OSError: + runner_logger.exception('Write to child pipe failed:') + + +def make_script_log_record(json_record): + + # Use actual message as "args" to prevent format string injections + msg = '%s' + try: + args = (str(json_record['message']),) + except KeyError: + return None + + try: + level = int(json_record['levelno']) + except (KeyError, ValueError): + level = logging.INFO + pathname = str(json_record.get('pathname', '')) + try: + lineno = int(json_record['lineno']) + except (KeyError, ValueError): + lineno = 0 + func = str(json_record.get('funcName', '')) + + return logging.LogRecord('Checker Script', level, pathname, lineno, msg, args, None, func) diff --git a/src/ctf_gameserver/checkerlib/__init__.py b/src/ctf_gameserver/checkerlib/__init__.py new file mode 100644 index 0000000..e3ba976 --- /dev/null +++ b/src/ctf_gameserver/checkerlib/__init__.py @@ -0,0 +1 @@ +from .lib import BaseChecker, CheckResult, get_flag, load_state, run_check, store_state diff --git a/src/ctf_gameserver/checkerlib/lib.py b/src/ctf_gameserver/checkerlib/lib.py new file mode 100644 index 0000000..1cf83d0 --- /dev/null +++ b/src/ctf_gameserver/checkerlib/lib.py @@ -0,0 +1,355 @@ +#!/usr/bin/env python3 + +import base64 +import errno +import json +import logging +import os +import pickle +import socket +import ssl +import sys +import threading +from typing import Any, Type + +import ctf_gameserver.lib.flag +from ctf_gameserver.lib.checkresult import CheckResult + + +_TIMEOUT_SECONDS = 10.0 # Default timeout for socket operations +_LOCAL_STATE_PATH = '_state.json' + +_ctrl_in = None # pylint: disable=invalid-name +_ctrl_out = None # pylint: disable=invalid-name +_ctrl_out_lock = None # pylint: disable=invalid-name + + +def _setup(): + + global _ctrl_in, _ctrl_out, _ctrl_out_lock # pylint: disable=invalid-name + if 'CTF_CHECKERSCRIPT' in os.environ: + # Launched by Checker Runner, we cannot just try to open the descriptors (and fallback if they don't + # exist) because execution environments like pytest might use them as well + _ctrl_in = os.fdopen(3, 'r') + _ctrl_out = os.fdopen(4, 'w') + else: + # Local execution without a Checker Runner + logging.basicConfig() + logging.getLogger().setLevel(logging.INFO) + return + + _ctrl_out_lock = threading.RLock() + + class JsonHandler(logging.StreamHandler): + def __init__(self): + super().__init__(_ctrl_out) + + def emit(self, record): + _ctrl_out_lock.acquire() + super().emit(record) + _ctrl_out_lock.release() + + def format(self, record): + param = { + 'message': super().format(record), + 'levelno': record.levelno, + 'pathname': record.pathname, + 'lineno': record.lineno, + 'funcName': record.funcName + } + json_message = {'action': 'LOG', 'param': param} + # Make sure that our JSON consists of just a single line + return json.dumps(json_message).replace('\n', '') + + json_handler = JsonHandler() + logging.getLogger().addHandler(json_handler) + logging.getLogger().setLevel(logging.INFO) + + socket.setdefaulttimeout(_TIMEOUT_SECONDS) + try: + import requests # pylint: disable=import-outside-toplevel + + # Ugly monkey patch to set defaults for the timeouts in requests, because requests (resp. urllib3) + # always overwrites the default socket timeout + class TimeoutSoup(requests.adapters.TimeoutSauce): + def __init__(self, total=None, connect=None, read=None): + if total is None: + total = _TIMEOUT_SECONDS + if connect is None: + connect = _TIMEOUT_SECONDS + if read is None: + read = _TIMEOUT_SECONDS + super().__init__(total, connect, read) + requests.adapters.TimeoutSauce = TimeoutSoup + except ImportError: + pass + + +_setup() + + +class BaseChecker: + """ + Base class for individual Checker implementations. Checker Scripts must implement all methods. + + Attributes: + ip: Vulnbox IP address of the team to be checked + team: ID of the team to be checked + """ + + def __init__(self, ip: str, team: int) -> None: + self.ip = ip + self.team = team + + def place_flag(self, tick: int) -> CheckResult: + raise NotImplementedError('place_flag() must be implemented by the subclass') + + def check_service(self) -> CheckResult: + raise NotImplementedError('check_service() must be implemented by the subclass') + + def check_flag(self, tick: int) -> CheckResult: + raise NotImplementedError('check_flag() must be implemented by the subclass') + + +def get_flag(tick: int, payload: bytes = b'') -> str: + """ + May be called by Checker Scripts to get the flag for a given tick, for the team and service of the + current run. The returned flag can be used for both placement and checks. + """ + + if _launched_without_runner(): + try: + team = get_flag._team # pylint: disable=protected-access + except AttributeError: + raise Exception('get_flag() must be called through run_check()') + # Return dummy flag when launched locally + if payload == b'': + payload = None + return ctf_gameserver.lib.flag.generate(team, 42, b'TOPSECRET', payload, tick) + + payload_b64 = base64.b64encode(payload).decode('ascii') + _send_ctrl_message({'action': 'FLAG', 'param': {'tick': tick, 'payload': payload_b64}}) + result = _recv_ctrl_message() + return result['response'] + + +def store_state(key: str, data: Any) -> None: + """ + Allows a Checker Script to store arbitrary Python data persistently across runs. Data is stored per + service and team with the given key as an additional identifier. + """ + + serialized_data = base64.b64encode(pickle.dumps(data)).decode('ascii') + + if not _launched_without_runner(): + message = {'key': key, 'data': serialized_data} + _send_ctrl_message({'action': 'STORE', 'param': message}) + # Wait for acknowledgement + _recv_ctrl_message() + else: + try: + with open(_LOCAL_STATE_PATH, 'r') as f: + state = json.load(f) + except FileNotFoundError: + state = {} + state[key] = serialized_data + with open(_LOCAL_STATE_PATH, 'w') as f: + json.dump(state, f, indent=4) + + +def load_state(key: str) -> Any: + """ + Allows to retrieve data stored through store_state(). If no data exists for the given key (and the + current service and team), None is returned. + """ + + if not _launched_without_runner(): + _send_ctrl_message({'action': 'LOAD', 'param': key}) + result = _recv_ctrl_message() + data = result['response'] + if data is None: + return None + else: + try: + with open(_LOCAL_STATE_PATH, 'r') as f: + state = json.load(f) + except FileNotFoundError: + return None + try: + data = state[key] + except KeyError: + return None + + return pickle.loads(base64.b64decode(data)) + + +def run_check(checker_cls: Type[BaseChecker]) -> None: + """ + Launch execution of the specified Checker implementation. Must be called by all Checker Scripts. + """ + + if len(sys.argv) != 4: + raise Exception('Invalid arguments, usage: {} '.format(sys.argv[0])) + + ip = sys.argv[1] + team = int(sys.argv[2]) + tick = int(sys.argv[3]) + + if _launched_without_runner(): + # Hack because get_flag() only needs to know the team when launched locally + get_flag._team = team # pylint: disable=protected-access + + checker = checker_cls(ip, team) + result = _run_check_steps(checker, tick) + + if not _launched_without_runner(): + _send_ctrl_message({'action': 'RESULT', 'param': result.value}) + # Wait for acknowledgement + _recv_ctrl_message() + else: + print('Check result: {}'.format(result)) + + +def _run_check_steps(checker, tick): + + tick_lookback = 5 + + try: + logging.info('Placing flag') + result = checker.place_flag(tick) + logging.info('Flag placement result: %s', result) + if result != CheckResult.OK: + return result + + logging.info('Checking service') + result = checker.check_service() + logging.info('Service check result: %s', result) + if result != CheckResult.OK: + return result + + current_tick = tick + oldest_tick = max(tick-tick_lookback, 0) + recovering = False + while current_tick >= oldest_tick: + logging.info('Checking flag of tick %d', current_tick) + result = checker.check_flag(current_tick) + logging.info('Flag check result of tick %d: %s', current_tick, result) + if result != CheckResult.OK: + if current_tick != tick and result == CheckResult.FLAG_NOT_FOUND: + recovering = True + else: + return result + current_tick -= 1 + + if recovering: + return CheckResult.RECOVERING + else: + return CheckResult.OK + except Exception as e: # pylint: disable=broad-except + if _is_timeout(e): + logging.warning('Timeout during check', exc_info=e) + return CheckResult.TIMEOUT + elif isinstance(e, ssl.SSLError): + logging.warning('SSL error during check', exc_info=e) + return CheckResult.FAULTY + else: + # Just let the Checker Script die, logging will be handled by the Runner + raise e + + +def _launched_without_runner(): + """ + Returns True if the Checker Script has been launched locally (during development) and False if it has + been launched by the Checker Script Runner (during an actual competition). + """ + return _ctrl_in is None + + +def _recv_ctrl_message(): + + message_json = _ctrl_in.readline() + return json.loads(message_json) + + +def _send_ctrl_message(message): + + # Make sure that our JSON consists of just a single line + message_json = json.dumps(message).replace('\n', '') + '\n' + + _ctrl_out_lock.acquire() + _ctrl_out.write(message_json) + _ctrl_out.flush() + _ctrl_out_lock.release() + + +def _is_timeout(exception): + """ + Checks if the given exception resembles a timeout/connection error. + """ + + timeout_exceptions = ( + EOFError, # Raised by telnetlib on timeout + socket.timeout, + ssl.SSLEOFError, + ssl.SSLZeroReturnError, + ssl.SSLWantReadError, + ssl.SSLWantWriteError + ) + try: + import urllib3 # pylint: disable=import-outside-toplevel + have_urllib3 = True + timeout_exceptions += ( + urllib3.exceptions.ConnectionError, + urllib3.exceptions.NewConnectionError, + urllib3.exceptions.ReadTimeoutError + ) + except ImportError: + have_urllib3 = False + try: + import requests # pylint: disable=import-outside-toplevel + have_requests = True + timeout_exceptions += ( + requests.exceptions.ConnectTimeout, + requests.exceptions.Timeout, + requests.packages.urllib3.exceptions.ConnectionError, + requests.packages.urllib3.exceptions.NewConnectionError + ) + except ImportError: + have_requests = False + try: + import nclib # pylint: disable=import-outside-toplevel + timeout_exceptions += (nclib.NetcatError,) + except ImportError: + pass + + if isinstance(exception, timeout_exceptions): + return True + + if isinstance(exception, OSError): + return exception.errno in ( + errno.ECONNABORTED, + errno.ECONNREFUSED, + errno.ECONNRESET, + errno.EHOSTDOWN, + errno.EHOSTUNREACH, + errno.ENETDOWN, + errno.ENETRESET, + errno.ENETUNREACH, + errno.EPIPE, + errno.ETIMEDOUT + ) + + if have_urllib3: + if isinstance(exception, urllib3.exceptions.MaxRetryError): + return _is_timeout(exception.reason) + if isinstance(exception, urllib3.exceptions.ProtocolError): + return len(exception.args) == 2 and _is_timeout(exception.args[1]) + if have_requests: + if isinstance(exception, requests.exceptions.ConnectionError): + return len(exception.args) == 1 and _is_timeout(exception.args[0]) + if isinstance(exception, requests.packages.urllib3.exceptions.MaxRetryError): + return _is_timeout(exception.reason) + if isinstance(exception, requests.packages.urllib3.exceptions.ProtocolError): + return len(exception.args) == 2 and _is_timeout(exception.args[1]) + + return False diff --git a/src/ctf_gameserver/lib/checkresult.py b/src/ctf_gameserver/lib/checkresult.py new file mode 100644 index 0000000..f4d68cc --- /dev/null +++ b/src/ctf_gameserver/lib/checkresult.py @@ -0,0 +1,17 @@ +import enum + + +class CheckResult(enum.Enum): + """ + Maps possible Checker results to their integer values. + These integers map directly to the database! (See also: "web/scoring/models.py") + """ + + OK = 0 + TIMEOUT = 1 + FAULTY = 2 + FLAG_NOT_FOUND = 3 + RECOVERING = 4 + + def __str__(self): + return self.name diff --git a/src/ctf_gameserver/lib/database.py b/src/ctf_gameserver/lib/database.py index c831605..e880d5b 100644 --- a/src/ctf_gameserver/lib/database.py +++ b/src/ctf_gameserver/lib/database.py @@ -1,4 +1,6 @@ from contextlib import contextmanager +import re +import sqlite3 @contextmanager @@ -19,6 +21,9 @@ def transaction_cursor(db_conn, always_rollback=False): # A transaction BEGINs implicitly when the previous one has been finalized cursor = db_conn.cursor() + if isinstance(cursor, sqlite3.Cursor): + cursor = _SQLite3Cursor(cursor) + try: yield cursor except: @@ -29,3 +34,59 @@ def transaction_cursor(db_conn, always_rollback=False): db_conn.rollback() else: db_conn.commit() + + +class _SQLite3Cursor: + """ + Wrapper for sqlite3.Cursor, which translates Psycopg2-style parameter format strings and SQL features + to constructs understood by SQLite. + This is quite hacky, but it should only ever be used in tests, as we don't support SQLite in production. + """ + + def __init__(self, orig_cursor): + self._orig_cursor = orig_cursor + + def __getattribute__(self, name): + # Prevent endless recursion + if name == '_orig_cursor': + return object.__getattribute__(self, name) + + if name == 'execute': + def sqlite3_execute(_, operation, *args, **kwargs): + operation = _translate_operation(operation) + return self._orig_cursor.execute(operation, *args, **kwargs) + + # Turn function into bound method (to be called on an instance) + # pylint: disable=no-value-for-parameter + sqlite3_execute_bound = sqlite3_execute.__get__(self, _SQLite3Cursor) + return sqlite3_execute_bound + + if name == 'executemany': + def sqlite3_executemany(_, operation, *args, **kwargs): + operation = _translate_operation(operation) + return self._orig_cursor.executemany(operation, *args, **kwargs) + + # pylint: disable=no-value-for-parameter + sqlite3_executemany_bound = sqlite3_executemany.__get__(self, _SQLite3Cursor) + return sqlite3_executemany_bound + + return self._orig_cursor.__getattribute__(name) + + +def _translate_operation(operation): + """ + Translates Psycopg2 features to their SQLite counterparts on a best-effort base. + """ + + # The placeholder is always "%s" in Psycopg2, "even if a different placeholder (such as a %d for + # integers or %f for floats) may look more appropriate" + operation = operation.replace('%s', '?') + operation = operation.replace('NOW()', "DATETIME('now')") + + operation = re.sub(r'\)( PRIMARY KEY \(.+\))', r',\1)', operation) + + # Apart from being a best effort, this also changes the semantics, but SQLite just doesn't support + # "FOR UPDATE" + operation = re.sub(r'FOR UPDATE OF \S+', '', operation) + + return operation diff --git a/tests/checker/fixtures/integration.json b/tests/checker/fixtures/integration.json new file mode 100644 index 0000000..057dc24 --- /dev/null +++ b/tests/checker/fixtures/integration.json @@ -0,0 +1,88 @@ +[ +{ + "model": "auth.user", + "pk": 2, + "fields": { + "password": "pbkdf2_sha256$36000$kHAF2GkRGCyG$qm+7EyJr0b8E9VbQWp3ZtfxaV0A5wIJSV/ABWEML6II=", + "last_login": null, + "is_superuser": false, + "username": "Team2", + "first_name": "", + "last_name": "", + "email": "", + "is_staff": false, + "is_active": true, + "date_joined": "2019-04-03T18:21:28.622Z", + "groups": [], + "user_permissions": [] + } +}, +{ + "model": "auth.user", + "pk": 3, + "fields": { + "password": "pbkdf2_sha256$36000$kHAF2GkRGCyG$qm+7EyJr0b8E9VbQWp3ZtfxaV0A5wIJSV/ABWEML6II=", + "last_login": null, + "is_superuser": false, + "username": "Team3", + "first_name": "", + "last_name": "", + "email": "", + "is_staff": false, + "is_active": true, + "date_joined": "2019-04-03T18:21:28.622Z", + "groups": [], + "user_permissions": [] + } +}, +{ + "model": "registration.team", + "pk": 2, + "fields": { + "informal_email": "team2@example.org", + "image": "", + "affiliation": "", + "country": "World", + "nop_team": false + } +}, +{ + "model": "registration.team", + "pk": 3, + "fields": { + "informal_email": "team3@example.org", + "image": "", + "affiliation": "", + "country": "World", + "nop_team": false + } +}, +{ + "model": "scoring.service", + "pk": 1, + "fields": { + "name": "Service 1", + "slug": "service1" + } +}, +{ + "model": "scoring.service", + "pk": 2, + "fields": { + "name": "Service 2", + "slug": "service2" + } +}, +{ + "model": "scoring.gamecontrol", + "pk": 1, + "fields": { + "start": null, + "end": null, + "tick_duration": 180, + "valid_ticks": 5, + "current_tick": -1, + "registration_open": false + } +} +] diff --git a/tests/checker/fixtures/master.json b/tests/checker/fixtures/master.json new file mode 100644 index 0000000..c0cd84f --- /dev/null +++ b/tests/checker/fixtures/master.json @@ -0,0 +1,87 @@ +[ +{ + "model": "auth.user", + "pk": 2, + "fields": { + "password": "pbkdf2_sha256$36000$kHAF2GkRGCyG$qm+7EyJr0b8E9VbQWp3ZtfxaV0A5wIJSV/ABWEML6II=", + "last_login": null, + "is_superuser": false, + "username": "Team1", + "first_name": "", + "last_name": "", + "email": "", + "is_staff": false, + "is_active": true, + "date_joined": "2019-04-03T18:21:28.622Z", + "groups": [], + "user_permissions": [] + } +}, +{ + "model": "registration.team", + "pk": 2, + "fields": { + "informal_email": "team1@example.org", + "image": "", + "affiliation": "", + "country": "World", + "nop_team": false + } +}, +{ + "model": "scoring.service", + "pk": 1, + "fields": { + "name": "Service 1", + "slug": "service1" + } +}, +{ + "model": "scoring.flag", + "pk": 1, + "fields": { + "service": 1, + "protecting_team": 2, + "tick": 1, + "placement_start": null, + "placement_end": null, + "bonus": null + } +}, +{ + "model": "scoring.flag", + "pk": 2, + "fields": { + "service": 1, + "protecting_team": 2, + "tick": 2, + "placement_start": null, + "placement_end": null, + "bonus": null + } +}, +{ + "model": "scoring.flag", + "pk": 3, + "fields": { + "service": 1, + "protecting_team": 2, + "tick": 3, + "placement_start": null, + "placement_end": null, + "bonus": null + } +}, +{ + "model": "scoring.gamecontrol", + "pk": 1, + "fields": { + "start": null, + "end": null, + "tick_duration": 180, + "valid_ticks": 5, + "current_tick": -1, + "registration_open": false + } +} +] diff --git a/tests/checker/integration_basic_checkerscript.py b/tests/checker/integration_basic_checkerscript.py new file mode 100755 index 0000000..a16f44f --- /dev/null +++ b/tests/checker/integration_basic_checkerscript.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python3 + +from ctf_gameserver import checkerlib + + +class TestChecker(checkerlib.BaseChecker): + + def place_flag(self, tick): + if self.team != 2: + raise Exception('Team {} != 2'.format(self.team)) + if self.ip != '0.0.2.1': + raise Exception('IP {} != 0.0.2.1'.format(self.ip)) + if tick != 0: + raise Exception('Tick {} != 0'.format(tick)) + + checkerlib.get_flag(tick) + return checkerlib.CheckResult.OK + + def check_service(self): + return checkerlib.CheckResult.OK + + def check_flag(self, tick): + if tick != 0: + raise Exception('Tick {} != 0'.format(tick)) + + checkerlib.get_flag(tick) + return checkerlib.CheckResult.OK + + +if __name__ == '__main__': + + checkerlib.run_check(TestChecker) diff --git a/tests/checker/integration_exception_checkerscript.py b/tests/checker/integration_exception_checkerscript.py new file mode 100755 index 0000000..ac1c196 --- /dev/null +++ b/tests/checker/integration_exception_checkerscript.py @@ -0,0 +1,20 @@ +#!/usr/bin/env python3 + +from ctf_gameserver import checkerlib + + +class TestChecker(checkerlib.BaseChecker): + + def place_flag(self, tick): + raise Exception('This is fine') + + def check_service(self): + return checkerlib.CheckResult.OK + + def check_flag(self, tick): + return checkerlib.CheckResult.OK + + +if __name__ == '__main__': + + checkerlib.run_check(TestChecker) diff --git a/tests/checker/integration_multi_checkerscript.py b/tests/checker/integration_multi_checkerscript.py new file mode 100755 index 0000000..5e05824 --- /dev/null +++ b/tests/checker/integration_multi_checkerscript.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python3 + +from ctf_gameserver import checkerlib + + +class TestChecker(checkerlib.BaseChecker): + + def place_flag(self, tick): + self._tick = tick # pylint: disable=attribute-defined-outside-init + + if self.team != 2 and self.team != 3: + raise Exception('Invalid team {}'.format(self.team)) + + checkerlib.get_flag(tick) + + if self.team == 2 and tick == 0: + return checkerlib.CheckResult.FAULTY + else: + return checkerlib.CheckResult.OK + + def check_service(self): + if self.team == 2 and self._tick == 1: + return checkerlib.CheckResult.TIMEOUT + else: + return checkerlib.CheckResult.OK + + def check_flag(self, tick): + checkerlib.get_flag(tick) + + if self.team == 2 and self._tick == 2: + if tick == 0: + return checkerlib.CheckResult.FLAG_NOT_FOUND + else: + return checkerlib.CheckResult.OK + elif self.team == 3 and self._tick == 1: + return checkerlib.CheckResult.FAULTY + else: + return checkerlib.CheckResult.OK + + +if __name__ == '__main__': + + checkerlib.run_check(TestChecker) diff --git a/tests/checker/integration_state_checkerscript.py b/tests/checker/integration_state_checkerscript.py new file mode 100755 index 0000000..0ca0c89 --- /dev/null +++ b/tests/checker/integration_state_checkerscript.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python3 + +from ctf_gameserver import checkerlib + + +class TestChecker(checkerlib.BaseChecker): + + def place_flag(self, tick): + if checkerlib.load_state('key2') is not None: + raise Exception('Got state where there should be none') + + if tick == 0: + if checkerlib.load_state('key1') is not None: + raise Exception('Got state where there should be none') + + checkerlib.get_flag(tick) + + if self.team == 2: + if tick == 0: + checkerlib.store_state('key1', 'Wir können Zustände speichern 🥳') + else: + if checkerlib.load_state('key1') != 'Wir können Zustände speichern 🥳': + raise Exception('Did not get stored state back') + + if tick == 0: + checkerlib.store_state('🔑ser', 'Söze') + if checkerlib.load_state('🔑ser') != 'Söze': + raise Exception('Did not get stored state back') + elif tick == 1: + if checkerlib.load_state('🔑ser') != 'Söze': + raise Exception('Did not get stored state back') + checkerlib.store_state('🔑ser', ['Roger', '"Verbal"', 'Kint']) + elif tick == 2: + if checkerlib.load_state('🔑ser') != ['Roger', '"Verbal"', 'Kint']: + raise Exception('Did not get stored state back') + elif self.team == 3: + if tick == 1: + if checkerlib.load_state('key1') is not None: + raise Exception('Got state where there should be none') + data = [{'number': 42}, {'number': 1337}] + checkerlib.store_state('key1', data) + elif tick >= 2: + if checkerlib.load_state('key1') != [{'number': 42}, {'number': 1337}]: + raise Exception('Did not get stored state back') + + return checkerlib.CheckResult.OK + + def check_service(self): + return checkerlib.CheckResult.OK + + def check_flag(self, tick): + return checkerlib.CheckResult.OK + + +if __name__ == '__main__': + + checkerlib.run_check(TestChecker) diff --git a/tests/checker/integration_sudo_checkerscript.py b/tests/checker/integration_sudo_checkerscript.py new file mode 100755 index 0000000..dabda00 --- /dev/null +++ b/tests/checker/integration_sudo_checkerscript.py @@ -0,0 +1,32 @@ +#!/usr/bin/env python3 + +import os + +from ctf_gameserver import checkerlib + + +class TestChecker(checkerlib.BaseChecker): + + def place_flag(self, tick): + checkerlib.get_flag(tick) + + # Try to send a signal to our parent, this should not be possible when running as another user + parent_pid = os.getppid() + try: + os.kill(parent_pid, 0) + except PermissionError: + return checkerlib.CheckResult.OK + + raise Exception('Should not be able to kill the parent') + + def check_service(self): + return checkerlib.CheckResult.OK + + def check_flag(self, tick): + checkerlib.get_flag(tick) + return checkerlib.CheckResult.OK + + +if __name__ == '__main__': + + checkerlib.run_check(TestChecker) diff --git a/tests/checker/integration_timeout_checkerscript.py b/tests/checker/integration_timeout_checkerscript.py new file mode 100755 index 0000000..7ffc03c --- /dev/null +++ b/tests/checker/integration_timeout_checkerscript.py @@ -0,0 +1,24 @@ +#!/usr/bin/env python3 + +import errno + +from ctf_gameserver import checkerlib + + +class TestChecker(checkerlib.BaseChecker): + + def place_flag(self, tick): + checkerlib.get_flag(tick) + raise OSError(errno.ETIMEDOUT, 'A timeout occured') + + def check_service(self): + return checkerlib.CheckResult.OK + + def check_flag(self, tick): + checkerlib.get_flag(tick) + return checkerlib.CheckResult.OK + + +if __name__ == '__main__': + + checkerlib.run_check(TestChecker) diff --git a/tests/checker/integration_unfinished_checkerscript.py b/tests/checker/integration_unfinished_checkerscript.py new file mode 100755 index 0000000..45df0f5 --- /dev/null +++ b/tests/checker/integration_unfinished_checkerscript.py @@ -0,0 +1,18 @@ +#!/usr/bin/env python3 + +import os +import time + +from ctf_gameserver import checkerlib + + +if __name__ == '__main__': + + pidfile_path = os.environ['CHECKERSCRIPT_PIDFILE'] # pylint: disable=invalid-name + with open(pidfile_path, 'w') as pidfile: + pidfile.write(str(os.getpid())) + + checkerlib.get_flag(1) + + while True: + time.sleep(10) diff --git a/tests/checker/test_integration.py b/tests/checker/test_integration.py new file mode 100644 index 0000000..dd3132b --- /dev/null +++ b/tests/checker/test_integration.py @@ -0,0 +1,449 @@ +import os.path +from unittest import SkipTest +from unittest.mock import patch +import sqlite3 +import tempfile +import time + +from ctf_gameserver.checker.master import MasterLoop +from ctf_gameserver.lib.checkresult import CheckResult +from ctf_gameserver.lib.database import transaction_cursor +from ctf_gameserver.lib.test_util import DatabaseTestCase + + +class IntegrationTest(DatabaseTestCase): + + fixtures = ['tests/checker/fixtures/integration.json'] + + def setUp(self): + self.state_db_conn = sqlite3.connect(':memory:') + with transaction_cursor(self.state_db_conn) as cursor: + cursor.execute('CREATE TABLE checkerstate (' + ' team_id INTEGER,' + ' service_id INTEGER,' + ' identifier CHARACTER VARYING (128),' + ' data TEXT' + ') PRIMARY KEY (team_id, service_id, identifier)') + + @patch('ctf_gameserver.checker.master.get_monotonic_time') + def test_basic(self, monotonic_mock): + checkerscript_path = os.path.join(os.path.dirname(__file__), 'integration_basic_checkerscript.py') + + monotonic_mock.return_value = 10 + master_loop = MasterLoop(self.connection, self.state_db_conn, 'service1', checkerscript_path, None, + 90, 1, 10, '0.0.%s.1', b'secret', {}) + + master_loop.supervisor.queue_timeout = 0.01 + # Sanity check before any tick + self.assertFalse(master_loop.step()) + with transaction_cursor(self.connection) as cursor: + cursor.execute('SELECT COUNT(*) FROM scoring_statuscheck') + self.assertEqual(cursor.fetchone()[0], 0) + + # Start tick + with transaction_cursor(self.connection) as cursor: + cursor.execute('UPDATE scoring_gamecontrol SET current_tick=0') + cursor.execute('INSERT INTO scoring_flag (service_id, protecting_team_id, tick)' + ' VALUES (1, 2, 0)') + # Checker won't get started because interval is not yet over + self.assertFalse(master_loop.step()) + with transaction_cursor(self.connection) as cursor: + cursor.execute('SELECT COUNT(*) FROM scoring_flag WHERE placement_start IS NOT NULL') + self.assertEqual(cursor.fetchone()[0], 0) + cursor.execute('SELECT COUNT(*) FROM scoring_statuscheck') + self.assertEqual(cursor.fetchone()[0], 0) + + # Interval is over, Checker Script gets started + monotonic_mock.return_value = 20 + # Will return False because no messages yet + self.assertFalse(master_loop.step()) + with transaction_cursor(self.connection) as cursor: + cursor.execute('SELECT COUNT(*) FROM scoring_flag WHERE placement_start IS NOT NULL') + self.assertEqual(cursor.fetchone()[0], 1) + + master_loop.supervisor.queue_timeout = 10 + # Handle all messages from Checker Script + while master_loop.step(): + pass + with transaction_cursor(self.connection) as cursor: + cursor.execute('SELECT COUNT(*) FROM scoring_flag WHERE placement_end IS NOT NULL') + self.assertEqual(cursor.fetchone()[0], 1) + cursor.execute('SELECT COUNT(*) FROM scoring_statuscheck') + self.assertEqual(cursor.fetchone()[0], 1) + cursor.execute('SELECT status FROM scoring_statuscheck' + ' WHERE service_id=1 AND team_id=2 AND tick=0') + self.assertEqual(cursor.fetchone()[0], CheckResult.OK.value) + + @patch('ctf_gameserver.checker.master.get_monotonic_time') + def test_missing_checkerscript(self, monotonic_mock): + checkerscript_path = os.path.join(os.path.dirname(__file__), 'does not exist') + + monotonic_mock.return_value = 10 + master_loop = MasterLoop(self.connection, self.state_db_conn, 'service1', checkerscript_path, None, + 90, 1, 10, '0.0.%s.1', b'secret', {}) + + with transaction_cursor(self.connection) as cursor: + cursor.execute('UPDATE scoring_gamecontrol SET current_tick=0') + cursor.execute('INSERT INTO scoring_flag (service_id, protecting_team_id, tick)' + ' VALUES (1, 2, 0)') + monotonic_mock.return_value = 20 + + master_loop.supervisor.queue_timeout = 0.01 + # Checker Script gets started, will return False because no messages yet + self.assertFalse(master_loop.step()) + + master_loop.supervisor.queue_timeout = 10 + while master_loop.step(): + pass + with transaction_cursor(self.connection) as cursor: + cursor.execute('SELECT COUNT(*) FROM scoring_flag' + ' WHERE placement_start IS NOT NULL AND placement_end IS NULL') + self.assertEqual(cursor.fetchone()[0], 1) + cursor.execute('SELECT COUNT(*) FROM scoring_statuscheck') + self.assertEqual(cursor.fetchone()[0], 0) + + @patch('ctf_gameserver.checker.master.get_monotonic_time') + def test_exception(self, monotonic_mock): + checkerscript_path = os.path.join(os.path.dirname(__file__), + 'integration_exception_checkerscript.py') + + monotonic_mock.return_value = 10 + master_loop = MasterLoop(self.connection, self.state_db_conn, 'service1', checkerscript_path, None, + 90, 1, 10, '0.0.%s.1', b'secret', {}) + + with transaction_cursor(self.connection) as cursor: + cursor.execute('UPDATE scoring_gamecontrol SET current_tick=0') + cursor.execute('INSERT INTO scoring_flag (service_id, protecting_team_id, tick)' + ' VALUES (1, 2, 0)') + monotonic_mock.return_value = 20 + + master_loop.supervisor.queue_timeout = 0.01 + # Checker Script gets started, will return False because no messages yet + self.assertFalse(master_loop.step()) + + master_loop.supervisor.queue_timeout = 10 + while master_loop.step(): + pass + with transaction_cursor(self.connection) as cursor: + cursor.execute('SELECT COUNT(*) FROM scoring_flag' + ' WHERE placement_start IS NOT NULL AND placement_end IS NULL') + self.assertEqual(cursor.fetchone()[0], 1) + cursor.execute('SELECT COUNT(*) FROM scoring_statuscheck') + self.assertEqual(cursor.fetchone()[0], 0) + + @patch('ctf_gameserver.checker.master.get_monotonic_time') + def test_timeout(self, monotonic_mock): + checkerscript_path = os.path.join(os.path.dirname(__file__), + 'integration_timeout_checkerscript.py') + + monotonic_mock.return_value = 10 + master_loop = MasterLoop(self.connection, self.state_db_conn, 'service1', checkerscript_path, None, + 90, 1, 10, '0.0.%s.1', b'secret', {}) + + with transaction_cursor(self.connection) as cursor: + cursor.execute('UPDATE scoring_gamecontrol SET current_tick=0') + cursor.execute('INSERT INTO scoring_flag (service_id, protecting_team_id, tick)' + ' VALUES (1, 2, 0)') + monotonic_mock.return_value = 20 + + master_loop.supervisor.queue_timeout = 0.01 + # Checker Script gets started, will return False because no messages yet + self.assertFalse(master_loop.step()) + + master_loop.supervisor.queue_timeout = 10 + while master_loop.step(): + pass + with transaction_cursor(self.connection) as cursor: + cursor.execute('SELECT COUNT(*) FROM scoring_flag WHERE placement_end IS NOT NULL') + self.assertEqual(cursor.fetchone()[0], 1) + cursor.execute('SELECT COUNT(*) FROM scoring_statuscheck') + self.assertEqual(cursor.fetchone()[0], 1) + cursor.execute('SELECT status FROM scoring_statuscheck' + ' WHERE service_id=1 AND team_id=2 AND tick=0') + self.assertEqual(cursor.fetchone()[0], CheckResult.TIMEOUT.value) + + @patch('logging.warning') + @patch('ctf_gameserver.checker.master.get_monotonic_time') + def test_unfinished(self, monotonic_mock, warning_mock): + checkerscript_path = os.path.join(os.path.dirname(__file__), + 'integration_unfinished_checkerscript.py') + + checkerscript_pidfile = tempfile.NamedTemporaryFile() + os.environ['CHECKERSCRIPT_PIDFILE'] = checkerscript_pidfile.name + + monotonic_mock.return_value = 10 + master_loop = MasterLoop(self.connection, self.state_db_conn, 'service1', checkerscript_path, None, + 90, 1, 10, '0.0.%s.1', b'secret', {}) + + with transaction_cursor(self.connection) as cursor: + cursor.execute('UPDATE scoring_gamecontrol SET current_tick=0') + cursor.execute('INSERT INTO scoring_flag (service_id, protecting_team_id, tick)' + ' VALUES (1, 2, 0)') + monotonic_mock.return_value = 20 + + master_loop.supervisor.queue_timeout = 0.01 + # Checker Script gets started, will return False because no messages yet + self.assertFalse(master_loop.step()) + master_loop.supervisor.queue_timeout = 10 + self.assertTrue(master_loop.step()) + + checkerscript_pidfile.seek(0) + checkerscript_pid = int(checkerscript_pidfile.read()) + # Ensure process is running by sending signal 0 + os.kill(checkerscript_pid, 0) + + master_loop.supervisor.queue_timeout = 0.01 + monotonic_mock.return_value = 50 + self.assertFalse(master_loop.step()) + # Process should still be running + os.kill(checkerscript_pid, 0) + + with transaction_cursor(self.connection) as cursor: + cursor.execute('UPDATE scoring_gamecontrol SET current_tick=1') + monotonic_mock.return_value = 190 + self.assertFalse(master_loop.step()) + # Poll whether the process has been killed + for _ in range(100): + try: + os.kill(checkerscript_pid, 0) + except ProcessLookupError: + break + time.sleep(0.1) + with self.assertRaises(ProcessLookupError): + os.kill(checkerscript_pid, 0) + + with transaction_cursor(self.connection) as cursor: + cursor.execute('SELECT COUNT(*) FROM scoring_flag' + ' WHERE placement_start IS NOT NULL AND placement_end IS NULL') + self.assertEqual(cursor.fetchone()[0], 1) + cursor.execute('SELECT COUNT(*) FROM scoring_statuscheck') + self.assertEqual(cursor.fetchone()[0], 0) + + warning_mock.assert_called_with('Terminating all %d Runner processes', 1) + + del os.environ['CHECKERSCRIPT_PIDFILE'] + checkerscript_pidfile.close() + + @patch('ctf_gameserver.checker.master.get_monotonic_time') + def test_multi_teams_ticks(self, monotonic_mock): + checkerscript_path = os.path.join(os.path.dirname(__file__), + 'integration_multi_checkerscript.py') + + monotonic_mock.return_value = 10 + master_loop = MasterLoop(self.connection, self.state_db_conn, 'service1', checkerscript_path, None, + 90, 1, 10, '0.0.%s.1', b'secret', {}) + + # Tick 0 + with transaction_cursor(self.connection) as cursor: + cursor.execute('UPDATE scoring_gamecontrol SET current_tick=0') + # Also add flags for service 2 (which does not get checked) to make sure it won't get touched + cursor.execute('INSERT INTO scoring_flag (service_id, protecting_team_id, tick)' + ' VALUES (1, 2, 0), (1, 3, 0), (2, 2, 0), (2, 3, 0)') + monotonic_mock.return_value = 20 + master_loop.supervisor.queue_timeout = 0.01 + self.assertFalse(master_loop.step()) + monotonic_mock.return_value = 100 + master_loop.supervisor.queue_timeout = 10 + while master_loop.step() or master_loop.get_running_script_count() > 0: + pass + with transaction_cursor(self.connection) as cursor: + cursor.execute('SELECT COUNT(*) FROM scoring_flag' + ' WHERE placement_end IS NOT NULL') + self.assertEqual(cursor.fetchone()[0], 2) + cursor.execute('SELECT COUNT(*) FROM scoring_statuscheck') + self.assertEqual(cursor.fetchone()[0], 2) + cursor.execute('SELECT status FROM scoring_statuscheck' + ' WHERE service_id=1 AND team_id=2 AND tick=0') + self.assertEqual(cursor.fetchone()[0], CheckResult.FAULTY.value) + cursor.execute('SELECT status FROM scoring_statuscheck' + ' WHERE service_id=1 AND team_id=3 AND tick=0') + self.assertEqual(cursor.fetchone()[0], CheckResult.OK.value) + + # Tick 1 + with transaction_cursor(self.connection) as cursor: + cursor.execute('UPDATE scoring_gamecontrol SET current_tick=1') + cursor.execute('INSERT INTO scoring_flag (service_id, protecting_team_id, tick)' + ' VALUES (1, 2, 1), (1, 3, 1), (2, 2, 1), (2, 3, 1)') + monotonic_mock.return_value = 200 + master_loop.supervisor.queue_timeout = 0.01 + self.assertFalse(master_loop.step()) + monotonic_mock.return_value = 280 + master_loop.supervisor.queue_timeout = 10 + while master_loop.step() or master_loop.get_running_script_count() > 0: + pass + with transaction_cursor(self.connection) as cursor: + cursor.execute('SELECT COUNT(*) FROM scoring_flag' + ' WHERE placement_end IS NOT NULL') + self.assertEqual(cursor.fetchone()[0], 4) + cursor.execute('SELECT COUNT(*) FROM scoring_statuscheck') + self.assertEqual(cursor.fetchone()[0], 4) + cursor.execute('SELECT status FROM scoring_statuscheck' + ' WHERE service_id=1 AND team_id=2 AND tick=0') + self.assertEqual(cursor.fetchone()[0], CheckResult.FAULTY.value) + cursor.execute('SELECT status FROM scoring_statuscheck' + ' WHERE service_id=1 AND team_id=3 AND tick=0') + self.assertEqual(cursor.fetchone()[0], CheckResult.OK.value) + cursor.execute('SELECT status FROM scoring_statuscheck' + ' WHERE service_id=1 AND team_id=2 AND tick=1') + self.assertEqual(cursor.fetchone()[0], CheckResult.TIMEOUT.value) + cursor.execute('SELECT status FROM scoring_statuscheck' + ' WHERE service_id=1 AND team_id=3 AND tick=1') + self.assertEqual(cursor.fetchone()[0], CheckResult.FAULTY.value) + + # Tick 2 + with transaction_cursor(self.connection) as cursor: + cursor.execute('UPDATE scoring_gamecontrol SET current_tick=2') + cursor.execute('INSERT INTO scoring_flag (service_id, protecting_team_id, tick)' + ' VALUES (1, 2, 2), (1, 3, 2), (2, 2, 2), (2, 3, 2)') + monotonic_mock.return_value = 380 + master_loop.supervisor.queue_timeout = 0.01 + self.assertFalse(master_loop.step()) + monotonic_mock.return_value = 460 + master_loop.supervisor.queue_timeout = 10 + while master_loop.step() or master_loop.get_running_script_count() > 0: + pass + with transaction_cursor(self.connection) as cursor: + cursor.execute('SELECT COUNT(*) FROM scoring_flag' + ' WHERE placement_end IS NOT NULL') + self.assertEqual(cursor.fetchone()[0], 6) + cursor.execute('SELECT COUNT(*) FROM scoring_statuscheck') + self.assertEqual(cursor.fetchone()[0], 6) + cursor.execute('SELECT status FROM scoring_statuscheck' + ' WHERE service_id=1 AND team_id=2 AND tick=2') + self.assertEqual(cursor.fetchone()[0], CheckResult.RECOVERING.value) + cursor.execute('SELECT status FROM scoring_statuscheck' + ' WHERE service_id=1 AND team_id=3 AND tick=2') + self.assertEqual(cursor.fetchone()[0], CheckResult.OK.value) + + @patch('ctf_gameserver.checker.master.get_monotonic_time') + def test_state(self, monotonic_mock): + checkerscript_path = os.path.join(os.path.dirname(__file__), + 'integration_state_checkerscript.py') + + monotonic_mock.return_value = 10 + master_loop = MasterLoop(self.connection, self.state_db_conn, 'service1', checkerscript_path, None, + 90, 1, 10, '0.0.%s.1', b'secret', {}) + + with transaction_cursor(self.state_db_conn) as cursor: + # Prepopulate state for the non-checked service to ensure we'll never get this data returned + data = 'gAN9cQBYAwAAAGZvb3EBWAMAAABiYXJxAnMu' + cursor.execute('INSERT INTO checkerstate (team_id, service_id, identifier, data)' + ' VALUES (2, 2, %s, %s), (3, 2, %s, %s)', ('key1', data, 'key2', data)) + + # Tick 0 + with transaction_cursor(self.connection) as cursor: + cursor.execute('UPDATE scoring_gamecontrol SET current_tick=0') + cursor.execute('INSERT INTO scoring_flag (service_id, protecting_team_id, tick)' + ' VALUES (1, 2, 0), (1, 3, 0)') + monotonic_mock.return_value = 20 + master_loop.supervisor.queue_timeout = 0.01 + self.assertFalse(master_loop.step()) + monotonic_mock.return_value = 100 + master_loop.supervisor.queue_timeout = 10 + while master_loop.step() or master_loop.get_running_script_count() > 0: + pass + with transaction_cursor(self.connection) as cursor: + cursor.execute('SELECT COUNT(*) FROM scoring_flag' + ' WHERE placement_end IS NOT NULL') + self.assertEqual(cursor.fetchone()[0], 2) + cursor.execute('SELECT COUNT(*) FROM scoring_statuscheck WHERE status=%s', + (CheckResult.OK.value,)) + self.assertEqual(cursor.fetchone()[0], 2) + + # Tick 1 + with transaction_cursor(self.connection) as cursor: + cursor.execute('UPDATE scoring_gamecontrol SET current_tick=1') + cursor.execute('INSERT INTO scoring_flag (service_id, protecting_team_id, tick)' + ' VALUES (1, 2, 1), (1, 3, 1)') + monotonic_mock.return_value = 200 + master_loop.supervisor.queue_timeout = 0.01 + self.assertFalse(master_loop.step()) + monotonic_mock.return_value = 280 + master_loop.supervisor.queue_timeout = 10 + while master_loop.step() or master_loop.get_running_script_count() > 0: + pass + with transaction_cursor(self.connection) as cursor: + cursor.execute('SELECT COUNT(*) FROM scoring_flag' + ' WHERE placement_end IS NOT NULL') + self.assertEqual(cursor.fetchone()[0], 4) + cursor.execute('SELECT COUNT(*) FROM scoring_statuscheck WHERE status=%s', + (CheckResult.OK.value,)) + self.assertEqual(cursor.fetchone()[0], 4) + + # Tick 2 + with transaction_cursor(self.connection) as cursor: + cursor.execute('UPDATE scoring_gamecontrol SET current_tick=2') + cursor.execute('INSERT INTO scoring_flag (service_id, protecting_team_id, tick)' + ' VALUES (1, 2, 2), (1, 3, 2)') + monotonic_mock.return_value = 380 + master_loop.supervisor.queue_timeout = 0.01 + self.assertFalse(master_loop.step()) + monotonic_mock.return_value = 460 + master_loop.supervisor.queue_timeout = 10 + while master_loop.step() or master_loop.get_running_script_count() > 0: + pass + with transaction_cursor(self.connection) as cursor: + cursor.execute('SELECT COUNT(*) FROM scoring_flag' + ' WHERE placement_end IS NOT NULL') + self.assertEqual(cursor.fetchone()[0], 6) + cursor.execute('SELECT COUNT(*) FROM scoring_statuscheck WHERE status=%s', + (CheckResult.OK.value,)) + self.assertEqual(cursor.fetchone()[0], 6) + + @patch('ctf_gameserver.checker.master.get_monotonic_time') + def test_shutdown(self, monotonic_mock): + checkerscript_path = '/dev/null' + + monotonic_mock.return_value = 10 + master_loop = MasterLoop(self.connection, self.state_db_conn, 'service1', checkerscript_path, None, + 90, 1, 10, '0.0.%s.1', b'secret', {}) + + with transaction_cursor(self.connection) as cursor: + cursor.execute('UPDATE scoring_gamecontrol SET current_tick=0') + cursor.execute('INSERT INTO scoring_flag (service_id, protecting_team_id, tick)' + ' VALUES (1, 2, 0)') + + master_loop.shutting_down = True + master_loop.supervisor.queue_timeout = 0.01 + monotonic_mock.return_value = 20 + # Will return False because no messages yet + self.assertFalse(master_loop.step()) + with transaction_cursor(self.connection) as cursor: + cursor.execute('SELECT COUNT(*) FROM scoring_flag WHERE placement_start IS NOT NULL') + self.assertEqual(cursor.fetchone()[0], 0) + cursor.execute('SELECT COUNT(*) FROM scoring_statuscheck') + self.assertEqual(cursor.fetchone()[0], 0) + + @patch('ctf_gameserver.checker.master.get_monotonic_time') + def test_sudo(self, monotonic_mock): + if not os.path.exists('/etc/sudoers.d/ctf-checker'): + raise SkipTest('sudo config not available') + + checkerscript_path = os.path.join(os.path.dirname(__file__), + 'integration_sudo_checkerscript.py') + + monotonic_mock.return_value = 10 + master_loop = MasterLoop(self.connection, self.state_db_conn, 'service1', checkerscript_path, + 'ctf-checkerrunner', 90, 1, 10, '0.0.%s.1', b'secret', {}) + + with transaction_cursor(self.connection) as cursor: + cursor.execute('UPDATE scoring_gamecontrol SET current_tick=0') + cursor.execute('INSERT INTO scoring_flag (service_id, protecting_team_id, tick)' + ' VALUES (1, 2, 0)') + monotonic_mock.return_value = 20 + + master_loop.supervisor.queue_timeout = 0.01 + # Checker Script gets started, will return False because no messages yet + self.assertFalse(master_loop.step()) + + master_loop.supervisor.queue_timeout = 10 + while master_loop.step(): + pass + with transaction_cursor(self.connection) as cursor: + cursor.execute('SELECT COUNT(*) FROM scoring_flag WHERE placement_end IS NOT NULL') + self.assertEqual(cursor.fetchone()[0], 1) + cursor.execute('SELECT COUNT(*) FROM scoring_statuscheck') + self.assertEqual(cursor.fetchone()[0], 1) + cursor.execute('SELECT status FROM scoring_statuscheck' + ' WHERE service_id=1 AND team_id=2 AND tick=0') + self.assertEqual(cursor.fetchone()[0], CheckResult.OK.value) diff --git a/tests/checker/test_master.py b/tests/checker/test_master.py new file mode 100644 index 0000000..f8e62df --- /dev/null +++ b/tests/checker/test_master.py @@ -0,0 +1,130 @@ +import datetime +from unittest.mock import patch + +from ctf_gameserver.checker.master import MasterLoop +from ctf_gameserver.lib.checkresult import CheckResult +from ctf_gameserver.lib.database import transaction_cursor +from ctf_gameserver.lib.test_util import DatabaseTestCase + + +class MasterTest(DatabaseTestCase): + + fixtures = ['tests/checker/fixtures/master.json'] + + def setUp(self): + self.master_loop = MasterLoop(self.connection, None, 'service1', '/dev/null', None, 90, 8, 10, + '0.0.%s.1', b'secret', {}) + + @patch('datetime.datetime') + def test_handle_flag_request(self, datetime_mock): + datetime_mock.utcnow.return_value = datetime.datetime.utcfromtimestamp(42) + + task_info = { + 'service': 'Service 1', + 'team': 2, + 'tick': 1 + } + + params1 = {} + resp1 = self.master_loop.handle_flag_request(task_info, params1) + params2 = {} + resp2 = self.master_loop.handle_flag_request(task_info, params2) + params3 = {'payload': 'TmV2ZXIgZ28='} + resp3 = self.master_loop.handle_flag_request(task_info, params3) + + self.assertEqual(resp1, resp2) + self.assertNotEqual(resp1, resp3) + + def test_handle_result_request(self): + task_info = { + 'service': 'Service 1', + 'team': 2, + 'tick': 1 + } + param = CheckResult.OK.value + start_time = datetime.datetime.utcnow().replace(microsecond=0) + self.assertIsNone(self.master_loop.handle_result_request(task_info, param)) + with transaction_cursor(self.connection) as cursor: + cursor.execute('SELECT COUNT(*) FROM scoring_statuscheck') + self.assertEqual(cursor.fetchone()[0], 1) + cursor.execute('SELECT status FROM scoring_statuscheck' + ' WHERE service_id = 1 AND team_id = 2 AND tick = 1') + self.assertEqual(cursor.fetchone()[0], CheckResult.OK.value) + cursor.execute('SELECT placement_end FROM scoring_flag' + ' WHERE service_id = 1 AND protecting_team_id = 2 AND tick = 1') + self.assertGreaterEqual(cursor.fetchone()[0], start_time) + + task_info['tick'] = 2 + param = CheckResult.FAULTY.value + start_time = datetime.datetime.utcnow().replace(microsecond=0) + self.assertIsNone(self.master_loop.handle_result_request(task_info, param)) + with transaction_cursor(self.connection) as cursor: + cursor.execute('SELECT status FROM scoring_statuscheck' + ' WHERE service_id = 1 AND team_id = 2 AND tick = 2') + self.assertEqual(cursor.fetchone()[0], CheckResult.FAULTY.value) + cursor.execute('SELECT placement_end FROM scoring_flag' + ' WHERE service_id = 1 AND protecting_team_id = 2 AND tick = 2') + self.assertGreaterEqual(cursor.fetchone()[0], start_time) + + task_info['tick'] = 3 + param = 'Not an int' + self.assertIsNone(self.master_loop.handle_result_request(task_info, param)) + with transaction_cursor(self.connection) as cursor: + cursor.execute('SELECT status FROM scoring_statuscheck' + ' WHERE service_id = 1 AND team_id = 2 AND tick = 3') + self.assertIsNone(cursor.fetchone()) + cursor.execute('SELECT placement_end FROM scoring_flag' + ' WHERE service_id = 1 AND protecting_team_id = 2 AND tick = 3') + self.assertIsNone(cursor.fetchone()[0]) + + param = 1337 + self.assertIsNone(self.master_loop.handle_result_request(task_info, param)) + with transaction_cursor(self.connection) as cursor: + cursor.execute('SELECT status FROM scoring_statuscheck' + ' WHERE service_id = 1 AND team_id = 2 AND tick = 3') + self.assertIsNone(cursor.fetchone()) + cursor.execute('SELECT placement_end FROM scoring_flag' + ' WHERE service_id = 1 AND protecting_team_id = 2 AND tick = 3') + self.assertIsNone(cursor.fetchone()[0]) + + def test_update_launch_params(self): + self.master_loop.update_launch_params() + self.assertEqual(self.master_loop.tasks_per_launch, 0) + + with transaction_cursor(self.connection) as cursor: + cursor.execute('UPDATE scoring_gamecontrol SET current_tick=1') + self.master_loop.update_launch_params() + self.assertEqual(self.master_loop.tasks_per_launch, 1) + + with transaction_cursor(self.connection) as cursor: + for i in range(10, 400): + username = 'team{}'.format(i) + email = '{}@example.org'.format(username) + cursor.execute('INSERT INTO auth_user (id, username, first_name, last_name, email, password,' + ' is_superuser, is_staff, is_active, date_joined)' + ' VALUES (%s, %s, %s, %s, %s, %s, false, false, true, NOW())', + (i, username, '', '', '', 'password')) + cursor.execute('INSERT INTO registration_team (user_id, informal_email, image, affiliation,' + ' country, nop_team)' + ' VALUES (%s, %s, %s, %s, %s, false)', (i, email, '', '', 'World')) + cursor.execute('INSERT INTO scoring_flag (service_id, protecting_team_id, tick)' + ' VALUES (1, %s, 1)', (i,)) + self.master_loop.update_launch_params() + self.assertEqual(self.master_loop.tasks_per_launch, 9) + + self.master_loop.tick_duration = datetime.timedelta(seconds=360) + self.master_loop.update_launch_params() + self.assertEqual(self.master_loop.tasks_per_launch, 3) + + self.master_loop.tick_duration = datetime.timedelta(seconds=180) + self.master_loop.interval = 5 + self.master_loop.update_launch_params() + self.assertEqual(self.master_loop.tasks_per_launch, 5) + + self.master_loop.max_check_duration = 30 + self.master_loop.update_launch_params() + self.assertEqual(self.master_loop.tasks_per_launch, 3) + + self.master_loop.max_check_duration = 3600 + with self.assertRaises(ValueError): + self.master_loop.update_launch_params() diff --git a/tests/checkerlib/test_local.py b/tests/checkerlib/test_local.py new file mode 100644 index 0000000..4f5781c --- /dev/null +++ b/tests/checkerlib/test_local.py @@ -0,0 +1,215 @@ +import io +import os +import socket +import sys +import tempfile +from unittest import TestCase +from unittest.mock import Mock, call, patch + +from ctf_gameserver import checkerlib +from ctf_gameserver.checkerlib import CheckResult + + +class LocalTest(TestCase): + """ + Test case for the checkerlib being run locally (such as during development). + """ + + def setUp(self): + self.old_working_dir = os.getcwd() + self.working_dir = tempfile.TemporaryDirectory() + os.chdir(self.working_dir.name) + + def tearDown(self): + os.chdir(self.old_working_dir) + self.working_dir.cleanup() + + def test_get_flag(self): + checkerlib.get_flag._team = 1 # pylint: disable=protected-access + team1_tick1_flag1 = checkerlib.get_flag(1, b'fooobaar') + team1_tick2_flag1 = checkerlib.get_flag(2, b'fooobaar') + team1_tick1_flag2 = checkerlib.get_flag(1, b'fooobaar') + team1_tick2_flag2 = checkerlib.get_flag(2) + checkerlib.get_flag._team = 2 # pylint: disable=protected-access + team2_tick1_flag1 = checkerlib.get_flag(1, b'fooobaar') + + self.assertEqual(team1_tick1_flag1, team1_tick1_flag2) + self.assertNotEqual(team1_tick1_flag1, team1_tick2_flag1) + self.assertNotEqual(team1_tick2_flag1, team1_tick2_flag2) + self.assertNotEqual(team1_tick1_flag1, team2_tick1_flag1) + + def test_state_primitive(self): + self.assertIsNone(checkerlib.load_state('primitive')) + + checkerlib.store_state('primitive', 1337) + self.assertEqual(checkerlib.load_state('primitive'), 1337) + + def test_state_object(self): + self.assertIsNone(checkerlib.load_state('object')) + + obj = {'data': [b'foo', b'bar']} + checkerlib.store_state('object', obj) + self.assertEqual(checkerlib.load_state('object'), obj) + + @patch.object(sys, 'argv', ['argv-0', '0.0.0.0', '42', '10']) + @patch('sys.stdout', new_callable=io.StringIO) + def test_run_check_basic(self, stdout_io): + MockChecker.reset_mocks() + MockChecker.place_mock.return_value = CheckResult.OK + MockChecker.service_mock.return_value = CheckResult.OK + MockChecker.flag_mock.return_value = CheckResult.OK + + checkerlib.run_check(MockChecker) + + MockChecker.place_mock.assert_called_once_with(10) + MockChecker.service_mock.assert_called_once() + self.assertEqual(MockChecker.flag_mock.call_count, 6) + self.assertEqual(MockChecker.flag_mock.call_args_list, [call(10), call(9), call(8), call(7), call(6), + call(5)]) + self.assertEqual(stdout_io.getvalue(), 'Check result: OK\n') + + @patch.object(sys, 'argv', ['argv-0', '0.0.0.0', '42', '0']) + @patch('sys.stdout', new_callable=io.StringIO) + def test_run_check_tick0(self, stdout_io): + MockChecker.reset_mocks() + MockChecker.place_mock.return_value = CheckResult.OK + MockChecker.service_mock.return_value = CheckResult.OK + MockChecker.flag_mock.return_value = CheckResult.OK + + checkerlib.run_check(MockChecker) + + MockChecker.place_mock.assert_called_once_with(0) + MockChecker.service_mock.assert_called_once() + MockChecker.flag_mock.assert_called_once_with(0) + self.assertEqual(stdout_io.getvalue(), 'Check result: OK\n') + + @patch.object(sys, 'argv', ['argv-0', '0.0.0.0', '42', '3']) + @patch('sys.stdout', new_callable=io.StringIO) + def test_run_check_tick3(self, stdout_io): + MockChecker.reset_mocks() + MockChecker.place_mock.return_value = CheckResult.OK + MockChecker.service_mock.return_value = CheckResult.OK + MockChecker.flag_mock.return_value = CheckResult.OK + + checkerlib.run_check(MockChecker) + + MockChecker.place_mock.assert_called_once_with(3) + MockChecker.service_mock.assert_called_once() + self.assertEqual(MockChecker.flag_mock.call_count, 4) + self.assertEqual(MockChecker.flag_mock.call_args_list, [call(3), call(2), call(1), call(0)]) + self.assertEqual(stdout_io.getvalue(), 'Check result: OK\n') + + @patch.object(sys, 'argv', ['argv-0', '0.0.0.0', '42', '10']) + @patch('sys.stdout', new_callable=io.StringIO) + def test_run_check_place_fail(self, stdout_io): + MockChecker.reset_mocks() + MockChecker.place_mock.return_value = CheckResult.TIMEOUT + MockChecker.service_mock.return_value = CheckResult.OK + MockChecker.flag_mock.return_value = CheckResult.OK + + checkerlib.run_check(MockChecker) + + MockChecker.place_mock.assert_called_once_with(10) + MockChecker.service_mock.assert_not_called() + MockChecker.flag_mock.assert_not_called() + self.assertEqual(stdout_io.getvalue(), 'Check result: TIMEOUT\n') + + @patch.object(sys, 'argv', ['argv-0', '0.0.0.0', '42', '10']) + @patch('sys.stdout', new_callable=io.StringIO) + def test_run_check_service_fail(self, stdout_io): + MockChecker.reset_mocks() + MockChecker.place_mock.return_value = CheckResult.OK + MockChecker.service_mock.return_value = CheckResult.FAULTY + MockChecker.flag_mock.return_value = CheckResult.OK + + checkerlib.run_check(MockChecker) + + MockChecker.place_mock.assert_called_once_with(10) + MockChecker.service_mock.assert_called_once() + MockChecker.flag_mock.assert_not_called() + self.assertEqual(stdout_io.getvalue(), 'Check result: FAULTY\n') + + @patch.object(sys, 'argv', ['argv-0', '0.0.0.0', '42', '10']) + @patch('sys.stdout', new_callable=io.StringIO) + def test_run_check_flag_fail(self, stdout_io): + MockChecker.reset_mocks() + MockChecker.place_mock.return_value = CheckResult.OK + MockChecker.service_mock.return_value = CheckResult.OK + MockChecker.flag_mock.return_value = CheckResult.FLAG_NOT_FOUND + + checkerlib.run_check(MockChecker) + + MockChecker.place_mock.assert_called_once_with(10) + MockChecker.service_mock.assert_called_once() + MockChecker.flag_mock.assert_called_once_with(10) + self.assertEqual(stdout_io.getvalue(), 'Check result: FLAG_NOT_FOUND\n') + + @patch.object(sys, 'argv', ['argv-0', '0.0.0.0', '42', '10']) + @patch('sys.stdout', new_callable=io.StringIO) + def test_run_check_recovering(self, stdout_io): + MockChecker.reset_mocks() + MockChecker.place_mock.return_value = CheckResult.OK + MockChecker.service_mock.return_value = CheckResult.OK + MockChecker.flag_mock.side_effect = [CheckResult.OK, CheckResult.OK, CheckResult.FLAG_NOT_FOUND, + CheckResult.OK, CheckResult.OK, CheckResult.OK] + + checkerlib.run_check(MockChecker) + + MockChecker.place_mock.assert_called_once_with(10) + MockChecker.service_mock.assert_called_once() + self.assertEqual(MockChecker.flag_mock.call_count, 6) + self.assertEqual(stdout_io.getvalue(), 'Check result: RECOVERING\n') + + @patch.object(sys, 'argv', ['argv-0', '0.0.0.0', '42', '10']) + @patch('sys.stdout', new_callable=io.StringIO) + def test_run_check_not_recovering(self, stdout_io): + MockChecker.reset_mocks() + MockChecker.place_mock.return_value = CheckResult.OK + MockChecker.service_mock.return_value = CheckResult.OK + MockChecker.flag_mock.side_effect = [CheckResult.OK, CheckResult.OK, CheckResult.FLAG_NOT_FOUND, + CheckResult.OK, CheckResult.FAULTY] + + checkerlib.run_check(MockChecker) + + MockChecker.place_mock.assert_called_once_with(10) + MockChecker.service_mock.assert_called_once() + self.assertEqual(MockChecker.flag_mock.call_count, 5) + self.assertEqual(stdout_io.getvalue(), 'Check result: FAULTY\n') + + @patch.object(sys, 'argv', ['argv-0', '0.0.0.0', '42', '10']) + def test_run_check_exception(self): + # pylint: disable=abstract-method + class ExceptionChecker(checkerlib.BaseChecker): + def place_flag(self, tick): + raise ValueError() + + with self.assertRaises(ValueError): + checkerlib.run_check(ExceptionChecker) + + @patch.object(sys, 'argv', ['argv-0', '0.0.0.0', '42', '10']) + @patch('sys.stdout', new_callable=io.StringIO) + def test_run_check_exception_timeout(self, stdout_io): + # pylint: disable=abstract-method + class ExceptionChecker(checkerlib.BaseChecker): + def place_flag(self, tick): + raise socket.timeout() + + checkerlib.run_check(ExceptionChecker) + self.assertEqual(stdout_io.getvalue(), 'Check result: TIMEOUT\n') + + +class MockChecker(checkerlib.BaseChecker): + @classmethod + def reset_mocks(cls): + cls.place_mock = Mock() + cls.service_mock = Mock() + cls.flag_mock = Mock() + + def place_flag(self, tick): + return self.__class__.place_mock(tick) + + def check_service(self): + return self.__class__.service_mock() + + def check_flag(self, tick): + return self.__class__.flag_mock(tick)