Skip to content

Commit

Permalink
ecnspider should output one observation instead of two flows
Browse files Browse the repository at this point in the history
  • Loading branch information
gubser authored and irl committed Sep 16, 2016
1 parent d27ff2e commit 854de72
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 27 deletions.
12 changes: 6 additions & 6 deletions examples/webtest.csv
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
160.85.31.173,80,mami-project.eu
139.133.210.32,80,erg.abdn.ac.uk
2001:630:241:210:569f:35ff:fe0a:116a,80,erg.abdn.ac.uk
129.132.52.158,80,ecn.ethz.ch
2001:67c:10ec:36c2::61,80,ecn.ethz.ch
139.133.1.4,80,abdn.ac.uk
160.85.31.173,80,mami-project.eu,1
139.133.210.32,80,erg.abdn.ac.uk,2
2001:630:241:210:569f:35ff:fe0a:116a,80,erg.abdn.ac.uk,3
129.132.52.158,80,ecn.ethz.ch,4
2001:67c:10ec:36c2::61,80,ecn.ethz.ch,5
139.133.1.4,80,abdn.ac.uk,6
99 changes: 78 additions & 21 deletions pathspider/plugins/ecnspider3.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import subprocess
import traceback
from datetime import datetime

import socket
import collections
Expand All @@ -13,21 +14,28 @@
from pathspider.observer import Observer
from pathspider.observer import basic_flow
from pathspider.observer import basic_count

from pathspider.observer.tcp import tcp_setup
from pathspider.observer.tcp import tcp_complete

Connection = collections.namedtuple("Connection", ["client", "port", "state"])
Connection = collections.namedtuple("Connection", ["client", "port", "state", "tstart"])
SpiderRecord = collections.namedtuple("SpiderRecord", ["ip", "rport", "port",
"host", "ecnstate",
"connstate"])
"rank", "host", "ecnstate",
"connstate", "tstart", "tstop"])

CONN_OK = 0
CONN_FAILED = 1
CONN_TIMEOUT = 2

USER_AGENT = "pathspider"

TCP_CWR = 0x80
TCP_ECE = 0x40
TCP_ACK = 0x10
TCP_SYN = 0x02

TCP_SAEW = (TCP_SYN | TCP_ACK | TCP_ECE | TCP_CWR)
TCP_SAE = (TCP_SYN | TCP_ACK | TCP_ECE)

## Chain functions

def ecnsetup(rec, ip):
Expand All @@ -37,13 +45,9 @@ def ecnsetup(rec, ip):
return True

def ecnflags(rec, tcp, rev):
SYN = 0x02
CWR = 0x40
ECE = 0x80

flags = tcp.flags

if flags & SYN:
if flags & TCP_SYN:
if rev == 0:
rec['fwd_syn_flags'] = flags
if rev == 1:
Expand Down Expand Up @@ -74,6 +78,7 @@ def __init__(self, worker_count, libtrace_uri):
libtrace_uri=libtrace_uri)
self.tos = None # set by configurator
self.conn_timeout = 10
self.comparetab = {}

def config_zero(self):
"""
Expand All @@ -100,30 +105,38 @@ def connect(self, job, pcs, config):
Performs a TCP connection.
"""

if ":" in job[0]:
job_ip, job_port, job_host, job_rank = job

tstart = datetime.utcnow()

if ":" in job_ip:
sock = socket.socket(socket.AF_INET6)
else:
sock = socket.socket(socket.AF_INET)

try:
sock.settimeout(self.conn_timeout)
sock.connect((job[0], job[1]))
sock.connect((job_ip, job_port))

return Connection(sock, sock.getsockname()[1], CONN_OK)
return Connection(sock, sock.getsockname()[1], CONN_OK, tstart)
except TimeoutError:
return Connection(sock, sock.getsockname()[1], CONN_TIMEOUT)
return Connection(sock, sock.getsockname()[1], CONN_TIMEOUT, tstart)
except OSError:
return Connection(sock, sock.getsockname()[1], CONN_FAILED)
return Connection(sock, sock.getsockname()[1], CONN_FAILED, tstart)

def post_connect(self, job, conn, pcs, config):
"""
Close the socket gracefully.
"""

job_ip, job_port, job_host, job_rank = job

tstop = datetime.utcnow()

if conn.state == CONN_OK:
rec = SpiderRecord(job[0], job[1], conn.port, job[2], config, True)
rec = SpiderRecord(job_ip, job_port, conn.port, job_rank, job_host, config, True, conn.tstart, tstop)
else:
rec = SpiderRecord(job[0], job[1], conn.port, job[2], config, False)
rec = SpiderRecord(job_ip, job_port, conn.port, job_rank, job_host, config, False, conn.tstart, tstop)

try:
conn.client.shutdown(socket.SHUT_RDWR)
Expand Down Expand Up @@ -155,6 +168,47 @@ def create_observer(self):
traceback.print_exc()
sys.exit(-1)

def combine_flows(self, flow):
dip = flow['dip']
if dip in self.comparetab:
other_flow = self.comparetab.pop(dip)

# first has always ecn off, while the second has ecn on
flows = (flow, other_flow) if other_flow['ecnstate'] else (other_flow, flow)

tstart = min(flow['tstart'], other_flow['tstart'])
tstop = max(flow['tstop'], other_flow['tstop'])

if flows[0]['connstate'] and flows[1]['connstate']:
cond_conn = 'ecn.connectivity.works'
elif flows[0]['connstate'] and not flows[1]['connstate']:
cond_conn = 'ecn.connectivity.broken'
elif not flows[0]['connstate'] and not flows[1]['connstate']:
cond_conn = 'ecn.connectivity.transient'
else:
cond_conn = 'ecn.connectivity.offline'

if flows[1]['rev_syn_flags'] & TCP_SAEW == TCP_SAE:
cond_nego = 'ecn.negotiated'
else:
cond_nego = 'ecn.not_negotiated'

self.outqueue.put({
'sip': flow['sip'],
'dip': dip,
'dp': flow['dp'],
'conditions': [cond_conn, cond_nego],
'hostname': flow['host'],
'rank': flow['rank'],
'flow_results': flows,
'time': {
'from': tstart.isoformat(),
'to': tstop.isoformat()
}
})
else:
self.comparetab[dip] = flow

def merge(self, flow, res):
"""
Merge flow records.
Expand All @@ -168,14 +222,17 @@ def merge(self, flow, res):
flow = {"dip": res.ip,
"sp": res.port,
"dp": res.rport,
"connstate": res.connstate,
"ecnstate": res.ecnstate,
"observed": False }
else:
flow['connstate'] = res.connstate
flow['ecnstate'] = res.ecnstate
flow['observed'] = True

flow['rank'] = res.rank
flow['host'] = res.host
flow['connstate'] = res.connstate
flow['ecnstate'] = res.ecnstate
flow['tstart'] = res.tstart
flow['tstop'] = res.tstop

logger.debug("Result: " + str(flow))
self.outqueue.put(flow)
self.combine_flows(flow)

0 comments on commit 854de72

Please sign in to comment.