From 854de72ab8d310d6501e037bb122288cb15d98dd Mon Sep 17 00:00:00 2001 From: Elio Gubser Date: Sun, 17 Jul 2016 08:06:00 +0200 Subject: [PATCH] ecnspider should output one observation instead of two flows --- examples/webtest.csv | 12 ++-- pathspider/plugins/ecnspider3.py | 99 +++++++++++++++++++++++++------- 2 files changed, 84 insertions(+), 27 deletions(-) diff --git a/examples/webtest.csv b/examples/webtest.csv index dcde5df..127275a 100644 --- a/examples/webtest.csv +++ b/examples/webtest.csv @@ -1,6 +1,6 @@ -160.85.31.173,80,mami-project.eu -139.133.210.32,80,erg.abdn.ac.uk -2001:630:241:210:569f:35ff:fe0a:116a,80,erg.abdn.ac.uk -129.132.52.158,80,ecn.ethz.ch -2001:67c:10ec:36c2::61,80,ecn.ethz.ch -139.133.1.4,80,abdn.ac.uk +160.85.31.173,80,mami-project.eu,1 +139.133.210.32,80,erg.abdn.ac.uk,2 +2001:630:241:210:569f:35ff:fe0a:116a,80,erg.abdn.ac.uk,3 +129.132.52.158,80,ecn.ethz.ch,4 +2001:67c:10ec:36c2::61,80,ecn.ethz.ch,5 +139.133.1.4,80,abdn.ac.uk,6 diff --git a/pathspider/plugins/ecnspider3.py b/pathspider/plugins/ecnspider3.py index 3a1c72b..556ebb7 100644 --- a/pathspider/plugins/ecnspider3.py +++ b/pathspider/plugins/ecnspider3.py @@ -3,6 +3,7 @@ import logging import subprocess import traceback +from datetime import datetime import socket import collections @@ -13,14 +14,13 @@ from pathspider.observer import Observer from pathspider.observer import basic_flow from pathspider.observer import basic_count - from pathspider.observer.tcp import tcp_setup from pathspider.observer.tcp import tcp_complete -Connection = collections.namedtuple("Connection", ["client", "port", "state"]) +Connection = collections.namedtuple("Connection", ["client", "port", "state", "tstart"]) SpiderRecord = collections.namedtuple("SpiderRecord", ["ip", "rport", "port", - "host", "ecnstate", - "connstate"]) + "rank", "host", "ecnstate", + "connstate", "tstart", "tstop"]) CONN_OK = 0 CONN_FAILED = 1 @@ -28,6 +28,14 @@ USER_AGENT = "pathspider" +TCP_CWR = 0x80 +TCP_ECE = 0x40 +TCP_ACK = 0x10 +TCP_SYN = 0x02 + +TCP_SAEW = (TCP_SYN | TCP_ACK | TCP_ECE | TCP_CWR) +TCP_SAE = (TCP_SYN | TCP_ACK | TCP_ECE) + ## Chain functions def ecnsetup(rec, ip): @@ -37,13 +45,9 @@ def ecnsetup(rec, ip): return True def ecnflags(rec, tcp, rev): - SYN = 0x02 - CWR = 0x40 - ECE = 0x80 - flags = tcp.flags - if flags & SYN: + if flags & TCP_SYN: if rev == 0: rec['fwd_syn_flags'] = flags if rev == 1: @@ -74,6 +78,7 @@ def __init__(self, worker_count, libtrace_uri): libtrace_uri=libtrace_uri) self.tos = None # set by configurator self.conn_timeout = 10 + self.comparetab = {} def config_zero(self): """ @@ -100,30 +105,38 @@ def connect(self, job, pcs, config): Performs a TCP connection. """ - if ":" in job[0]: + job_ip, job_port, job_host, job_rank = job + + tstart = datetime.utcnow() + + if ":" in job_ip: sock = socket.socket(socket.AF_INET6) else: sock = socket.socket(socket.AF_INET) try: sock.settimeout(self.conn_timeout) - sock.connect((job[0], job[1])) + sock.connect((job_ip, job_port)) - return Connection(sock, sock.getsockname()[1], CONN_OK) + return Connection(sock, sock.getsockname()[1], CONN_OK, tstart) except TimeoutError: - return Connection(sock, sock.getsockname()[1], CONN_TIMEOUT) + return Connection(sock, sock.getsockname()[1], CONN_TIMEOUT, tstart) except OSError: - return Connection(sock, sock.getsockname()[1], CONN_FAILED) + return Connection(sock, sock.getsockname()[1], CONN_FAILED, tstart) def post_connect(self, job, conn, pcs, config): """ Close the socket gracefully. """ + job_ip, job_port, job_host, job_rank = job + + tstop = datetime.utcnow() + if conn.state == CONN_OK: - rec = SpiderRecord(job[0], job[1], conn.port, job[2], config, True) + rec = SpiderRecord(job_ip, job_port, conn.port, job_rank, job_host, config, True, conn.tstart, tstop) else: - rec = SpiderRecord(job[0], job[1], conn.port, job[2], config, False) + rec = SpiderRecord(job_ip, job_port, conn.port, job_rank, job_host, config, False, conn.tstart, tstop) try: conn.client.shutdown(socket.SHUT_RDWR) @@ -155,6 +168,47 @@ def create_observer(self): traceback.print_exc() sys.exit(-1) + def combine_flows(self, flow): + dip = flow['dip'] + if dip in self.comparetab: + other_flow = self.comparetab.pop(dip) + + # first has always ecn off, while the second has ecn on + flows = (flow, other_flow) if other_flow['ecnstate'] else (other_flow, flow) + + tstart = min(flow['tstart'], other_flow['tstart']) + tstop = max(flow['tstop'], other_flow['tstop']) + + if flows[0]['connstate'] and flows[1]['connstate']: + cond_conn = 'ecn.connectivity.works' + elif flows[0]['connstate'] and not flows[1]['connstate']: + cond_conn = 'ecn.connectivity.broken' + elif not flows[0]['connstate'] and not flows[1]['connstate']: + cond_conn = 'ecn.connectivity.transient' + else: + cond_conn = 'ecn.connectivity.offline' + + if flows[1]['rev_syn_flags'] & TCP_SAEW == TCP_SAE: + cond_nego = 'ecn.negotiated' + else: + cond_nego = 'ecn.not_negotiated' + + self.outqueue.put({ + 'sip': flow['sip'], + 'dip': dip, + 'dp': flow['dp'], + 'conditions': [cond_conn, cond_nego], + 'hostname': flow['host'], + 'rank': flow['rank'], + 'flow_results': flows, + 'time': { + 'from': tstart.isoformat(), + 'to': tstop.isoformat() + } + }) + else: + self.comparetab[dip] = flow + def merge(self, flow, res): """ Merge flow records. @@ -168,14 +222,17 @@ def merge(self, flow, res): flow = {"dip": res.ip, "sp": res.port, "dp": res.rport, - "connstate": res.connstate, - "ecnstate": res.ecnstate, "observed": False } else: - flow['connstate'] = res.connstate - flow['ecnstate'] = res.ecnstate flow['observed'] = True + flow['rank'] = res.rank + flow['host'] = res.host + flow['connstate'] = res.connstate + flow['ecnstate'] = res.ecnstate + flow['tstart'] = res.tstart + flow['tstop'] = res.tstop + logger.debug("Result: " + str(flow)) - self.outqueue.put(flow) + self.combine_flows(flow)