From 6f0a0748f1c165ae18b1d1da37762ad8d3988268 Mon Sep 17 00:00:00 2001 From: meejah Date: Tue, 19 Jan 2021 15:44:13 -0700 Subject: [PATCH 01/30] first-cut of state-machine style code --- docs/server-statemachine.dot | 22 ++ src/wormhole_transit_relay/server_state.py | 299 +++++++++++++++++++++ 2 files changed, 321 insertions(+) create mode 100644 docs/server-statemachine.dot create mode 100644 src/wormhole_transit_relay/server_state.py diff --git a/docs/server-statemachine.dot b/docs/server-statemachine.dot new file mode 100644 index 0000000..d3a2215 --- /dev/null +++ b/docs/server-statemachine.dot @@ -0,0 +1,22 @@ +/** +. thinking about state-machine from "hand-drawn" perspective +. will it look the same as an Automat one? +**/ + +digraph { + listening -> wait_relay [label="connection_made"] + + wait_relay -> wait_partner [label="please_relay\nFindPartner"] + wait_relay -> wait_partner [label="please_relay_for_side\nFindPartner"] + wait_relay -> done [label="invalid_token\nSend('bad handshake')\nDisconnect"] + wait_relay -> done [label="connection_lost"] + + wait_partner -> relaying [label="got_partner\nConnectPartner(partner)\nSend('ok')"] + wait_partner -> done [label="got_bytes\nDisconnect"] + wait_partner -> done [label="connection_lost"] + + relaying -> relaying [label="got_bytes\nSend(bytes)"] + relaying -> done [label="partner_connection_lost\nDisconnectMe"] + relaying -> done [label="connection_lost\nDisconnectPartner"] +} + diff --git a/src/wormhole_transit_relay/server_state.py b/src/wormhole_transit_relay/server_state.py new file mode 100644 index 0000000..6b2c6a5 --- /dev/null +++ b/src/wormhole_transit_relay/server_state.py @@ -0,0 +1,299 @@ + +import automat +from zope.interface import ( + Interface, + implementer, +) + + +class ITransitClient(Interface): + def send(data): + """ + Send some byets to the client + """ + + def disconnect(reason): + """ + Disconnect the client transport + """ + + def connect_partner(other): + """ + Hook up to our partner. + :param ITransitClient other: our partner + """ + + def disconnect_partner(): + """ + Disconnect our partner's transport + """ + + +@implementer(ITransitClient) +class TestClient(object): + _partner = None + _data = b"" + + def send_to_partner(self, data): + print("{} GOT:{}".format(id(self), repr(data))) + if self._partner: + self._partner.send(data) + + def send(self, data): + print("{} SEND:{}".format(id(self), repr(data))) + self._data += data + + def disconnect(self): + print("disconnect") + + def connect_partner(self, other): + print("connect_partner: {} <--> {}".format(id(self), id(other))) + assert self._partner is None, "double partner" + self._partner = other + + def disconnect_partner(self): + assert self._partner is not None, "no partner" + print("disconnect_partner: {}".format(id(self._partner))) + + +class PendingRequests(object): + """ + Tracks the tokens we have received from client connections and + maps them to their partner connections + """ + + def register_token(self, *args): + """ + """ + + +class TransitServer(object): + """ + Encapsulates the state-machine of the server side of a transit + relay connection. + + Once the protocol has been told to relay (or to relay for a side) + it starts passing all received bytes to the other side until it + closes. + """ + + _machine = automat.MethodicalMachine() + _client = None + + @_machine.input() + def connection_made(self, client): + """ + A client has connected. May only be called once. + + :param ITransitClient client: our client. + """ + # NB: the "only called once" is enforced by the state-machine; + # this input is only valid for the "listening" state, to which + # we never return. + + @_machine.input() + def please_relay(self, token): + pass + + @_machine.input() + def please_relay_for_side(self, token, side): + pass + + @_machine.input() + def bad_token(self): + """ + A bad token / relay line was received + """ + + @_machine.input() + def got_partner(self, client): + """ + The partner for this relay session has been found + """ + + @_machine.input() + def connection_lost(self): + pass + + @_machine.input() + def partner_connection_lost(self): + pass + + @_machine.input() + def got_bytes(self, data): + """ + Some bytes have arrived (that aren't part of the handshake) + """ + + @_machine.output() + def _remember_client(self, client): + self._client = client + + @_machine.output() + def _register_token(self, token): + return self._real_register_token_for_side(token, None) + + @_machine.output() + def _register_token_for_side(self, token, side): + return self._real_register_token_for_side(token, side) + + @_machine.output() + def _unregister(self): + """ + remove us from the thing that remembers tokens and sides + """ + + @_machine.output() + def _send_bad(self): + self._client.send("bad handshake\n") + + @_machine.output() + def _send_ok(self): + self._client.send("ok\n") + + @_machine.output() + def _send(self, data): + self._client.send(data) + + @_machine.output() + def _send_to_partner(self, data): + self._client.send_to_partner(data) + + @_machine.output() + def _connect_partner(self, client): + self._client.connect_partner(client) + + @_machine.output() + def _disconnect(self): + self._client.disconnect() + + @_machine.output() + def _disconnect_partner(self): + self._client.disconnect_partner() + + def _real_register_token_for_side(self, token, side): + """ + basically, _got_handshake() + connection_got_token() from "real" + code ...and if this is the "second" side, hook them up and + pass .got_partner() input to both + """ + + @_machine.state(initial=True) + def listening(self): + """ + Initial state, awaiting connection. + """ + + @_machine.state() + def wait_relay(self): + """ + Waiting for a 'relay' message + """ + + @_machine.state() + def wait_partner(self): + """ + Waiting for our partner to connect + """ + + @_machine.state() + def relaying(self): + """ + Relaying bytes to our partner + """ + + @_machine.state() + def done(self): + """ + Terminal state + """ + + listening.upon( + connection_made, + enter=wait_relay, + outputs=[_remember_client], + ) + + wait_relay.upon( + please_relay, + enter=wait_partner, + outputs=[_register_token], + ) + wait_relay.upon( + please_relay_for_side, + enter=wait_partner, + outputs=[_register_token_for_side], + ) + wait_relay.upon( + bad_token, + enter=done, + outputs=[_send_bad, _disconnect], + ) + wait_relay.upon( + connection_lost, + enter=done, + outputs=[_disconnect], + ) + + wait_partner.upon( + got_partner, + enter=relaying, + outputs=[_send_ok, _connect_partner], + ) + wait_partner.upon( + connection_lost, + enter=done, + outputs=[_unregister], + ) + + relaying.upon( + got_bytes, + enter=relaying, + outputs=[_send_to_partner], + ) + relaying.upon( + connection_lost, + enter=done, + outputs=[_disconnect_partner, _unregister], + ) + relaying.upon( + partner_connection_lost, + enter=done, + outputs=[_disconnect, _unregister], + ) + + + + +# actions: +# - send("ok") +# - send("bad handshake") +# - disconnect +# - ... + +if __name__ == "__main__": + server0 = TransitServer() + client0 = TestClient() + server1 = TransitServer() + client1 = TestClient() + server0.connection_made(client0) + server0.please_relay(b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb") + + # this would be an error, because our partner hasn't shown up yet + # print(server0.got_bytes(b"asdf")) + + server1.connection_made(client1) + server1.please_relay(b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb") + + # XXX the PendingRequests stuff should do this, going "by hand" for now + server0.got_partner(client1) + server1.got_partner(client0) + + # should be connected now + server0.got_bytes(b"asdf") + # client1 should receive b"asdf" + + server0.connection_lost() + print("----[ received data on both sides ]----") + print("client0:{}".format(repr(client0._data))) + print("client1:{}".format(repr(client1._data))) From 57f9c32b812a49dcaf1ac4f6cc86fe68e0fe5e2f Mon Sep 17 00:00:00 2001 From: meejah Date: Mon, 25 Jan 2021 17:59:14 -0700 Subject: [PATCH 02/30] (wip) refactor to use Automat state-machine --- src/wormhole_transit_relay/server_state.py | 256 ++++++++++++++++-- .../test/test_rlimits.py | 2 +- .../test/test_service.py | 2 +- src/wormhole_transit_relay/test/test_stats.py | 2 +- src/wormhole_transit_relay/transit_server.py | 220 +++++++-------- 5 files changed, 335 insertions(+), 147 deletions(-) diff --git a/src/wormhole_transit_relay/server_state.py b/src/wormhole_transit_relay/server_state.py index 6b2c6a5..cf95d1d 100644 --- a/src/wormhole_transit_relay/server_state.py +++ b/src/wormhole_transit_relay/server_state.py @@ -1,3 +1,4 @@ +from collections import defaultdict import automat from zope.interface import ( @@ -12,7 +13,7 @@ def send(data): Send some byets to the client """ - def disconnect(reason): + def disconnect(): """ Disconnect the client transport """ @@ -37,7 +38,7 @@ class TestClient(object): def send_to_partner(self, data): print("{} GOT:{}".format(id(self), repr(data))) if self._partner: - self._partner.send(data) + self._partner._client.send(data) def send(self, data): print("{} SEND:{}".format(id(self), repr(data))) @@ -56,18 +57,94 @@ def disconnect_partner(self): print("disconnect_partner: {}".format(id(self._partner))) -class PendingRequests(object): +class ActiveConnections(object): """ - Tracks the tokens we have received from client connections and - maps them to their partner connections + Tracks active connections. A connection is 'active' when both + sides have shown up and they are glued together. """ + def __init__(self): + self._connections = set() + + def register(self, side0, side1): + """ + A connection has become active so register both its sides - def register_token(self, *args): + :param TransitConnection side0: one side of the connection + :param TransitConnection side1: one side of the connection """ + self._connections.add(side0) + self._connections.add(side1) + + def unregister(self, side): + """ + One side of a connection has become inactive. + + :param TransitConnection side: an inactive side of a connection """ + self._connections.discard(side) + +class PendingRequests(object): + """ + Tracks the tokens we have received from client connections and + maps them to their partner connections for requests that haven't + yet been 'glued together' (that is, one side hasn't yet shown up). + """ -class TransitServer(object): + def __init__(self, active_connections): + self._requests = defaultdict(set) # token -> set((side, TransitConnection)) + self._active = active_connections + + def unregister(self, token, side, tc): + if token in self._requests: + self._requests[token].discard((side, tc)) + self._active.unregister(tc) + + def register_token(self, token, new_side, new_tc): + """ + A client has connected and successfully offered a token (and + optional 'side' token). If this is the first one for this + token, we merely remember it. If it is the second side for + this token we connect them together. + + :returns bool: True if we are the first side to register this + token + """ + potentials = self._requests[token] + for old in potentials: + (old_side, old_tc) = old + if ((old_side is None) + or (new_side is None) + or (old_side != new_side)): + # we found a match + # FIXME: debug-log this + # print("transit relay 2: %s" % new_tc.get_token()) + + # drop and stop tracking the rest + potentials.remove(old) + for (_, leftover_tc) in potentials.copy(): + # Don't record this as errory. It's just a spare connection + # from the same side as a connection that got used. This + # can happen if the connection hint contains multiple + # addresses (we don't currently support those, but it'd + # probably be useful in the future). + leftover_tc.disconnect_redundant() + self._requests.pop(token, None) + + # glue the two ends together + self._active.register(new_tc, old_tc) + new_tc.got_partner(old_tc) + old_tc.got_partner(new_tc) + return False + + # FIXME: debug-log this + # print("transit relay 1: %s" % new_tc.get_token()) + potentials.add((new_side, new_tc)) + return True + # TODO: timer + + +class TransitServerState(object): """ Encapsulates the state-machine of the server side of a transit relay connection. @@ -79,6 +156,36 @@ class TransitServer(object): _machine = automat.MethodicalMachine() _client = None + _buddy = None + _token = None + _side = None + _first = None + _mood = "empty" + + def __init__(self, pending_requests): + self._pending_requests = pending_requests + + def get_token(self): + """ + :returns str: a string describing our token. This will be "-" if + we have no token yet, or "{16 chars}-" if we have + just a token or "{16 chars}-{16 chars}" if we have a token and + a side. + """ + d = "-" + if self._token is not None: + d = self._token[:16].decode("ascii") + if self._side is not None: + d += "-" + self._side.decode("ascii") + else: + d += "-" + return d + + def get_mood(self): + """ + :returns str: description of the current 'mood' of the connection + """ + return self._mood @_machine.input() def connection_made(self, client): @@ -93,16 +200,22 @@ def connection_made(self, client): @_machine.input() def please_relay(self, token): - pass + """ + A 'please relay X' message has been received (the original version + of the protocol). + """ @_machine.input() def please_relay_for_side(self, token, side): - pass + """ + A 'please relay X for side Y' message has been received (the + second version of the protocol). + """ @_machine.input() def bad_token(self): """ - A bad token / relay line was received + A bad token / relay line was received (e.g. couldn't be parsed) """ @_machine.input() @@ -113,11 +226,15 @@ def got_partner(self, client): @_machine.input() def connection_lost(self): - pass + """ + Our transport has failed. + """ @_machine.input() def partner_connection_lost(self): - pass + """ + Our partner's transport has failed. + """ @_machine.input() def got_bytes(self, data): @@ -142,14 +259,20 @@ def _unregister(self): """ remove us from the thing that remembers tokens and sides """ + return self._pending_requests.unregister(self._token, self._side, self) @_machine.output() def _send_bad(self): - self._client.send("bad handshake\n") + self._mood = "errory" + self._client.send(b"bad handshake\n") @_machine.output() def _send_ok(self): - self._client.send("ok\n") + self._client.send(b"ok\n") + + @_machine.output() + def _send_impatient(self): + self._client.send(b"impatient\n") @_machine.output() def _send(self, data): @@ -157,10 +280,11 @@ def _send(self, data): @_machine.output() def _send_to_partner(self, data): - self._client.send_to_partner(data) + self._buddy._client.send(data) @_machine.output() def _connect_partner(self, client): + self._buddy = client self._client.connect_partner(client) @_machine.output() @@ -171,12 +295,60 @@ def _disconnect(self): def _disconnect_partner(self): self._client.disconnect_partner() + # some outputs to record the "mood" .. + @_machine.output() + def _mood_happy(self): + self._mood = "happy" + + @_machine.output() + def _mood_lonely(self): + self._mood = "lonely" + + @_machine.output() + def _mood_impatient(self): + self._mood = "impatient" + + @_machine.output() + def _mood_errory(self): + self._mood = "errory" + + @_machine.output() + def _mood_happy_if_first(self): + """ + We disconnected first so we're only happy if we also connected + first. + """ + if self._first: + self._mood = "happy" + else: + self._mood = "jilted" + + @_machine.output() + def _mood_happy_if_second(self): + """ + We disconnected second so we're only happy if we also connected + second. + """ + if self._first: + self._mood = "jilted" + else: + self._mood = "happy" + def _real_register_token_for_side(self, token, side): """ - basically, _got_handshake() + connection_got_token() from "real" - code ...and if this is the "second" side, hook them up and - pass .got_partner() input to both + A client has connected and sent a valid version 1 or version 2 + handshake. If the former, `side` will be None. + + In either case, we remember the tokens and register + ourselves. This might result in 'got_partner' notifications to + two state-machines if this is the second side for a given token. + + :param bytes token: the token + :param bytes side: The side token (or None) """ + self._token = token + self._side = side + self._first = self._pending_requests.register_token(token, side, self) @_machine.state(initial=True) def listening(self): @@ -217,17 +389,22 @@ def done(self): wait_relay.upon( please_relay, enter=wait_partner, - outputs=[_register_token], + outputs=[_mood_lonely, _register_token], ) wait_relay.upon( please_relay_for_side, enter=wait_partner, - outputs=[_register_token_for_side], + outputs=[_mood_lonely, _register_token_for_side], ) wait_relay.upon( bad_token, enter=done, - outputs=[_send_bad, _disconnect], + outputs=[_mood_errory, _send_bad, _disconnect], + ) + wait_relay.upon( + got_bytes, + enter=done, + outputs=[_mood_errory, _disconnect], ) wait_relay.upon( connection_lost, @@ -238,12 +415,17 @@ def done(self): wait_partner.upon( got_partner, enter=relaying, - outputs=[_send_ok, _connect_partner], + outputs=[_mood_happy, _send_ok, _connect_partner], ) wait_partner.upon( connection_lost, enter=done, - outputs=[_unregister], + outputs=[_mood_lonely, _unregister], + ) + wait_partner.upon( + got_bytes, + enter=done, + outputs=[_mood_impatient, _send_impatient, _disconnect, _unregister], ) relaying.upon( @@ -254,12 +436,23 @@ def done(self): relaying.upon( connection_lost, enter=done, - outputs=[_disconnect_partner, _unregister], + outputs=[_mood_happy_if_first, _disconnect_partner, _unregister], ) relaying.upon( partner_connection_lost, enter=done, - outputs=[_disconnect, _unregister], + outputs=[_mood_happy_if_second, _disconnect, _unregister], + ) + + done.upon( + connection_lost, + enter=done, + outputs=[], + ) + done.upon( + partner_connection_lost, + enter=done, + outputs=[], ) @@ -272,9 +465,12 @@ def done(self): # - ... if __name__ == "__main__": - server0 = TransitServer() + active = ActiveConnections() + pending = PendingRequests(active) + + server0 = TransitServerState(pending) client0 = TestClient() - server1 = TransitServer() + server1 = TransitServerState(pending) client1 = TestClient() server0.connection_made(client0) server0.please_relay(b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb") @@ -282,12 +478,14 @@ def done(self): # this would be an error, because our partner hasn't shown up yet # print(server0.got_bytes(b"asdf")) + print("about to relay client1") server1.connection_made(client1) server1.please_relay(b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb") + print("done") # XXX the PendingRequests stuff should do this, going "by hand" for now - server0.got_partner(client1) - server1.got_partner(client0) +# server0.got_partner(client1) +# server1.got_partner(client0) # should be connected now server0.got_bytes(b"asdf") diff --git a/src/wormhole_transit_relay/test/test_rlimits.py b/src/wormhole_transit_relay/test/test_rlimits.py index 10497e4..3ee23a9 100644 --- a/src/wormhole_transit_relay/test/test_rlimits.py +++ b/src/wormhole_transit_relay/test/test_rlimits.py @@ -1,5 +1,5 @@ from __future__ import print_function, unicode_literals -import mock +from unittest import mock from twisted.trial import unittest from ..increase_rlimits import increase_rlimits diff --git a/src/wormhole_transit_relay/test/test_service.py b/src/wormhole_transit_relay/test/test_service.py index dac642c..f72765c 100644 --- a/src/wormhole_transit_relay/test/test_service.py +++ b/src/wormhole_transit_relay/test/test_service.py @@ -1,6 +1,6 @@ from __future__ import unicode_literals, print_function from twisted.trial import unittest -import mock +from unittest import mock from twisted.application.service import MultiService from .. import server_tap diff --git a/src/wormhole_transit_relay/test/test_stats.py b/src/wormhole_transit_relay/test/test_stats.py index f9433ef..43b912f 100644 --- a/src/wormhole_transit_relay/test/test_stats.py +++ b/src/wormhole_transit_relay/test/test_stats.py @@ -1,6 +1,6 @@ from __future__ import print_function, unicode_literals import os, io, json, sqlite3 -import mock +from unittest import mock from twisted.trial import unittest from ..transit_server import Transit from .. import database diff --git a/src/wormhole_transit_relay/transit_server.py b/src/wormhole_transit_relay/transit_server.py index 91d84e0..9de3b1e 100644 --- a/src/wormhole_transit_relay/transit_server.py +++ b/src/wormhole_transit_relay/transit_server.py @@ -24,6 +24,17 @@ def blur_size(size): return round_to(size, 1e6) return round_to(size, 100e6) + +from wormhole_transit_relay.server_state import ( + TransitServerState, + PendingRequests, + ActiveConnections, + ITransitClient, +) +from zope.interface import implementer + + +@implementer(ITransitClient) class TransitConnection(LineReceiver): delimiter = b'\n' # maximum length of a line we will accept before the handshake is complete. @@ -32,13 +43,33 @@ class TransitConnection(LineReceiver): MAX_LENGTH = 1024 def __init__(self): - self._got_token = False - self._got_side = False - self._sent_ok = False - self._mood = "empty" - self._buddy = None self._total_sent = 0 + def send(self, data): + """ + ITransitClient API + """ + self.transport.write(data) + + def disconnect(self): + """ + ITransitClient API + """ + self.transport.loseConnection() + + def connect_partner(self, other): + """ + ITransitClient API + """ + self._buddy = other + + def disconnect_partner(self): + """ + ITransitClient API + """ + self._buddy._client.transport.loseConnection() + self._buddy = None + def describeToken(self): d = "-" if self._got_token: @@ -50,6 +81,8 @@ def describeToken(self): return d def connectionMade(self): + self._state = TransitServerState(self.factory.pending_requests) + self._state.connection_made(self) self._started = time.time() self._log_requests = self.factory._log_requests try: @@ -71,10 +104,10 @@ def lineReceived(self, line): side = new.group(2) return self._got_handshake(token, side) - self.sendLine(b"bad handshake") - if self._log_requests: - log.msg("transit handshake failure") - return self.disconnect_error() + # state-machine calls us via ITransitClient interface to do + # bad handshake etc. + return self._state.bad_token() + #return self._state.got_bytes(line) def rawDataReceived(self, data): # We are an IPushProducer to our buddy's IConsumer, so they'll @@ -83,33 +116,15 @@ def rawDataReceived(self, data): # practice, this buffers about 10MB per connection, after which # point the sender will only transmit data as fast as the # receiver can handle it. - if self._sent_ok: - if not self._buddy: - # Our buddy disconnected (we're "jilted"), so we hung up too, - # but our incoming data hasn't stopped yet (it will in a - # moment, after our disconnect makes a roundtrip through the - # kernel). This probably means the file receiver hung up, and - # this connection is the file sender. In may-2020 this - # happened 11 times in 40 days. - return - self._total_sent += len(data) - self._buddy.transport.write(data) - return - - # handshake is complete but not yet sent_ok - self.sendLine(b"impatient") - if self._log_requests: - log.msg("transit impatience failure") - return self.disconnect_error() # impatience yields failure + self._state.got_bytes(data) + self._total_sent += len(data) def _got_handshake(self, token, side): - self._got_token = token - self._got_side = side - self._mood = "lonely" # until buddy connects + self._state.please_relay_for_side(token, side) + # self._mood = "lonely" # until buddy connects self.setRawMode() - self.factory.connection_got_token(token, side, self) - def buddy_connected(self, them): + def __buddy_connected(self, them): self._buddy = them self._mood = "happy" self.sendLine(b"ok") @@ -121,7 +136,7 @@ def buddy_connected(self, them): # The Transit object calls buddy_connected() on both protocols, so # there will be two producer/consumer pairs. - def buddy_disconnected(self): + def __buddy_disconnected(self): if self._log_requests: log.msg("buddy_disconnected %s" % self.describeToken()) self._buddy = None @@ -145,56 +160,62 @@ def disconnect_redundant(self): def connectionLost(self, reason): finished = time.time() total_time = finished - self._started + self._state.connection_lost() + + # XXX FIXME record usage + + if False: + # Record usage. There are eight cases: + # * n0: we haven't gotten a full handshake yet (empty) + # * n1: the handshake failed, not a real client (errory) + # * n2: real client disconnected before any buddy appeared (lonely) + # * n3: real client closed as redundant after buddy appears (redundant) + # * n4: real client connected first, buddy closes first (jilted) + # * n5: real client connected first, buddy close last (happy) + # * n6: real client connected last, buddy closes first (jilted) + # * n7: real client connected last, buddy closes last (happy) + + # * non-connected clients (0,1,2,3) always write a usage record + # * for connected clients, whoever disconnects first gets to write the + # usage record (5, 7). The last disconnect doesn't write a record. + + if self._mood == "empty": # 0 + assert not self._buddy + self.factory.recordUsage(self._started, "empty", 0, + total_time, None) + elif self._mood == "errory": # 1 + assert not self._buddy + self.factory.recordUsage(self._started, "errory", 0, + total_time, None) + elif self._mood == "redundant": # 3 + assert not self._buddy + self.factory.recordUsage(self._started, "redundant", 0, + total_time, None) + elif self._mood == "jilted": # 4 or 6 + # we were connected, but our buddy hung up on us. They record the + # usage event, we do not + pass + elif self._mood == "lonely": # 2 + assert not self._buddy + self.factory.recordUsage(self._started, "lonely", 0, + total_time, None) + else: # 5 or 7 + # we were connected, we hung up first. We record the event. + assert self._mood == "happy", self._mood + assert self._buddy + starts = [self._started, self._buddy._started] + total_time = finished - min(starts) + waiting_time = max(starts) - min(starts) + total_bytes = self._total_sent + self._buddy._total_sent + self.factory.recordUsage(self._started, "happy", total_bytes, + total_time, waiting_time) + + if self._buddy: + self._buddy.buddy_disconnected() + # self.factory.transitFinished(self, self._got_token, self._got_side, + # self.describeToken()) + - # Record usage. There are eight cases: - # * n0: we haven't gotten a full handshake yet (empty) - # * n1: the handshake failed, not a real client (errory) - # * n2: real client disconnected before any buddy appeared (lonely) - # * n3: real client closed as redundant after buddy appears (redundant) - # * n4: real client connected first, buddy closes first (jilted) - # * n5: real client connected first, buddy close last (happy) - # * n6: real client connected last, buddy closes first (jilted) - # * n7: real client connected last, buddy closes last (happy) - - # * non-connected clients (0,1,2,3) always write a usage record - # * for connected clients, whoever disconnects first gets to write the - # usage record (5, 7). The last disconnect doesn't write a record. - - if self._mood == "empty": # 0 - assert not self._buddy - self.factory.recordUsage(self._started, "empty", 0, - total_time, None) - elif self._mood == "errory": # 1 - assert not self._buddy - self.factory.recordUsage(self._started, "errory", 0, - total_time, None) - elif self._mood == "redundant": # 3 - assert not self._buddy - self.factory.recordUsage(self._started, "redundant", 0, - total_time, None) - elif self._mood == "jilted": # 4 or 6 - # we were connected, but our buddy hung up on us. They record the - # usage event, we do not - pass - elif self._mood == "lonely": # 2 - assert not self._buddy - self.factory.recordUsage(self._started, "lonely", 0, - total_time, None) - else: # 5 or 7 - # we were connected, we hung up first. We record the event. - assert self._mood == "happy", self._mood - assert self._buddy - starts = [self._started, self._buddy._started] - total_time = finished - min(starts) - waiting_time = max(starts) - min(starts) - total_bytes = self._total_sent + self._buddy._total_sent - self.factory.recordUsage(self._started, "happy", total_bytes, - total_time, waiting_time) - - if self._buddy: - self._buddy.buddy_disconnected() - self.factory.transitFinished(self, self._got_token, self._got_side, - self.describeToken()) class Transit(protocol.ServerFactory): # I manage pairs of simultaneous connections to a secondary TCP port, @@ -230,6 +251,8 @@ class Transit(protocol.ServerFactory): protocol = TransitConnection def __init__(self, blur_usage, log_file, usage_db): + self.active_connections = ActiveConnections() + self.pending_requests = PendingRequests(self.active_connections) self._blur_usage = blur_usage self._log_requests = blur_usage is None if self._blur_usage: @@ -247,39 +270,6 @@ def __init__(self, blur_usage, log_file, usage_db): self._pending_requests = defaultdict(set) # token -> set((side, TransitConnection)) self._active_connections = set() # TransitConnection - def connection_got_token(self, token, new_side, new_tc): - potentials = self._pending_requests[token] - for old in potentials: - (old_side, old_tc) = old - if ((old_side is None) - or (new_side is None) - or (old_side != new_side)): - # we found a match - if self._debug_log: - log.msg("transit relay 2: %s" % new_tc.describeToken()) - - # drop and stop tracking the rest - potentials.remove(old) - for (_, leftover_tc) in potentials.copy(): - # Don't record this as errory. It's just a spare connection - # from the same side as a connection that got used. This - # can happen if the connection hint contains multiple - # addresses (we don't currently support those, but it'd - # probably be useful in the future). - leftover_tc.disconnect_redundant() - self._pending_requests.pop(token, None) - - # glue the two ends together - self._active_connections.add(new_tc) - self._active_connections.add(old_tc) - new_tc.buddy_connected(old_tc) - old_tc.buddy_connected(new_tc) - return - if self._debug_log: - log.msg("transit relay 1: %s" % new_tc.describeToken()) - potentials.add((new_side, new_tc)) - # TODO: timer - def transitFinished(self, tc, token, side, description): if token in self._pending_requests: side_tc = (side, tc) From 66f39dca471f1eacc5df71998871488fb1a60865 Mon Sep 17 00:00:00 2001 From: meejah Date: Mon, 1 Feb 2021 16:55:15 -0700 Subject: [PATCH 03/30] count totals in state-machine --- src/wormhole_transit_relay/server_state.py | 7 ++++++- src/wormhole_transit_relay/transit_server.py | 10 +++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/wormhole_transit_relay/server_state.py b/src/wormhole_transit_relay/server_state.py index cf95d1d..ffa0819 100644 --- a/src/wormhole_transit_relay/server_state.py +++ b/src/wormhole_transit_relay/server_state.py @@ -161,6 +161,7 @@ class TransitServerState(object): _side = None _first = None _mood = "empty" + _total_sent = 0 def __init__(self, pending_requests): self._pending_requests = pending_requests @@ -274,6 +275,10 @@ def _send_ok(self): def _send_impatient(self): self._client.send(b"impatient\n") + @_machine.output() + def _count_bytes(self, data): + self._total_sent += len(data) + @_machine.output() def _send(self, data): self._client.send(data) @@ -404,7 +409,7 @@ def done(self): wait_relay.upon( got_bytes, enter=done, - outputs=[_mood_errory, _disconnect], + outputs=[_count_bytes, _mood_errory, _disconnect], ) wait_relay.upon( connection_lost, diff --git a/src/wormhole_transit_relay/transit_server.py b/src/wormhole_transit_relay/transit_server.py index 9de3b1e..abd6406 100644 --- a/src/wormhole_transit_relay/transit_server.py +++ b/src/wormhole_transit_relay/transit_server.py @@ -42,9 +42,6 @@ class TransitConnection(LineReceiver): MAX_LENGTH = 1024 - def __init__(self): - self._total_sent = 0 - def send(self, data): """ ITransitClient API @@ -104,10 +101,10 @@ def lineReceived(self, line): side = new.group(2) return self._got_handshake(token, side) - # state-machine calls us via ITransitClient interface to do - # bad handshake etc. + # we should have been switched to "raw data" mode on the first + # line received (after which rawDataReceived() is called for + # all bytes) so getting here means a bad handshake. return self._state.bad_token() - #return self._state.got_bytes(line) def rawDataReceived(self, data): # We are an IPushProducer to our buddy's IConsumer, so they'll @@ -117,7 +114,6 @@ def rawDataReceived(self, data): # point the sender will only transmit data as fast as the # receiver can handle it. self._state.got_bytes(data) - self._total_sent += len(data) def _got_handshake(self, token, side): self._state.please_relay_for_side(token, side) From 1ab5e4ffb39e3fe68e92ecff62db7935292a0946 Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 00:06:19 -0700 Subject: [PATCH 04/30] start of refactoring usage-recording: pass one test --- src/wormhole_transit_relay/server_state.py | 85 +++++++++++++++++-- .../test/test_transit_server.py | 8 +- src/wormhole_transit_relay/transit_server.py | 15 +++- 3 files changed, 90 insertions(+), 18 deletions(-) diff --git a/src/wormhole_transit_relay/server_state.py b/src/wormhole_transit_relay/server_state.py index ffa0819..21b2464 100644 --- a/src/wormhole_transit_relay/server_state.py +++ b/src/wormhole_transit_relay/server_state.py @@ -1,13 +1,18 @@ +import time from collections import defaultdict import automat from zope.interface import ( Interface, + Attribute, implementer, ) class ITransitClient(Interface): + + started_time = Attribute("timestamp when the connection was established") + def send(data): """ Send some byets to the client @@ -34,6 +39,11 @@ def disconnect_partner(): class TestClient(object): _partner = None _data = b"" + _started_time = time.time() + + @property + def started_time(self): + return _started_time def send_to_partner(self, data): print("{} GOT:{}".format(id(self), repr(data))) @@ -57,6 +67,53 @@ def disconnect_partner(self): print("disconnect_partner: {}".format(id(self._partner))) +class UsageRecorder(object): + """ + Tracks usage statistics of connections + """ + + def record(self, started, buddy_started, result, bytes_sent, buddy_bytes): + """ + :param int started: timestamp when our connection started + + :param int buddy_started: None, or the timestamp when our + partner's connection started (will be None if we don't yet + have a partner). + + :param str result: a label for the result of the connection + (one of the "moods"). + + :param int bytes_sent: number of bytes we sent + + :param int buddy_bytes: number of bytes our partner sent + """ + + # ideally self._reactor.seconds() or similar, but .. + finished = time.time() + if buddy_started is not None: + starts = [started, buddy_started] + total_time = finished - min(starts) + waiting_time = max(starts) - min(starts) + total_bytes = bytes_sent + buddy_bytes + else: + total_time = finished - started + waiting_time = None + total_bytes = bytes_sent + # probably want like "backends" here or something? original + # code logs some JSON (maybe) and writes to a database (maybe) + # and tests record in memory. + self.json_record({ + "started": started, + "total_time": total_time, + "waiting_time": waiting_time, + "total_bytes": total_bytes, + "mood": result, + }) + + def json_record(self, data): + pass + + class ActiveConnections(object): """ Tracks active connections. A connection is 'active' when both @@ -163,8 +220,9 @@ class TransitServerState(object): _mood = "empty" _total_sent = 0 - def __init__(self, pending_requests): + def __init__(self, pending_requests, usage_recorder): self._pending_requests = pending_requests + self._usage = usage_recorder def get_token(self): """ @@ -300,6 +358,17 @@ def _disconnect(self): def _disconnect_partner(self): self._client.disconnect_partner() + # some outputs to record "usage" information .. + @_machine.output() + def _record_usage(self): + self._usage.record( + started=self._client.started_time, + buddy_started=self._buddy._client.started_time if self._buddy is not None else None, + result=self._mood, + bytes_sent=self._total_sent, + buddy_bytes=self._buddy._total_sent if self._buddy is not None else None + ) + # some outputs to record the "mood" .. @_machine.output() def _mood_happy(self): @@ -404,17 +473,17 @@ def done(self): wait_relay.upon( bad_token, enter=done, - outputs=[_mood_errory, _send_bad, _disconnect], + outputs=[_mood_errory, _send_bad, _disconnect, _record_usage], ) wait_relay.upon( got_bytes, enter=done, - outputs=[_count_bytes, _mood_errory, _disconnect], + outputs=[_count_bytes, _mood_errory, _disconnect, _record_usage], ) wait_relay.upon( connection_lost, enter=done, - outputs=[_disconnect], + outputs=[_disconnect, _record_usage], ) wait_partner.upon( @@ -425,12 +494,12 @@ def done(self): wait_partner.upon( connection_lost, enter=done, - outputs=[_mood_lonely, _unregister], + outputs=[_mood_lonely, _unregister, _record_usage], ) wait_partner.upon( got_bytes, enter=done, - outputs=[_mood_impatient, _send_impatient, _disconnect, _unregister], + outputs=[_mood_impatient, _send_impatient, _disconnect, _unregister, _record_usage], ) relaying.upon( @@ -441,12 +510,12 @@ def done(self): relaying.upon( connection_lost, enter=done, - outputs=[_mood_happy_if_first, _disconnect_partner, _unregister], + outputs=[_mood_happy_if_first, _disconnect_partner, _unregister, _record_usage], ) relaying.upon( partner_connection_lost, enter=done, - outputs=[_mood_happy_if_second, _disconnect, _unregister], + outputs=[_mood_happy_if_second, _disconnect, _unregister, _record_usage], ) done.upon( diff --git a/src/wormhole_transit_relay/test/test_transit_server.py b/src/wormhole_transit_relay/test/test_transit_server.py index a4763d9..0c9762f 100644 --- a/src/wormhole_transit_relay/test/test_transit_server.py +++ b/src/wormhole_transit_relay/test/test_transit_server.py @@ -311,10 +311,7 @@ class Usage(ServerBase, unittest.TestCase): def setUp(self): super(Usage, self).setUp() self._usage = [] - def record(started, result, total_bytes, total_time, waiting_time): - self._usage.append((started, result, total_bytes, - total_time, waiting_time)) - self._transit_server.recordUsage = record + self._transit_server.usage.json_record = self._usage.append def test_empty(self): p1 = self.new_protocol() @@ -334,8 +331,7 @@ def test_short(self): # that will log the "empty" usage event self.assertEqual(len(self._usage), 1, self._usage) - (started, result, total_bytes, total_time, waiting_time) = self._usage[0] - self.assertEqual(result, "empty", self._usage) + self.assertEqual("empty", self._usage[0]["mood"]) def test_errory(self): p1 = self.new_protocol() diff --git a/src/wormhole_transit_relay/transit_server.py b/src/wormhole_transit_relay/transit_server.py index abd6406..1727610 100644 --- a/src/wormhole_transit_relay/transit_server.py +++ b/src/wormhole_transit_relay/transit_server.py @@ -29,6 +29,7 @@ def blur_size(size): TransitServerState, PendingRequests, ActiveConnections, + UsageRecorder, ITransitClient, ) from zope.interface import implementer @@ -41,6 +42,7 @@ class TransitConnection(LineReceiver): # This must be >= to the longest possible handshake message. MAX_LENGTH = 1024 + started_time = None def send(self, data): """ @@ -78,9 +80,15 @@ def describeToken(self): return d def connectionMade(self): - self._state = TransitServerState(self.factory.pending_requests) + # ideally more like self._reactor.seconds() ... but Twisted + # doesn't have a good way to get the reactor for a protocol + # (besides "use the global one") + self.started_time = time.time() + self._state = TransitServerState( + self.factory.pending_requests, + self.factory.usage, + ) self._state.connection_made(self) - self._started = time.time() self._log_requests = self.factory._log_requests try: self.transport.setTcpKeepAlive(True) @@ -154,8 +162,6 @@ def disconnect_redundant(self): self.transport.loseConnection() def connectionLost(self, reason): - finished = time.time() - total_time = finished - self._started self._state.connection_lost() # XXX FIXME record usage @@ -249,6 +255,7 @@ class Transit(protocol.ServerFactory): def __init__(self, blur_usage, log_file, usage_db): self.active_connections = ActiveConnections() self.pending_requests = PendingRequests(self.active_connections) + self.usage = UsageRecorder() self._blur_usage = blur_usage self._log_requests = blur_usage is None if self._blur_usage: From 9dfc410aa8701c280ac15604800071e115eb2ade Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 00:35:52 -0700 Subject: [PATCH 05/30] fix more tests --- .../test/test_transit_server.py | 25 +++++++------------ 1 file changed, 9 insertions(+), 16 deletions(-) diff --git a/src/wormhole_transit_relay/test/test_transit_server.py b/src/wormhole_transit_relay/test/test_transit_server.py index 0c9762f..66e2da6 100644 --- a/src/wormhole_transit_relay/test/test_transit_server.py +++ b/src/wormhole_transit_relay/test/test_transit_server.py @@ -320,8 +320,7 @@ def test_empty(self): # that will log the "empty" usage event self.assertEqual(len(self._usage), 1, self._usage) - (started, result, total_bytes, total_time, waiting_time) = self._usage[0] - self.assertEqual(result, "empty", self._usage) + self.assertEqual(self._usage[0]["mood"], "empty", self._usage) def test_short(self): p1 = self.new_protocol() @@ -340,8 +339,7 @@ def test_errory(self): # that will log the "errory" usage event, then drop the connection p1.transport.loseConnection() self.assertEqual(len(self._usage), 1, self._usage) - (started, result, total_bytes, total_time, waiting_time) = self._usage[0] - self.assertEqual(result, "errory", self._usage) + self.assertEqual(self._usage[0]["mood"], "errory", self._usage) def test_lonely(self): p1 = self.new_protocol() @@ -353,9 +351,8 @@ def test_lonely(self): p1.transport.loseConnection() self.assertEqual(len(self._usage), 1, self._usage) - (started, result, total_bytes, total_time, waiting_time) = self._usage[0] - self.assertEqual(result, "lonely", self._usage) - self.assertIdentical(waiting_time, None) + self.assertEqual(self._usage[0]["mood"], "lonely", self._usage) + self.assertIdentical(self._usage[0]["waiting_time"], None) def test_one_happy_one_jilted(self): p1 = self.new_protocol() @@ -375,9 +372,8 @@ def test_one_happy_one_jilted(self): p1.transport.loseConnection() self.assertEqual(len(self._usage), 1, self._usage) - (started, result, total_bytes, total_time, waiting_time) = self._usage[0] - self.assertEqual(result, "happy", self._usage) - self.assertEqual(total_bytes, 20) + self.assertEqual(self._usage[0]["mood"], "happy", self._usage) + self.assertEqual(self._usage[0]["total_bytes"], 20) self.assertNotIdentical(waiting_time, None) def test_redundant(self): @@ -399,18 +395,15 @@ def test_redundant(self): p1c.transport.loseConnection() self.assertEqual(len(self._usage), 1, self._usage) - (started, result, total_bytes, total_time, waiting_time) = self._usage[0] - self.assertEqual(result, "lonely", self._usage) + self.assertEqual(self._usage[0]["mood"], "lonely") p2.dataReceived(handshake(token1, side=side2)) self.assertEqual(len(self._transit_server._pending_requests), 0) self.assertEqual(len(self._usage), 2, self._usage) - (started, result, total_bytes, total_time, waiting_time) = self._usage[1] - self.assertEqual(result, "redundant", self._usage) + self.assertEqual(self._usage[1]["mood"], "redundant") # one of the these is unecessary, but probably harmless p1a.transport.loseConnection() p1b.transport.loseConnection() self.assertEqual(len(self._usage), 3, self._usage) - (started, result, total_bytes, total_time, waiting_time) = self._usage[2] - self.assertEqual(result, "happy", self._usage) + self.assertEqual(self._usage[2]["mood"], "happy") From aa3c3d1f87fa4257de94e4669bdb9343f1205a19 Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 00:36:15 -0700 Subject: [PATCH 06/30] try to make 'redudant' mood work --- src/wormhole_transit_relay/server_state.py | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/src/wormhole_transit_relay/server_state.py b/src/wormhole_transit_relay/server_state.py index 21b2464..31458b2 100644 --- a/src/wormhole_transit_relay/server_state.py +++ b/src/wormhole_transit_relay/server_state.py @@ -185,7 +185,8 @@ def register_token(self, token, new_side, new_tc): # can happen if the connection hint contains multiple # addresses (we don't currently support those, but it'd # probably be useful in the future). - leftover_tc.disconnect_redundant() + ##leftover_tc.disconnect_redundant() + leftover_tc.partner_connection_lost() self._requests.pop(token, None) # glue the two ends together @@ -378,6 +379,10 @@ def _mood_happy(self): def _mood_lonely(self): self._mood = "lonely" + @_machine.output() + def _mood_redundant(self): + self._mood = "redundant" + @_machine.output() def _mood_impatient(self): self._mood = "impatient" @@ -501,6 +506,11 @@ def done(self): enter=done, outputs=[_mood_impatient, _send_impatient, _disconnect, _unregister, _record_usage], ) + wait_partner.upon( + partner_connection_lost, + enter=done, + outputs=[_mood_redundant, _disconnect, _record_usage], + ) relaying.upon( got_bytes, From aa7b6c3c7df4c2c04abfee16dea11e01107506ba Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 00:50:05 -0700 Subject: [PATCH 07/30] fix more tests (that examine internals) --- .../test/test_transit_server.py | 16 +++++++++------- src/wormhole_transit_relay/transit_server.py | 4 ++-- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/src/wormhole_transit_relay/test/test_transit_server.py b/src/wormhole_transit_relay/test/test_transit_server.py index 66e2da6..d4f86a1 100644 --- a/src/wormhole_transit_relay/test/test_transit_server.py +++ b/src/wormhole_transit_relay/test/test_transit_server.py @@ -13,9 +13,11 @@ def handshake(token, side=None): class _Transit: def count(self): - return sum([len(potentials) - for potentials - in self._transit_server._pending_requests.values()]) + return sum([ + len(potentials) + for potentials + in self._transit_server.pending_requests._requests.values() + ]) def test_blur_size(self): blur = transit_server.blur_size @@ -48,7 +50,7 @@ def test_register(self): self.assertEqual(self.count(), 0) # the token should be removed too - self.assertEqual(len(self._transit_server._pending_requests), 0) + self.assertEqual(len(self._transit_server.pending_requests._requests), 0) def test_both_unsided(self): p1 = self.new_protocol() @@ -168,8 +170,8 @@ def test_ignore_same_side(self): side2 = b"\x02"*8 p3.dataReceived(handshake(token1, side=side2)) self.assertEqual(self.count(), 0) - self.assertEqual(len(self._transit_server._pending_requests), 0) - self.assertEqual(len(self._transit_server._active_connections), 2) + self.assertEqual(len(self._transit_server.pending_requests._requests), 0) + self.assertEqual(len(self._transit_server.active_connections._connections), 2) # That will trigger a disconnect on exactly one of (p1 or p2). # The other connection should still be connected self.assertEqual(sum([int(t.transport.connected) for t in [p1, p2]]), 1) @@ -398,7 +400,7 @@ def test_redundant(self): self.assertEqual(self._usage[0]["mood"], "lonely") p2.dataReceived(handshake(token1, side=side2)) - self.assertEqual(len(self._transit_server._pending_requests), 0) + self.assertEqual(len(self._transit_server.pending_requests._requests), 0) self.assertEqual(len(self._usage), 2, self._usage) self.assertEqual(self._usage[1]["mood"], "redundant") diff --git a/src/wormhole_transit_relay/transit_server.py b/src/wormhole_transit_relay/transit_server.py index 1727610..fba84be 100644 --- a/src/wormhole_transit_relay/transit_server.py +++ b/src/wormhole_transit_relay/transit_server.py @@ -270,8 +270,8 @@ def __init__(self, blur_usage, log_file, usage_db): self._db = get_db(usage_db) self._rebooted = time.time() # we don't track TransitConnections until they submit a token - self._pending_requests = defaultdict(set) # token -> set((side, TransitConnection)) - self._active_connections = set() # TransitConnection +## self._pending_requests = defaultdict(set) # token -> set((side, TransitConnection)) +## self._active_connections = set() # TransitConnection def transitFinished(self, tc, token, side, description): if token in self._pending_requests: From effd1a70d758f8dfa4e320f5b6c7c32db5a4da35 Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 01:09:16 -0700 Subject: [PATCH 08/30] skip usage-counting if we're jilted but other side is happy? --- src/wormhole_transit_relay/server_state.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/wormhole_transit_relay/server_state.py b/src/wormhole_transit_relay/server_state.py index 31458b2..7cdcb70 100644 --- a/src/wormhole_transit_relay/server_state.py +++ b/src/wormhole_transit_relay/server_state.py @@ -362,6 +362,10 @@ def _disconnect_partner(self): # some outputs to record "usage" information .. @_machine.output() def _record_usage(self): + if self._mood == "jilted": + if self._buddy: + if self._buddy._mood == "happy": + return self._usage.record( started=self._client.started_time, buddy_started=self._buddy._client.started_time if self._buddy is not None else None, From 76d471cdde44dc0eac239f4ee0da443b6360b97b Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 01:09:39 -0700 Subject: [PATCH 09/30] count bytes missing --- src/wormhole_transit_relay/server_state.py | 2 +- src/wormhole_transit_relay/test/test_transit_server.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/wormhole_transit_relay/server_state.py b/src/wormhole_transit_relay/server_state.py index 7cdcb70..690b2c5 100644 --- a/src/wormhole_transit_relay/server_state.py +++ b/src/wormhole_transit_relay/server_state.py @@ -519,7 +519,7 @@ def done(self): relaying.upon( got_bytes, enter=relaying, - outputs=[_send_to_partner], + outputs=[_count_bytes, _send_to_partner], ) relaying.upon( connection_lost, diff --git a/src/wormhole_transit_relay/test/test_transit_server.py b/src/wormhole_transit_relay/test/test_transit_server.py index d4f86a1..e5d9914 100644 --- a/src/wormhole_transit_relay/test/test_transit_server.py +++ b/src/wormhole_transit_relay/test/test_transit_server.py @@ -376,7 +376,7 @@ def test_one_happy_one_jilted(self): self.assertEqual(len(self._usage), 1, self._usage) self.assertEqual(self._usage[0]["mood"], "happy", self._usage) self.assertEqual(self._usage[0]["total_bytes"], 20) - self.assertNotIdentical(waiting_time, None) + self.assertNotIdentical(self._usage[0]["waiting_time"], None) def test_redundant(self): p1a = self.new_protocol() From 478a2eaedf2ec99e359d4e9b26bd00018b06db07 Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 01:16:33 -0700 Subject: [PATCH 10/30] unregister completely --- src/wormhole_transit_relay/server_state.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/wormhole_transit_relay/server_state.py b/src/wormhole_transit_relay/server_state.py index 690b2c5..0e7c446 100644 --- a/src/wormhole_transit_relay/server_state.py +++ b/src/wormhole_transit_relay/server_state.py @@ -153,8 +153,14 @@ def __init__(self, active_connections): self._active = active_connections def unregister(self, token, side, tc): + """ + We no longer care about a particular client (e.g. it has + disconnected). + """ if token in self._requests: self._requests[token].discard((side, tc)) + if not self._requests[token]: + del self._requests[token] self._active.unregister(tc) def register_token(self, token, new_side, new_tc): From 8d2e6abf2a5939d3a447c7f96773fb99a2ef5814 Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 01:32:52 -0700 Subject: [PATCH 11/30] use 'backends' for usage-recording --- src/wormhole_transit_relay/server_state.py | 104 +++++++++++++++++- .../test/test_transit_server.py | 45 ++++---- 2 files changed, 124 insertions(+), 25 deletions(-) diff --git a/src/wormhole_transit_relay/server_state.py b/src/wormhole_transit_relay/server_state.py index 0e7c446..b98615e 100644 --- a/src/wormhole_transit_relay/server_state.py +++ b/src/wormhole_transit_relay/server_state.py @@ -67,11 +67,105 @@ def disconnect_partner(self): print("disconnect_partner: {}".format(id(self._partner))) +class IUsageWriter(Interface): + """ + Records actual usage statistics in some way + """ + + def record_usage(started=None, total_time=None, waiting_time=None, total_bytes=None, mood=None): + """ + :param int started: timestemp when this connection began + + :param float total_time: total seconds this connection lasted + + :param float waiting_time: None or the total seconds one side + waited for the other + + :param int total_bytes: the total bytes sent. In case the + connection was concluded successfully, only one side will + record the total bytes (but count both). + + :param str mood: the 'mood' of the connection + """ + + +@implementer(IUsageWriter) +class MemoryUsageRecorder: + + def __init__(self): + self.events = [] + + def record_usage(self, started=None, total_time=None, waiting_time=None, total_bytes=None, mood=None): + """ + IUsageWriter. + """ + data = { + "started": started, + "total_time": total_time, + "waiting_time": waiting_time, + "total_bytes": total_bytes, + "mood": mood, + } + self.events.append(data) + + +@implementer(IUsageWriter) +class LogFileUsageRecorder: + + def __init__(self, writable_file): + self._file = writable_file + + def record_usage(self, started=None, total_time=None, waiting_time=None, total_bytes=None, mood=None): + """ + IUsageWriter. + """ + data = { + "started": started, + "total_time": total_time, + "waiting_time": waiting_time, + "total_bytes": total_bytes, + "mood": mood, + } + self._file.write(json.dumps(data)) + + + +@implementer(IUsageWriter) +class DatabaseUsageRecorder: + + def __init__(self, _db): + self._db = db + + def record_usage(self, started=None, total_time=None, waiting_time=None, total_bytes=None, mood=None): + """ + IUsageWriter. + """ + + class UsageRecorder(object): """ Tracks usage statistics of connections """ + def __init__(self): + self._backends = set() + + def add_backend(self, backend): + """ + Add a new backend. + + :param IUsageWriter backend: the backend to add + """ + self._backends.add(backend) + + def remove_backend(self, backend): + """ + Remove an existing backend + + :param IUsageWriter backend: the backend to remove + """ + self._backends.remove(backend) + def record(self, started, buddy_started, result, bytes_sent, buddy_bytes): """ :param int started: timestamp when our connection started @@ -102,7 +196,7 @@ def record(self, started, buddy_started, result, bytes_sent, buddy_bytes): # probably want like "backends" here or something? original # code logs some JSON (maybe) and writes to a database (maybe) # and tests record in memory. - self.json_record({ + self._notify_backends({ "started": started, "total_time": total_time, "waiting_time": waiting_time, @@ -110,8 +204,12 @@ def record(self, started, buddy_started, result, bytes_sent, buddy_bytes): "mood": result, }) - def json_record(self, data): - pass + def _notify_backends(self, data): + """ + Internal helper. Tell every backend we have about a new usage. + """ + for backend in self._backends: + backend.record_usage(**data) class ActiveConnections(object): diff --git a/src/wormhole_transit_relay/test/test_transit_server.py b/src/wormhole_transit_relay/test/test_transit_server.py index e5d9914..7656be5 100644 --- a/src/wormhole_transit_relay/test/test_transit_server.py +++ b/src/wormhole_transit_relay/test/test_transit_server.py @@ -3,6 +3,7 @@ from twisted.trial import unittest from .common import ServerBase from .. import transit_server +from ..server_state import MemoryUsageRecorder def handshake(token, side=None): hs = b"please relay " + hexlify(token) @@ -312,8 +313,8 @@ class TransitWithoutLogs(_Transit, ServerBase, unittest.TestCase): class Usage(ServerBase, unittest.TestCase): def setUp(self): super(Usage, self).setUp() - self._usage = [] - self._transit_server.usage.json_record = self._usage.append + self._usage = MemoryUsageRecorder() + self._transit_server.usage.add_backend(self._usage) def test_empty(self): p1 = self.new_protocol() @@ -321,8 +322,8 @@ def test_empty(self): p1.transport.loseConnection() # that will log the "empty" usage event - self.assertEqual(len(self._usage), 1, self._usage) - self.assertEqual(self._usage[0]["mood"], "empty", self._usage) + self.assertEqual(len(self._usage.events), 1, self._usage) + self.assertEqual(self._usage.events[0]["mood"], "empty", self._usage) def test_short(self): p1 = self.new_protocol() @@ -331,8 +332,8 @@ def test_short(self): p1.transport.loseConnection() # that will log the "empty" usage event - self.assertEqual(len(self._usage), 1, self._usage) - self.assertEqual("empty", self._usage[0]["mood"]) + self.assertEqual(len(self._usage.events), 1, self._usage) + self.assertEqual("empty", self._usage.events[0]["mood"]) def test_errory(self): p1 = self.new_protocol() @@ -340,8 +341,8 @@ def test_errory(self): p1.dataReceived(b"this is a very bad handshake\n") # that will log the "errory" usage event, then drop the connection p1.transport.loseConnection() - self.assertEqual(len(self._usage), 1, self._usage) - self.assertEqual(self._usage[0]["mood"], "errory", self._usage) + self.assertEqual(len(self._usage.events), 1, self._usage) + self.assertEqual(self._usage.events[0]["mood"], "errory", self._usage) def test_lonely(self): p1 = self.new_protocol() @@ -352,9 +353,9 @@ def test_lonely(self): # now we disconnect before the peer connects p1.transport.loseConnection() - self.assertEqual(len(self._usage), 1, self._usage) - self.assertEqual(self._usage[0]["mood"], "lonely", self._usage) - self.assertIdentical(self._usage[0]["waiting_time"], None) + self.assertEqual(len(self._usage.events), 1, self._usage) + self.assertEqual(self._usage.events[0]["mood"], "lonely", self._usage) + self.assertIdentical(self._usage.events[0]["waiting_time"], None) def test_one_happy_one_jilted(self): p1 = self.new_protocol() @@ -366,17 +367,17 @@ def test_one_happy_one_jilted(self): p1.dataReceived(handshake(token1, side=side1)) p2.dataReceived(handshake(token1, side=side2)) - self.assertEqual(self._usage, []) # no events yet + self.assertEqual(self._usage.events, []) # no events yet p1.dataReceived(b"\x00" * 13) p2.dataReceived(b"\xff" * 7) p1.transport.loseConnection() - self.assertEqual(len(self._usage), 1, self._usage) - self.assertEqual(self._usage[0]["mood"], "happy", self._usage) - self.assertEqual(self._usage[0]["total_bytes"], 20) - self.assertNotIdentical(self._usage[0]["waiting_time"], None) + self.assertEqual(len(self._usage.events), 1, self._usage) + self.assertEqual(self._usage.events[0]["mood"], "happy", self._usage) + self.assertEqual(self._usage.events[0]["total_bytes"], 20) + self.assertNotIdentical(self._usage.events[0]["waiting_time"], None) def test_redundant(self): p1a = self.new_protocol() @@ -396,16 +397,16 @@ def test_redundant(self): p1c.dataReceived(handshake(token1, side=side1)) p1c.transport.loseConnection() - self.assertEqual(len(self._usage), 1, self._usage) - self.assertEqual(self._usage[0]["mood"], "lonely") + self.assertEqual(len(self._usage.events), 1, self._usage) + self.assertEqual(self._usage.events[0]["mood"], "lonely") p2.dataReceived(handshake(token1, side=side2)) self.assertEqual(len(self._transit_server.pending_requests._requests), 0) - self.assertEqual(len(self._usage), 2, self._usage) - self.assertEqual(self._usage[1]["mood"], "redundant") + self.assertEqual(len(self._usage.events), 2, self._usage) + self.assertEqual(self._usage.events[1]["mood"], "redundant") # one of the these is unecessary, but probably harmless p1a.transport.loseConnection() p1b.transport.loseConnection() - self.assertEqual(len(self._usage), 3, self._usage) - self.assertEqual(self._usage[2]["mood"], "happy") + self.assertEqual(len(self._usage.events), 3, self._usage) + self.assertEqual(self._usage.events[2]["mood"], "happy") From 81ee2dc03086d171676bca04bff97faeacdbfed6 Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 02:18:36 -0700 Subject: [PATCH 12/30] more stats / recording works --- src/wormhole_transit_relay/server_state.py | 45 +++++++++++++++---- src/wormhole_transit_relay/test/common.py | 6 +-- src/wormhole_transit_relay/test/test_stats.py | 40 ++++++++++++----- .../test/test_transit_server.py | 41 ++++++++++------- src/wormhole_transit_relay/transit_server.py | 26 +++++------ 5 files changed, 101 insertions(+), 57 deletions(-) diff --git a/src/wormhole_transit_relay/server_state.py b/src/wormhole_transit_relay/server_state.py index b98615e..442adbf 100644 --- a/src/wormhole_transit_relay/server_state.py +++ b/src/wormhole_transit_relay/server_state.py @@ -1,4 +1,5 @@ import time +import json from collections import defaultdict import automat @@ -126,29 +127,54 @@ def record_usage(self, started=None, total_time=None, waiting_time=None, total_b "total_bytes": total_bytes, "mood": mood, } - self._file.write(json.dumps(data)) - + self._file.write(json.dumps(data) + "\n") + self._file.flush() @implementer(IUsageWriter) class DatabaseUsageRecorder: - def __init__(self, _db): + def __init__(self, db): self._db = db def record_usage(self, started=None, total_time=None, waiting_time=None, total_bytes=None, mood=None): """ IUsageWriter. """ + self._db.execute( + "INSERT INTO `usage`" + " (`started`, `total_time`, `waiting_time`," + " `total_bytes`, `result`)" + " VALUES (?,?,?,?,?)", + (started, total_time, waiting_time, total_bytes, mood) + ) + # XXX FIXME see comment in transit_server + #self._update_stats() + self._db.commit() + + +def round_to(size, coarseness): + return int(coarseness*(1+int((size-1)/coarseness))) + +def blur_size(size): + if size == 0: + return 0 + if size < 1e6: + return round_to(size, 10e3) + if size < 1e9: + return round_to(size, 1e6) + return round_to(size, 100e6) -class UsageRecorder(object): + +class UsageTracker(object): """ Tracks usage statistics of connections """ - def __init__(self): + def __init__(self, blur_usage): self._backends = set() + self._blur_usage = blur_usage def add_backend(self, backend): """ @@ -181,7 +207,6 @@ def record(self, started, buddy_started, result, bytes_sent, buddy_bytes): :param int buddy_bytes: number of bytes our partner sent """ - # ideally self._reactor.seconds() or similar, but .. finished = time.time() if buddy_started is not None: @@ -193,9 +218,11 @@ def record(self, started, buddy_started, result, bytes_sent, buddy_bytes): total_time = finished - started waiting_time = None total_bytes = bytes_sent - # probably want like "backends" here or something? original - # code logs some JSON (maybe) and writes to a database (maybe) - # and tests record in memory. + + if self._blur_usage: + started = self._blur_usage * (started // self._blur_usage) + total_bytes = blur_size(total_bytes) + self._notify_backends({ "started": started, "total_time": total_time, diff --git a/src/wormhole_transit_relay/test/common.py b/src/wormhole_transit_relay/test/common.py index 53958fb..a8963fc 100644 --- a/src/wormhole_transit_relay/test/common.py +++ b/src/wormhole_transit_relay/test/common.py @@ -6,11 +6,7 @@ class ServerBase: def setUp(self): self._lp = None - if self.log_requests: - blur_usage = None - else: - blur_usage = 60.0 - self._setup_relay(blur_usage=blur_usage) + self._setup_relay(blur_usage=60.0 if self.log_requests else None) self._transit_server._debug_log = self.log_requests def _setup_relay(self, blur_usage=None, log_file=None, usage_db=None): diff --git a/src/wormhole_transit_relay/test/test_stats.py b/src/wormhole_transit_relay/test/test_stats.py index 43b912f..3c86897 100644 --- a/src/wormhole_transit_relay/test/test_stats.py +++ b/src/wormhole_transit_relay/test/test_stats.py @@ -19,9 +19,10 @@ def test_db(self): with mock.patch("time.time", return_value=T+0): t = Transit(blur_usage=None, log_file=None, usage_db=usage_db) db = self.open_db(usage_db) + usage = list(t.usage._backends)[0] with mock.patch("time.time", return_value=T+1): - t.recordUsage(started=123, result="happy", total_bytes=100, + usage.record_usage(started=123, mood="happy", total_bytes=100, total_time=10, waiting_time=2) self.assertEqual(db.execute("SELECT * FROM `usage`").fetchall(), [dict(result="happy", started=123, @@ -33,7 +34,7 @@ def test_db(self): waiting=0, connected=0)) with mock.patch("time.time", return_value=T+2): - t.recordUsage(started=150, result="errory", total_bytes=200, + usage.record_usage(started=150, mood="errory", total_bytes=200, total_time=11, waiting_time=3) self.assertEqual(db.execute("SELECT * FROM `usage`").fetchall(), [dict(result="happy", started=123, @@ -55,18 +56,22 @@ def test_db(self): def test_no_db(self): t = Transit(blur_usage=None, log_file=None, usage_db=None) + self.assertEqual(0, len(t.usage._backends)) - t.recordUsage(started=123, result="happy", total_bytes=100, - total_time=10, waiting_time=2) - t.timerUpdateStats() class LogToStdout(unittest.TestCase): def test_log(self): # emit lines of JSON to log_file, if set log_file = io.StringIO() t = Transit(blur_usage=None, log_file=log_file, usage_db=None) - t.recordUsage(started=123, result="happy", total_bytes=100, - total_time=10, waiting_time=2) + with mock.patch("time.time", return_value=133): + t.usage.record( + started=123, + buddy_started=125, + result="happy", + bytes_sent=100, + buddy_bytes=0, + ) self.assertEqual(json.loads(log_file.getvalue()), {"started": 123, "total_time": 10, "waiting_time": 2, "total_bytes": 100, @@ -77,8 +82,16 @@ def test_log_blurred(self): # requested amount, and sizes should be rounded up too log_file = io.StringIO() t = Transit(blur_usage=60, log_file=log_file, usage_db=None) - t.recordUsage(started=123, result="happy", total_bytes=11999, - total_time=10, waiting_time=2) + + with mock.patch("time.time", return_value=123 + 10): + t.usage.record( + started=123, + buddy_started=125, + result="happy", + bytes_sent=11999, + buddy_bytes=8001, + ) + print(log_file.getvalue()) self.assertEqual(json.loads(log_file.getvalue()), {"started": 120, "total_time": 10, "waiting_time": 2, "total_bytes": 20000, @@ -86,5 +99,10 @@ def test_log_blurred(self): def test_do_not_log(self): t = Transit(blur_usage=60, log_file=None, usage_db=None) - t.recordUsage(started=123, result="happy", total_bytes=11999, - total_time=10, waiting_time=2) + t.usage.record( + started=123, + buddy_started=124, + result="happy", + bytes_sent=11999, + buddy_bytes=12, + ) diff --git a/src/wormhole_transit_relay/test/test_transit_server.py b/src/wormhole_transit_relay/test/test_transit_server.py index 7656be5..e9920c0 100644 --- a/src/wormhole_transit_relay/test/test_transit_server.py +++ b/src/wormhole_transit_relay/test/test_transit_server.py @@ -3,7 +3,10 @@ from twisted.trial import unittest from .common import ServerBase from .. import transit_server -from ..server_state import MemoryUsageRecorder +from ..server_state import ( + MemoryUsageRecorder, + blur_size, +) def handshake(token, side=None): hs = b"please relay " + hexlify(token) @@ -21,22 +24,21 @@ def count(self): ]) def test_blur_size(self): - blur = transit_server.blur_size - self.failUnlessEqual(blur(0), 0) - self.failUnlessEqual(blur(1), 10e3) - self.failUnlessEqual(blur(10e3), 10e3) - self.failUnlessEqual(blur(10e3+1), 20e3) - self.failUnlessEqual(blur(15e3), 20e3) - self.failUnlessEqual(blur(20e3), 20e3) - self.failUnlessEqual(blur(1e6), 1e6) - self.failUnlessEqual(blur(1e6+1), 2e6) - self.failUnlessEqual(blur(1.5e6), 2e6) - self.failUnlessEqual(blur(2e6), 2e6) - self.failUnlessEqual(blur(900e6), 900e6) - self.failUnlessEqual(blur(1000e6), 1000e6) - self.failUnlessEqual(blur(1050e6), 1100e6) - self.failUnlessEqual(blur(1100e6), 1100e6) - self.failUnlessEqual(blur(1150e6), 1200e6) + self.failUnlessEqual(blur_size(0), 0) + self.failUnlessEqual(blur_size(1), 10e3) + self.failUnlessEqual(blur_size(10e3), 10e3) + self.failUnlessEqual(blur_size(10e3+1), 20e3) + self.failUnlessEqual(blur_size(15e3), 20e3) + self.failUnlessEqual(blur_size(20e3), 20e3) + self.failUnlessEqual(blur_size(1e6), 1e6) + self.failUnlessEqual(blur_size(1e6+1), 2e6) + self.failUnlessEqual(blur_size(1.5e6), 2e6) + self.failUnlessEqual(blur_size(2e6), 2e6) + self.failUnlessEqual(blur_size(900e6), 900e6) + self.failUnlessEqual(blur_size(1000e6), 1000e6) + self.failUnlessEqual(blur_size(1050e6), 1100e6) + self.failUnlessEqual(blur_size(1100e6), 1100e6) + self.failUnlessEqual(blur_size(1150e6), 1200e6) def test_register(self): p1 = self.new_protocol() @@ -304,17 +306,22 @@ def test_empty_handshake(self): # hang up before sending anything p1.transport.loseConnection() + class TransitWithLogs(_Transit, ServerBase, unittest.TestCase): log_requests = True + class TransitWithoutLogs(_Transit, ServerBase, unittest.TestCase): log_requests = False + class Usage(ServerBase, unittest.TestCase): + def setUp(self): super(Usage, self).setUp() self._usage = MemoryUsageRecorder() self._transit_server.usage.add_backend(self._usage) +## self._transit_server.usage._blur_usage = None def test_empty(self): p1 = self.new_protocol() diff --git a/src/wormhole_transit_relay/transit_server.py b/src/wormhole_transit_relay/transit_server.py index fba84be..ee71b06 100644 --- a/src/wormhole_transit_relay/transit_server.py +++ b/src/wormhole_transit_relay/transit_server.py @@ -12,24 +12,14 @@ DAY = 24*HOUR MB = 1000*1000 -def round_to(size, coarseness): - return int(coarseness*(1+int((size-1)/coarseness))) - -def blur_size(size): - if size == 0: - return 0 - if size < 1e6: - return round_to(size, 10e3) - if size < 1e9: - return round_to(size, 1e6) - return round_to(size, 100e6) - from wormhole_transit_relay.server_state import ( TransitServerState, PendingRequests, ActiveConnections, - UsageRecorder, + UsageTracker, + DatabaseUsageRecorder, + LogFileUsageRecorder, ITransitClient, ) from zope.interface import implementer @@ -255,7 +245,7 @@ class Transit(protocol.ServerFactory): def __init__(self, blur_usage, log_file, usage_db): self.active_connections = ActiveConnections() self.pending_requests = PendingRequests(self.active_connections) - self.usage = UsageRecorder() + self.usage = UsageTracker(blur_usage) self._blur_usage = blur_usage self._log_requests = blur_usage is None if self._blur_usage: @@ -264,10 +254,13 @@ def __init__(self, blur_usage, log_file, usage_db): else: log.msg("not blurring access times") self._debug_log = False - self._log_file = log_file +## self._log_file = log_file self._db = None if usage_db: self._db = get_db(usage_db) + self.usage.add_backend(DatabaseUsageRecorder(self._db)) + if log_file: + self.usage.add_backend(LogFileUsageRecorder(log_file)) self._rebooted = time.time() # we don't track TransitConnections until they submit a token ## self._pending_requests = defaultdict(set) # token -> set((side, TransitConnection)) @@ -317,6 +310,9 @@ def recordUsage(self, started, result, total_bytes, " VALUES (?,?,?, ?,?)", (started, total_time, waiting_time, total_bytes, result)) + # XXXX aaaaaAA! okay, so just this one type of usage also + # does some other random stats-stuff; need more + # refactorizing self._update_stats() self._db.commit() From 8045892b0380b26270f86d230b031b1c7537cb29 Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 16:35:20 -0700 Subject: [PATCH 13/30] cleanup, remove dead code --- src/wormhole_transit_relay/server_state.py | 67 +++++++- src/wormhole_transit_relay/server_tap.py | 22 ++- src/wormhole_transit_relay/test/common.py | 9 +- .../test/test_service.py | 6 +- src/wormhole_transit_relay/test/test_stats.py | 11 +- src/wormhole_transit_relay/transit_server.py | 153 ++++++------------ 6 files changed, 137 insertions(+), 131 deletions(-) diff --git a/src/wormhole_transit_relay/server_state.py b/src/wormhole_transit_relay/server_state.py index 442adbf..1be9146 100644 --- a/src/wormhole_transit_relay/server_state.py +++ b/src/wormhole_transit_relay/server_state.py @@ -8,6 +8,7 @@ Attribute, implementer, ) +from .database import get_db class ITransitClient(Interface): @@ -167,12 +168,41 @@ def blur_size(size): return round_to(size, 100e6) +def create_usage_tracker(blur_usage, log_file, usage_db): + """ + :param int blur_usage: see UsageTracker + + :param log_file: None or a file-like object to write JSON-encoded + lines of usage information to. + + :param usage_db: None or an sqlite3 database connection + + :returns: a new UsageTracker instance configured with backends. + """ + tracker = UsageTracker(blur_usage) + if usage_db: + db = get_db(usage_db) + tracker.add_backend(DatabaseUsageRecorder(db)) + if log_file: + tracker.add_backend(LogFileUsageRecorder(log_file)) + return tracker + + + + class UsageTracker(object): """ Tracks usage statistics of connections """ def __init__(self, blur_usage): + """ + :param int blur_usage: None or the number of seconds to use as a + window around which to blur time statistics (e.g. "60" means times + will be rounded to 1 minute intervals). When blur_usage is + non-zero, sizes will also be rounded into buckets of "one + megabyte", "one gigabyte" or "lots" + """ self._backends = set() self._blur_usage = blur_usage @@ -223,6 +253,9 @@ def record(self, started, buddy_started, result, bytes_sent, buddy_bytes): started = self._blur_usage * (started // self._blur_usage) total_bytes = blur_size(total_bytes) + # This is "a dict" instead of "kwargs" because we have to make + # it into a dict for the log use-case and in-memory/testing + # use-case anyway so this is less repeats of the names. self._notify_backends({ "started": started, "total_time": total_time, @@ -233,7 +266,7 @@ def record(self, started, buddy_started, result, bytes_sent, buddy_bytes): def _notify_backends(self, data): """ - Internal helper. Tell every backend we have about a new usage. + Internal helper. Tell every backend we have about a new usage record. """ for backend in self._backends: backend.record_usage(**data) @@ -241,8 +274,11 @@ def _notify_backends(self, data): class ActiveConnections(object): """ - Tracks active connections. A connection is 'active' when both - sides have shown up and they are glued together. + Tracks active connections. + + A connection is 'active' when both sides have shown up and they + are glued together (and thus could be passing data back and forth + if any is flowing). """ def __init__(self): self._connections = set() @@ -268,12 +304,20 @@ def unregister(self, side): class PendingRequests(object): """ - Tracks the tokens we have received from client connections and - maps them to their partner connections for requests that haven't - yet been 'glued together' (that is, one side hasn't yet shown up). + Tracks outstanding (non-"active") requests. + + We register client connections against the tokens we have + received. When the other side shows up we can thus match it to the + correct partner connection. At this point, the connection becomes + "active" is and is thus no longer "pending" and so will no longer + be in this collection. """ def __init__(self, active_connections): + """ + :param active_connections: an instance of ActiveConnections where + connections are put when both sides arrive. + """ self._requests = defaultdict(set) # token -> set((side, TransitConnection)) self._active = active_connections @@ -285,16 +329,23 @@ def unregister(self, token, side, tc): if token in self._requests: self._requests[token].discard((side, tc)) if not self._requests[token]: + # no more sides; token is dead del self._requests[token] self._active.unregister(tc) - def register_token(self, token, new_side, new_tc): + def register(self, token, new_side, new_tc): """ A client has connected and successfully offered a token (and optional 'side' token). If this is the first one for this token, we merely remember it. If it is the second side for this token we connect them together. + :param bytes token: the token for this connection. + + :param bytes new_side: None or the side token for this connection + + :param TransitServerState new_tc: the state-machine of the connection + :returns bool: True if we are the first side to register this token """ @@ -562,7 +613,7 @@ def _real_register_token_for_side(self, token, side): """ self._token = token self._side = side - self._first = self._pending_requests.register_token(token, side, self) + self._first = self._pending_requests.register(token, side, self) @_machine.state(initial=True) def listening(self): diff --git a/src/wormhole_transit_relay/server_tap.py b/src/wormhole_transit_relay/server_tap.py index 8fbfde2..cbf3efa 100644 --- a/src/wormhole_transit_relay/server_tap.py +++ b/src/wormhole_transit_relay/server_tap.py @@ -6,6 +6,7 @@ StreamServerEndpointService) from twisted.internet import endpoints from . import transit_server +from .server_state import create_usage_tracker from .increase_rlimits import increase_rlimits LONGDESC = """\ @@ -32,13 +33,18 @@ def opt_blur_usage(self, arg): def makeService(config, reactor=reactor): increase_rlimits() ep = endpoints.serverFromString(reactor, config["port"]) # to listen - log_file = (os.fdopen(int(config["log-fd"]), "w") - if config["log-fd"] is not None - else None) - f = transit_server.Transit(blur_usage=config["blur-usage"], - log_file=log_file, - usage_db=config["usage-db"]) + log_file = ( + os.fdopen(int(config["log-fd"]), "w") + if config["log-fd"] is not None + else None + ) + usage = create_usage_tracker( + blur_usage=config["blur-usage"], + log_file=log_file, + usage_db=config["usage-db"], + ) + factory = transit_server.Transit(usage) parent = MultiService() - StreamServerEndpointService(ep, f).setServiceParent(parent) - TimerService(5*60.0, f.timerUpdateStats).setServiceParent(parent) + StreamServerEndpointService(ep, factory).setServiceParent(parent) +### FIXME TODO TimerService(5*60.0, factory.timerUpdateStats).setServiceParent(parent) return parent diff --git a/src/wormhole_transit_relay/test/common.py b/src/wormhole_transit_relay/test/common.py index a8963fc..c51cb90 100644 --- a/src/wormhole_transit_relay/test/common.py +++ b/src/wormhole_transit_relay/test/common.py @@ -1,5 +1,6 @@ from twisted.test import proto_helpers from ..transit_server import Transit +from ..server_state import create_usage_tracker class ServerBase: log_requests = False @@ -10,8 +11,12 @@ def setUp(self): self._transit_server._debug_log = self.log_requests def _setup_relay(self, blur_usage=None, log_file=None, usage_db=None): - self._transit_server = Transit(blur_usage=blur_usage, - log_file=log_file, usage_db=usage_db) + usage = create_usage_tracker( + blur_usage=blur_usage, + log_file=log_file, + usage_db=usage_db, + ) + self._transit_server = Transit(usage) def new_protocol(self): protocol = self._transit_server.buildProtocol(('127.0.0.1', 0)) diff --git a/src/wormhole_transit_relay/test/test_service.py b/src/wormhole_transit_relay/test/test_service.py index f72765c..197d376 100644 --- a/src/wormhole_transit_relay/test/test_service.py +++ b/src/wormhole_transit_relay/test/test_service.py @@ -8,7 +8,7 @@ class Service(unittest.TestCase): def test_defaults(self): o = server_tap.Options() o.parseOptions([]) - with mock.patch("wormhole_transit_relay.server_tap.transit_server.Transit") as t: + with mock.patch("wormhole_transit_relay.server_tap.create_usage_tracker") as t: s = server_tap.makeService(o) self.assertEqual(t.mock_calls, [mock.call(blur_usage=None, @@ -18,7 +18,7 @@ def test_defaults(self): def test_blur(self): o = server_tap.Options() o.parseOptions(["--blur-usage=60"]) - with mock.patch("wormhole_transit_relay.server_tap.transit_server.Transit") as t: + with mock.patch("wormhole_transit_relay.server_tap.create_usage_tracker") as t: server_tap.makeService(o) self.assertEqual(t.mock_calls, [mock.call(blur_usage=60, @@ -28,7 +28,7 @@ def test_log_fd(self): o = server_tap.Options() o.parseOptions(["--log-fd=99"]) fd = object() - with mock.patch("wormhole_transit_relay.server_tap.transit_server.Transit") as t: + with mock.patch("wormhole_transit_relay.server_tap.create_usage_tracker") as t: with mock.patch("wormhole_transit_relay.server_tap.os.fdopen", return_value=fd) as f: server_tap.makeService(o) diff --git a/src/wormhole_transit_relay/test/test_stats.py b/src/wormhole_transit_relay/test/test_stats.py index 3c86897..93bdbd3 100644 --- a/src/wormhole_transit_relay/test/test_stats.py +++ b/src/wormhole_transit_relay/test/test_stats.py @@ -3,6 +3,7 @@ from unittest import mock from twisted.trial import unittest from ..transit_server import Transit +from ..server_state import create_usage_tracker from .. import database class DB(unittest.TestCase): @@ -17,7 +18,7 @@ def test_db(self): os.mkdir(d) usage_db = os.path.join(d, "usage.sqlite") with mock.patch("time.time", return_value=T+0): - t = Transit(blur_usage=None, log_file=None, usage_db=usage_db) + t = Transit(create_usage_tracker(blur_usage=None, log_file=None, usage_db=usage_db)) db = self.open_db(usage_db) usage = list(t.usage._backends)[0] @@ -55,7 +56,7 @@ def test_db(self): waiting=0, connected=0)) def test_no_db(self): - t = Transit(blur_usage=None, log_file=None, usage_db=None) + t = Transit(create_usage_tracker(blur_usage=None, log_file=None, usage_db=None)) self.assertEqual(0, len(t.usage._backends)) @@ -63,7 +64,7 @@ class LogToStdout(unittest.TestCase): def test_log(self): # emit lines of JSON to log_file, if set log_file = io.StringIO() - t = Transit(blur_usage=None, log_file=log_file, usage_db=None) + t = Transit(create_usage_tracker(blur_usage=None, log_file=log_file, usage_db=None)) with mock.patch("time.time", return_value=133): t.usage.record( started=123, @@ -81,7 +82,7 @@ def test_log_blurred(self): # if blurring is enabled, timestamps should be rounded to the # requested amount, and sizes should be rounded up too log_file = io.StringIO() - t = Transit(blur_usage=60, log_file=log_file, usage_db=None) + t = Transit(create_usage_tracker(blur_usage=60, log_file=log_file, usage_db=None)) with mock.patch("time.time", return_value=123 + 10): t.usage.record( @@ -98,7 +99,7 @@ def test_log_blurred(self): "mood": "happy"}) def test_do_not_log(self): - t = Transit(blur_usage=60, log_file=None, usage_db=None) + t = Transit(create_usage_tracker(blur_usage=60, log_file=None, usage_db=None)) t.usage.record( started=123, buddy_started=124, diff --git a/src/wormhole_transit_relay/transit_server.py b/src/wormhole_transit_relay/transit_server.py index ee71b06..a897bd4 100644 --- a/src/wormhole_transit_relay/transit_server.py +++ b/src/wormhole_transit_relay/transit_server.py @@ -4,7 +4,6 @@ from twisted.python import log from twisted.internet import protocol from twisted.protocols.basic import LineReceiver -from .database import get_db SECONDS = 1.0 MINUTE = 60*SECONDS @@ -79,7 +78,7 @@ def connectionMade(self): self.factory.usage, ) self._state.connection_made(self) - self._log_requests = self.factory._log_requests +## self._log_requests = self.factory._log_requests try: self.transport.setTcpKeepAlive(True) except AttributeError: @@ -131,8 +130,8 @@ def __buddy_connected(self, them): # there will be two producer/consumer pairs. def __buddy_disconnected(self): - if self._log_requests: - log.msg("buddy_disconnected %s" % self.describeToken()) +## if self._log_requests: +## log.msg("buddy_disconnected %s" % self.describeToken()) self._buddy = None self._mood = "jilted" self.transport.loseConnection() @@ -210,117 +209,61 @@ def connectionLost(self, reason): class Transit(protocol.ServerFactory): - # I manage pairs of simultaneous connections to a secondary TCP port, - # both forwarded to the other. Clients must begin each connection with - # "please relay TOKEN for SIDE\n" (or a legacy form without the "for - # SIDE"). Two connections match if they use the same TOKEN and have - # different SIDEs (the redundant connections are dropped when a match is - # made). Legacy connections match any with the same TOKEN, ignoring SIDE - # (so two legacy connections will match each other). - - # I will send "ok\n" when the matching connection is established, or - # disconnect if no matching connection is made within MAX_WAIT_TIME - # seconds. I will disconnect if you send data before the "ok\n". All data - # you get after the "ok\n" will be from the other side. You will not - # receive "ok\n" until the other side has also connected and submitted a - # matching token (and differing SIDE). - - # In addition, the connections will be dropped after MAXLENGTH bytes have - # been sent by either side, or MAXTIME seconds have elapsed after the - # matching connections were established. A future API will reveal these - # limits to clients instead of causing mysterious spontaneous failures. - - # These relay connections are not half-closeable (unlike full TCP - # connections, applications will not receive any data after half-closing - # their outgoing side). Applications must negotiate shutdown with their - # peer and not close the connection until all data has finished - # transferring in both directions. Applications which only need to send - # data in one direction can use close() as usual. - + """ + I manage pairs of simultaneous connections to a secondary TCP port, + both forwarded to the other. Clients must begin each connection with + "please relay TOKEN for SIDE\n" (or a legacy form without the "for + SIDE"). Two connections match if they use the same TOKEN and have + different SIDEs (the redundant connections are dropped when a match is + made). Legacy connections match any with the same TOKEN, ignoring SIDE + (so two legacy connections will match each other). + + I will send "ok\n" when the matching connection is established, or + disconnect if no matching connection is made within MAX_WAIT_TIME + seconds. I will disconnect if you send data before the "ok\n". All data + you get after the "ok\n" will be from the other side. You will not + receive "ok\n" until the other side has also connected and submitted a + matching token (and differing SIDE). + + In addition, the connections will be dropped after MAXLENGTH bytes have + been sent by either side, or MAXTIME seconds have elapsed after the + matching connections were established. A future API will reveal these + limits to clients instead of causing mysterious spontaneous failures. + + These relay connections are not half-closeable (unlike full TCP + connections, applications will not receive any data after half-closing + their outgoing side). Applications must negotiate shutdown with their + peer and not close the connection until all data has finished + transferring in both directions. Applications which only need to send + data in one direction can use close() as usual. + """ + + # TODO: unused MAX_WAIT_TIME = 30*SECONDS + # TODO: unused MAXLENGTH = 10*MB + # TODO: unused MAXTIME = 60*SECONDS protocol = TransitConnection - def __init__(self, blur_usage, log_file, usage_db): + def __init__(self, usage): self.active_connections = ActiveConnections() self.pending_requests = PendingRequests(self.active_connections) - self.usage = UsageTracker(blur_usage) - self._blur_usage = blur_usage - self._log_requests = blur_usage is None - if self._blur_usage: - log.msg("blurring access times to %d seconds" % self._blur_usage) - log.msg("not logging Transit connections to Twisted log") - else: - log.msg("not blurring access times") + self.usage = usage + if False: + # these logs-message should be made by the usage-tracker + # .. or in the "tap" setup? + if blur_usage: + log.msg("blurring access times to %d seconds" % self._blur_usage) + log.msg("not logging Transit connections to Twisted log") + else: + log.msg("not blurring access times") self._debug_log = False -## self._log_file = log_file - self._db = None - if usage_db: - self._db = get_db(usage_db) - self.usage.add_backend(DatabaseUsageRecorder(self._db)) - if log_file: - self.usage.add_backend(LogFileUsageRecorder(log_file)) + self._rebooted = time.time() - # we don't track TransitConnections until they submit a token -## self._pending_requests = defaultdict(set) # token -> set((side, TransitConnection)) -## self._active_connections = set() # TransitConnection - - def transitFinished(self, tc, token, side, description): - if token in self._pending_requests: - side_tc = (side, tc) - self._pending_requests[token].discard(side_tc) - if not self._pending_requests[token]: # set is now empty - del self._pending_requests[token] - if self._debug_log: - log.msg("transitFinished %s" % (description,)) - self._active_connections.discard(tc) - # we could update the usage database "current" row immediately, or wait - # until the 5-minute timer updates it. If we update it now, just after - # losing a connection, we should probably also update it just after - # establishing one (at the end of connection_got_token). For now I'm - # going to omit these, but maybe someday we'll turn them both on. The - # consequence is that a manual execution of the munin scripts ("munin - # run wormhole_transit_active") will give the wrong value just after a - # connect/disconnect event. Actual munin graphs should accurately - # report connections that last longer than the 5-minute sampling - # window, which is what we actually care about. - #self.timerUpdateStats() - - def recordUsage(self, started, result, total_bytes, - total_time, waiting_time): - if self._debug_log: - log.msg(format="Transit.recordUsage {bytes}B", bytes=total_bytes) - if self._blur_usage: - started = self._blur_usage * (started // self._blur_usage) - total_bytes = blur_size(total_bytes) - if self._log_file is not None: - data = {"started": started, - "total_time": total_time, - "waiting_time": waiting_time, - "total_bytes": total_bytes, - "mood": result, - } - self._log_file.write(json.dumps(data)+"\n") - self._log_file.flush() - if self._db: - self._db.execute("INSERT INTO `usage`" - " (`started`, `total_time`, `waiting_time`," - " `total_bytes`, `result`)" - " VALUES (?,?,?, ?,?)", - (started, total_time, waiting_time, - total_bytes, result)) - # XXXX aaaaaAA! okay, so just this one type of usage also - # does some other random stats-stuff; need more - # refactorizing - self._update_stats() - self._db.commit() - - def timerUpdateStats(self): - if self._db: - self._update_stats() - self._db.commit() + # XXX TODO self._rebooted and the below could be in a separate + # object? or in the DatabaseUsageRecorder .. but not here def _update_stats(self): # current status: should be zero when idle rebooted = self._rebooted From 651d12f7ea1c8344af301ec4f161c6583f1724cb Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 16:57:16 -0700 Subject: [PATCH 14/30] we never remove backends --- src/wormhole_transit_relay/server_state.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/wormhole_transit_relay/server_state.py b/src/wormhole_transit_relay/server_state.py index 1be9146..e2923e8 100644 --- a/src/wormhole_transit_relay/server_state.py +++ b/src/wormhole_transit_relay/server_state.py @@ -214,14 +214,6 @@ def add_backend(self, backend): """ self._backends.add(backend) - def remove_backend(self, backend): - """ - Remove an existing backend - - :param IUsageWriter backend: the backend to remove - """ - self._backends.remove(backend) - def record(self, started, buddy_started, result, bytes_sent, buddy_bytes): """ :param int started: timestamp when our connection started From 5552a75575de7959120da40cc87a3cbe473c9fbe Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 16:57:49 -0700 Subject: [PATCH 15/30] remove old test-code --- src/wormhole_transit_relay/server_state.py | 73 ---------------------- 1 file changed, 73 deletions(-) diff --git a/src/wormhole_transit_relay/server_state.py b/src/wormhole_transit_relay/server_state.py index e2923e8..e5c7bc3 100644 --- a/src/wormhole_transit_relay/server_state.py +++ b/src/wormhole_transit_relay/server_state.py @@ -37,38 +37,6 @@ def disconnect_partner(): """ -@implementer(ITransitClient) -class TestClient(object): - _partner = None - _data = b"" - _started_time = time.time() - - @property - def started_time(self): - return _started_time - - def send_to_partner(self, data): - print("{} GOT:{}".format(id(self), repr(data))) - if self._partner: - self._partner._client.send(data) - - def send(self, data): - print("{} SEND:{}".format(id(self), repr(data))) - self._data += data - - def disconnect(self): - print("disconnect") - - def connect_partner(self, other): - print("connect_partner: {} <--> {}".format(id(self), id(other))) - assert self._partner is None, "double partner" - self._partner = other - - def disconnect_partner(self): - assert self._partner is not None, "no partner" - print("disconnect_partner: {}".format(id(self._partner))) - - class IUsageWriter(Interface): """ Records actual usage statistics in some way @@ -716,44 +684,3 @@ def done(self): enter=done, outputs=[], ) - - - - -# actions: -# - send("ok") -# - send("bad handshake") -# - disconnect -# - ... - -if __name__ == "__main__": - active = ActiveConnections() - pending = PendingRequests(active) - - server0 = TransitServerState(pending) - client0 = TestClient() - server1 = TransitServerState(pending) - client1 = TestClient() - server0.connection_made(client0) - server0.please_relay(b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb") - - # this would be an error, because our partner hasn't shown up yet - # print(server0.got_bytes(b"asdf")) - - print("about to relay client1") - server1.connection_made(client1) - server1.please_relay(b"bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb") - print("done") - - # XXX the PendingRequests stuff should do this, going "by hand" for now -# server0.got_partner(client1) -# server1.got_partner(client0) - - # should be connected now - server0.got_bytes(b"asdf") - # client1 should receive b"asdf" - - server0.connection_lost() - print("----[ received data on both sides ]----") - print("client0:{}".format(repr(client0._data))) - print("client1:{}".format(repr(client1._data))) From ddaadddadaebd3aa40486539e666d0b34303aeb7 Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 17:39:55 -0700 Subject: [PATCH 16/30] pass actual database, not config --- src/wormhole_transit_relay/server_state.py | 5 +---- src/wormhole_transit_relay/server_tap.py | 3 ++- src/wormhole_transit_relay/test/test_stats.py | 9 +++------ 3 files changed, 6 insertions(+), 11 deletions(-) diff --git a/src/wormhole_transit_relay/server_state.py b/src/wormhole_transit_relay/server_state.py index e5c7bc3..9f83736 100644 --- a/src/wormhole_transit_relay/server_state.py +++ b/src/wormhole_transit_relay/server_state.py @@ -149,15 +149,12 @@ def create_usage_tracker(blur_usage, log_file, usage_db): """ tracker = UsageTracker(blur_usage) if usage_db: - db = get_db(usage_db) - tracker.add_backend(DatabaseUsageRecorder(db)) + tracker.add_backend(DatabaseUsageRecorder(usage_db)) if log_file: tracker.add_backend(LogFileUsageRecorder(log_file)) return tracker - - class UsageTracker(object): """ Tracks usage statistics of connections diff --git a/src/wormhole_transit_relay/server_tap.py b/src/wormhole_transit_relay/server_tap.py index cbf3efa..7f89409 100644 --- a/src/wormhole_transit_relay/server_tap.py +++ b/src/wormhole_transit_relay/server_tap.py @@ -38,10 +38,11 @@ def makeService(config, reactor=reactor): if config["log-fd"] is not None else None ) + db = None if config["usage-db"] is None else get_db(config["usage-db"]) usage = create_usage_tracker( blur_usage=config["blur-usage"], log_file=log_file, - usage_db=config["usage-db"], + usage_db=db, ) factory = transit_server.Transit(usage) parent = MultiService() diff --git a/src/wormhole_transit_relay/test/test_stats.py b/src/wormhole_transit_relay/test/test_stats.py index 93bdbd3..df574d0 100644 --- a/src/wormhole_transit_relay/test/test_stats.py +++ b/src/wormhole_transit_relay/test/test_stats.py @@ -7,19 +7,16 @@ from .. import database class DB(unittest.TestCase): - def open_db(self, dbfile): - db = sqlite3.connect(dbfile) - database._initialize_db_connection(db) - return db def test_db(self): T = 1519075308.0 d = self.mktemp() os.mkdir(d) usage_db = os.path.join(d, "usage.sqlite") + db = database.get_db(usage_db) with mock.patch("time.time", return_value=T+0): - t = Transit(create_usage_tracker(blur_usage=None, log_file=None, usage_db=usage_db)) - db = self.open_db(usage_db) + t = Transit(create_usage_tracker(blur_usage=None, log_file=None, usage_db=db)) + self.assertEqual(len(t.usage._backends), 1) usage = list(t.usage._backends)[0] with mock.patch("time.time", return_value=T+1): From 7a582370a35cb10ba5a9b154275caee890fef996 Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 17:47:26 -0700 Subject: [PATCH 17/30] restore 2 missing log-lines --- src/wormhole_transit_relay/server_state.py | 6 ++++++ src/wormhole_transit_relay/transit_server.py | 8 -------- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/src/wormhole_transit_relay/server_state.py b/src/wormhole_transit_relay/server_state.py index 9f83736..60e3101 100644 --- a/src/wormhole_transit_relay/server_state.py +++ b/src/wormhole_transit_relay/server_state.py @@ -8,6 +8,7 @@ Attribute, implementer, ) +from twisted.python import log from .database import get_db @@ -170,6 +171,11 @@ def __init__(self, blur_usage): """ self._backends = set() self._blur_usage = blur_usage + if blur_usage: + log.msg("blurring access times to %d seconds" % self._blur_usage) +## XXX log.msg("not logging Transit connections to Twisted log") + else: + log.msg("not blurring access times") def add_backend(self, backend): """ diff --git a/src/wormhole_transit_relay/transit_server.py b/src/wormhole_transit_relay/transit_server.py index a897bd4..29e3279 100644 --- a/src/wormhole_transit_relay/transit_server.py +++ b/src/wormhole_transit_relay/transit_server.py @@ -250,14 +250,6 @@ def __init__(self, usage): self.active_connections = ActiveConnections() self.pending_requests = PendingRequests(self.active_connections) self.usage = usage - if False: - # these logs-message should be made by the usage-tracker - # .. or in the "tap" setup? - if blur_usage: - log.msg("blurring access times to %d seconds" % self._blur_usage) - log.msg("not logging Transit connections to Twisted log") - else: - log.msg("not blurring access times") self._debug_log = False self._rebooted = time.time() From e0c711ff0caf45f6ab67027e37c553d1e21a5bac Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 17:47:40 -0700 Subject: [PATCH 18/30] cleanup / dead code --- src/wormhole_transit_relay/test/test_stats.py | 2 +- .../test/test_transit_server.py | 1 - src/wormhole_transit_relay/transit_server.py | 63 ++----------------- 3 files changed, 6 insertions(+), 60 deletions(-) diff --git a/src/wormhole_transit_relay/test/test_stats.py b/src/wormhole_transit_relay/test/test_stats.py index df574d0..aeb197e 100644 --- a/src/wormhole_transit_relay/test/test_stats.py +++ b/src/wormhole_transit_relay/test/test_stats.py @@ -87,7 +87,7 @@ def test_log_blurred(self): buddy_started=125, result="happy", bytes_sent=11999, - buddy_bytes=8001, + buddy_bytes=0, ) print(log_file.getvalue()) self.assertEqual(json.loads(log_file.getvalue()), diff --git a/src/wormhole_transit_relay/test/test_transit_server.py b/src/wormhole_transit_relay/test/test_transit_server.py index e9920c0..c98490f 100644 --- a/src/wormhole_transit_relay/test/test_transit_server.py +++ b/src/wormhole_transit_relay/test/test_transit_server.py @@ -2,7 +2,6 @@ from binascii import hexlify from twisted.trial import unittest from .common import ServerBase -from .. import transit_server from ..server_state import ( MemoryUsageRecorder, blur_size, diff --git a/src/wormhole_transit_relay/transit_server.py b/src/wormhole_transit_relay/transit_server.py index 29e3279..7865c22 100644 --- a/src/wormhole_transit_relay/transit_server.py +++ b/src/wormhole_transit_relay/transit_server.py @@ -1,6 +1,6 @@ from __future__ import print_function, unicode_literals -import re, time, json -from collections import defaultdict +import re +import time from twisted.python import log from twisted.internet import protocol from twisted.protocols.basic import LineReceiver @@ -16,9 +16,6 @@ TransitServerState, PendingRequests, ActiveConnections, - UsageTracker, - DatabaseUsageRecorder, - LogFileUsageRecorder, ITransitClient, ) from zope.interface import implementer @@ -152,59 +149,9 @@ def disconnect_redundant(self): def connectionLost(self, reason): self._state.connection_lost() - - # XXX FIXME record usage - - if False: - # Record usage. There are eight cases: - # * n0: we haven't gotten a full handshake yet (empty) - # * n1: the handshake failed, not a real client (errory) - # * n2: real client disconnected before any buddy appeared (lonely) - # * n3: real client closed as redundant after buddy appears (redundant) - # * n4: real client connected first, buddy closes first (jilted) - # * n5: real client connected first, buddy close last (happy) - # * n6: real client connected last, buddy closes first (jilted) - # * n7: real client connected last, buddy closes last (happy) - - # * non-connected clients (0,1,2,3) always write a usage record - # * for connected clients, whoever disconnects first gets to write the - # usage record (5, 7). The last disconnect doesn't write a record. - - if self._mood == "empty": # 0 - assert not self._buddy - self.factory.recordUsage(self._started, "empty", 0, - total_time, None) - elif self._mood == "errory": # 1 - assert not self._buddy - self.factory.recordUsage(self._started, "errory", 0, - total_time, None) - elif self._mood == "redundant": # 3 - assert not self._buddy - self.factory.recordUsage(self._started, "redundant", 0, - total_time, None) - elif self._mood == "jilted": # 4 or 6 - # we were connected, but our buddy hung up on us. They record the - # usage event, we do not - pass - elif self._mood == "lonely": # 2 - assert not self._buddy - self.factory.recordUsage(self._started, "lonely", 0, - total_time, None) - else: # 5 or 7 - # we were connected, we hung up first. We record the event. - assert self._mood == "happy", self._mood - assert self._buddy - starts = [self._started, self._buddy._started] - total_time = finished - min(starts) - waiting_time = max(starts) - min(starts) - total_bytes = self._total_sent + self._buddy._total_sent - self.factory.recordUsage(self._started, "happy", total_bytes, - total_time, waiting_time) - - if self._buddy: - self._buddy.buddy_disconnected() - # self.factory.transitFinished(self, self._got_token, self._got_side, - # self.describeToken()) +# XXX this probably resulted in a log message we've not refactored yet +# self.factory.transitFinished(self, self._got_token, self._got_side, +# self.describeToken()) From 6fca7fc10008fd59af445584159991afc992514e Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 18:16:30 -0700 Subject: [PATCH 19/30] fix global stats-gathering / recording --- src/wormhole_transit_relay/server_state.py | 26 ++++++++- src/wormhole_transit_relay/server_tap.py | 4 +- src/wormhole_transit_relay/test/common.py | 2 +- src/wormhole_transit_relay/test/test_stats.py | 53 ++++++++++++++----- src/wormhole_transit_relay/transit_server.py | 35 ++++++------ 5 files changed, 81 insertions(+), 39 deletions(-) diff --git a/src/wormhole_transit_relay/server_state.py b/src/wormhole_transit_relay/server_state.py index 60e3101..613eee3 100644 --- a/src/wormhole_transit_relay/server_state.py +++ b/src/wormhole_transit_relay/server_state.py @@ -118,8 +118,10 @@ def record_usage(self, started=None, total_time=None, waiting_time=None, total_b " VALUES (?,?,?,?,?)", (started, total_time, waiting_time, total_bytes, mood) ) - # XXX FIXME see comment in transit_server - #self._update_stats() + # original code did "self._update_stats()" here, thus causing + # "global" stats update on every connection update .. should + # we repeat this behavior, or really only record every + # 60-seconds with the timer? self._db.commit() @@ -227,6 +229,26 @@ def record(self, started, buddy_started, result, bytes_sent, buddy_bytes): "mood": result, }) + def update_stats(self, rebooted, updated, connected, waiting, + incomplete_bytes): + """ + Update general statistics. + """ + # in original code, this is only recorded in the database + # .. perhaps a better way to do this, but .. + for backend in self._backends: + if isinstance(backend, DatabaseUsageRecorder): + backend._db.execute("DELETE FROM `current`") + backend._db.execute( + "INSERT INTO `current`" + " (`rebooted`, `updated`, `connected`, `waiting`," + " `incomplete_bytes`)" + " VALUES (?, ?, ?, ?, ?)", + (int(rebooted), int(updated), connected, waiting, + incomplete_bytes) + ) + + def _notify_backends(self, data): """ Internal helper. Tell every backend we have about a new usage record. diff --git a/src/wormhole_transit_relay/server_tap.py b/src/wormhole_transit_relay/server_tap.py index 7f89409..704d404 100644 --- a/src/wormhole_transit_relay/server_tap.py +++ b/src/wormhole_transit_relay/server_tap.py @@ -44,8 +44,8 @@ def makeService(config, reactor=reactor): log_file=log_file, usage_db=db, ) - factory = transit_server.Transit(usage) + factory = transit_server.Transit(usage, reactor.seconds) parent = MultiService() StreamServerEndpointService(ep, factory).setServiceParent(parent) -### FIXME TODO TimerService(5*60.0, factory.timerUpdateStats).setServiceParent(parent) + TimerService(5*60.0, factory.update_stats).setServiceParent(parent) return parent diff --git a/src/wormhole_transit_relay/test/common.py b/src/wormhole_transit_relay/test/common.py index c51cb90..834caec 100644 --- a/src/wormhole_transit_relay/test/common.py +++ b/src/wormhole_transit_relay/test/common.py @@ -16,7 +16,7 @@ def _setup_relay(self, blur_usage=None, log_file=None, usage_db=None): log_file=log_file, usage_db=usage_db, ) - self._transit_server = Transit(usage) + self._transit_server = Transit(usage, lambda: 123456789.0) def new_protocol(self): protocol = self._transit_server.buildProtocol(('127.0.0.1', 0)) diff --git a/src/wormhole_transit_relay/test/test_stats.py b/src/wormhole_transit_relay/test/test_stats.py index aeb197e..20f69de 100644 --- a/src/wormhole_transit_relay/test/test_stats.py +++ b/src/wormhole_transit_relay/test/test_stats.py @@ -9,19 +9,31 @@ class DB(unittest.TestCase): def test_db(self): + T = 1519075308.0 + + class Timer: + t = T + def __call__(self): + return self.t + get_time = Timer() + d = self.mktemp() os.mkdir(d) usage_db = os.path.join(d, "usage.sqlite") db = database.get_db(usage_db) - with mock.patch("time.time", return_value=T+0): - t = Transit(create_usage_tracker(blur_usage=None, log_file=None, usage_db=db)) + t = Transit( + create_usage_tracker(blur_usage=None, log_file=None, usage_db=db), + get_time, + ) self.assertEqual(len(t.usage._backends), 1) usage = list(t.usage._backends)[0] - with mock.patch("time.time", return_value=T+1): - usage.record_usage(started=123, mood="happy", total_bytes=100, - total_time=10, waiting_time=2) + get_time.t = T + 1 + usage.record_usage(started=123, mood="happy", total_bytes=100, + total_time=10, waiting_time=2) + t.update_stats() + self.assertEqual(db.execute("SELECT * FROM `usage`").fetchall(), [dict(result="happy", started=123, total_bytes=100, total_time=10, waiting_time=2), @@ -31,9 +43,10 @@ def test_db(self): incomplete_bytes=0, waiting=0, connected=0)) - with mock.patch("time.time", return_value=T+2): - usage.record_usage(started=150, mood="errory", total_bytes=200, - total_time=11, waiting_time=3) + get_time.t = T + 2 + usage.record_usage(started=150, mood="errory", total_bytes=200, + total_time=11, waiting_time=3) + t.update_stats() self.assertEqual(db.execute("SELECT * FROM `usage`").fetchall(), [dict(result="happy", started=123, total_bytes=100, total_time=10, waiting_time=2), @@ -45,15 +58,18 @@ def test_db(self): incomplete_bytes=0, waiting=0, connected=0)) - with mock.patch("time.time", return_value=T+3): - t.timerUpdateStats() + get_time.t = T + 3 + t.update_stats() self.assertEqual(db.execute("SELECT * FROM `current`").fetchone(), dict(rebooted=T+0, updated=T+3, incomplete_bytes=0, waiting=0, connected=0)) def test_no_db(self): - t = Transit(create_usage_tracker(blur_usage=None, log_file=None, usage_db=None)) + t = Transit( + create_usage_tracker(blur_usage=None, log_file=None, usage_db=None), + lambda: 0, + ) self.assertEqual(0, len(t.usage._backends)) @@ -61,7 +77,10 @@ class LogToStdout(unittest.TestCase): def test_log(self): # emit lines of JSON to log_file, if set log_file = io.StringIO() - t = Transit(create_usage_tracker(blur_usage=None, log_file=log_file, usage_db=None)) + t = Transit( + create_usage_tracker(blur_usage=None, log_file=log_file, usage_db=None), + lambda: 0, + ) with mock.patch("time.time", return_value=133): t.usage.record( started=123, @@ -79,7 +98,10 @@ def test_log_blurred(self): # if blurring is enabled, timestamps should be rounded to the # requested amount, and sizes should be rounded up too log_file = io.StringIO() - t = Transit(create_usage_tracker(blur_usage=60, log_file=log_file, usage_db=None)) + t = Transit( + create_usage_tracker(blur_usage=60, log_file=log_file, usage_db=None), + lambda: 0, + ) with mock.patch("time.time", return_value=123 + 10): t.usage.record( @@ -96,7 +118,10 @@ def test_log_blurred(self): "mood": "happy"}) def test_do_not_log(self): - t = Transit(create_usage_tracker(blur_usage=60, log_file=None, usage_db=None)) + t = Transit( + create_usage_tracker(blur_usage=60, log_file=None, usage_db=None), + lambda: 0, + ) t.usage.record( started=123, buddy_started=124, diff --git a/src/wormhole_transit_relay/transit_server.py b/src/wormhole_transit_relay/transit_server.py index 7865c22..640e972 100644 --- a/src/wormhole_transit_relay/transit_server.py +++ b/src/wormhole_transit_relay/transit_server.py @@ -193,33 +193,28 @@ class Transit(protocol.ServerFactory): MAXTIME = 60*SECONDS protocol = TransitConnection - def __init__(self, usage): + def __init__(self, usage, get_timestamp): self.active_connections = ActiveConnections() self.pending_requests = PendingRequests(self.active_connections) self.usage = usage self._debug_log = False + self._timestamp = get_timestamp + self._rebooted = self._timestamp() - self._rebooted = time.time() - - # XXX TODO self._rebooted and the below could be in a separate - # object? or in the DatabaseUsageRecorder .. but not here - def _update_stats(self): - # current status: should be zero when idle - rebooted = self._rebooted - updated = time.time() - connected = len(self._active_connections) / 2 + def update_stats(self): # TODO: when a connection is half-closed, len(active) will be odd. a # moment later (hopefully) the other side will disconnect, but # _update_stats isn't updated until later. - waiting = len(self._pending_requests) + # "waiting" doesn't count multiple parallel connections from the same # side - incomplete_bytes = sum(tc._total_sent - for tc in self._active_connections) - self._db.execute("DELETE FROM `current`") - self._db.execute("INSERT INTO `current`" - " (`rebooted`, `updated`, `connected`, `waiting`," - " `incomplete_bytes`)" - " VALUES (?, ?, ?, ?, ?)", - (rebooted, updated, connected, waiting, - incomplete_bytes)) + self.usage.update_stats( + rebooted=self._rebooted, + updated=self._timestamp(), + connected=len(self.active_connections._connections), + waiting=len(self.pending_requests._requests), + incomplete_bytes=sum( + tc._total_sent + for tc in self.active_connections._connections + ), + ) From acf4b00021af4be960991c2c16baefe9b16cfd3a Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 18:17:09 -0700 Subject: [PATCH 20/30] pyflakes --- src/wormhole_transit_relay/server_state.py | 1 - src/wormhole_transit_relay/server_tap.py | 1 + src/wormhole_transit_relay/test/test_stats.py | 2 +- 3 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/wormhole_transit_relay/server_state.py b/src/wormhole_transit_relay/server_state.py index 613eee3..d89d335 100644 --- a/src/wormhole_transit_relay/server_state.py +++ b/src/wormhole_transit_relay/server_state.py @@ -9,7 +9,6 @@ implementer, ) from twisted.python import log -from .database import get_db class ITransitClient(Interface): diff --git a/src/wormhole_transit_relay/server_tap.py b/src/wormhole_transit_relay/server_tap.py index 704d404..b4028d2 100644 --- a/src/wormhole_transit_relay/server_tap.py +++ b/src/wormhole_transit_relay/server_tap.py @@ -8,6 +8,7 @@ from . import transit_server from .server_state import create_usage_tracker from .increase_rlimits import increase_rlimits +from .database import get_db LONGDESC = """\ This plugin sets up a 'Transit Relay' server for magic-wormhole. This service diff --git a/src/wormhole_transit_relay/test/test_stats.py b/src/wormhole_transit_relay/test/test_stats.py index 20f69de..5dca530 100644 --- a/src/wormhole_transit_relay/test/test_stats.py +++ b/src/wormhole_transit_relay/test/test_stats.py @@ -1,5 +1,5 @@ from __future__ import print_function, unicode_literals -import os, io, json, sqlite3 +import os, io, json from unittest import mock from twisted.trial import unittest from ..transit_server import Transit From f8278b9db3da068d103fadc954aa8c9549540a17 Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 20:05:27 -0700 Subject: [PATCH 21/30] log again --- src/wormhole_transit_relay/transit_server.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/wormhole_transit_relay/transit_server.py b/src/wormhole_transit_relay/transit_server.py index 640e972..18f3b6d 100644 --- a/src/wormhole_transit_relay/transit_server.py +++ b/src/wormhole_transit_relay/transit_server.py @@ -52,6 +52,7 @@ def disconnect_partner(self): """ ITransitClient API """ + print("buddy_disconnected {}".format(self._buddy.get_token())) self._buddy._client.transport.loseConnection() self._buddy = None From fb7f0309100e9fb7c194d3dbc94f0835e9d5642d Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 20:52:14 -0700 Subject: [PATCH 22/30] guard --- src/wormhole_transit_relay/transit_server.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/wormhole_transit_relay/transit_server.py b/src/wormhole_transit_relay/transit_server.py index 18f3b6d..4c71616 100644 --- a/src/wormhole_transit_relay/transit_server.py +++ b/src/wormhole_transit_relay/transit_server.py @@ -52,9 +52,10 @@ def disconnect_partner(self): """ ITransitClient API """ - print("buddy_disconnected {}".format(self._buddy.get_token())) - self._buddy._client.transport.loseConnection() - self._buddy = None + if self._buddy is not None: + # print("buddy_disconnected {}".format(self._buddy.get_token())) + self._buddy._client.transport.loseConnection() + self._buddy = None def describeToken(self): d = "-" From 5c62ebc3c2f2490eff274964b9d2469ef555584f Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 21:46:32 -0700 Subject: [PATCH 23/30] does this ever get called? --- src/wormhole_transit_relay/server_state.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/wormhole_transit_relay/server_state.py b/src/wormhole_transit_relay/server_state.py index d89d335..b747965 100644 --- a/src/wormhole_transit_relay/server_state.py +++ b/src/wormhole_transit_relay/server_state.py @@ -692,11 +692,11 @@ def done(self): enter=done, outputs=[_mood_happy_if_first, _disconnect_partner, _unregister, _record_usage], ) - relaying.upon( - partner_connection_lost, - enter=done, - outputs=[_mood_happy_if_second, _disconnect, _unregister, _record_usage], - ) +# relaying.upon( +# partner_connection_lost, +# enter=done, +# outputs=[_mood_happy_if_second, _disconnect, _unregister], # no _record_usage; other side will +# ) done.upon( connection_lost, From 7792e109eada5ed759a09537c94bdf224d4fd6b3 Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 21:59:47 -0700 Subject: [PATCH 24/30] re-instate log message --- src/wormhole_transit_relay/transit_server.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/wormhole_transit_relay/transit_server.py b/src/wormhole_transit_relay/transit_server.py index 4c71616..7bd7d91 100644 --- a/src/wormhole_transit_relay/transit_server.py +++ b/src/wormhole_transit_relay/transit_server.py @@ -53,7 +53,7 @@ def disconnect_partner(self): ITransitClient API """ if self._buddy is not None: - # print("buddy_disconnected {}".format(self._buddy.get_token())) + log.msg("buddy_disconnected {}".format(self._buddy.get_token())) self._buddy._client.transport.loseConnection() self._buddy = None From 456a75701990b4a6f8264bf33452031fb7982d80 Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 22:00:57 -0700 Subject: [PATCH 25/30] not sure we can hit this state at all --- src/wormhole_transit_relay/server_state.py | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/src/wormhole_transit_relay/server_state.py b/src/wormhole_transit_relay/server_state.py index b747965..949176e 100644 --- a/src/wormhole_transit_relay/server_state.py +++ b/src/wormhole_transit_relay/server_state.py @@ -572,17 +572,6 @@ def _mood_happy_if_first(self): else: self._mood = "jilted" - @_machine.output() - def _mood_happy_if_second(self): - """ - We disconnected second so we're only happy if we also connected - second. - """ - if self._first: - self._mood = "jilted" - else: - self._mood = "happy" - def _real_register_token_for_side(self, token, side): """ A client has connected and sent a valid version 1 or version 2 @@ -692,11 +681,6 @@ def done(self): enter=done, outputs=[_mood_happy_if_first, _disconnect_partner, _unregister, _record_usage], ) -# relaying.upon( -# partner_connection_lost, -# enter=done, -# outputs=[_mood_happy_if_second, _disconnect, _unregister], # no _record_usage; other side will -# ) done.upon( connection_lost, From 0130e7551418eb49ecd38f5b7ff89cac84eabe6e Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 22:01:35 -0700 Subject: [PATCH 26/30] unused --- src/wormhole_transit_relay/server_state.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/wormhole_transit_relay/server_state.py b/src/wormhole_transit_relay/server_state.py index 949176e..a773f87 100644 --- a/src/wormhole_transit_relay/server_state.py +++ b/src/wormhole_transit_relay/server_state.py @@ -407,12 +407,6 @@ def get_token(self): d += "-" return d - def get_mood(self): - """ - :returns str: description of the current 'mood' of the connection - """ - return self._mood - @_machine.input() def connection_made(self, client): """ From ffd2d2de3e1d190cd8128b40466b270819e8b1d5 Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 22:11:17 -0700 Subject: [PATCH 27/30] unused --- src/wormhole_transit_relay/test/test_transit_server.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/wormhole_transit_relay/test/test_transit_server.py b/src/wormhole_transit_relay/test/test_transit_server.py index c98490f..70f49a8 100644 --- a/src/wormhole_transit_relay/test/test_transit_server.py +++ b/src/wormhole_transit_relay/test/test_transit_server.py @@ -320,7 +320,6 @@ def setUp(self): super(Usage, self).setUp() self._usage = MemoryUsageRecorder() self._transit_server.usage.add_backend(self._usage) -## self._transit_server.usage._blur_usage = None def test_empty(self): p1 = self.new_protocol() From 9673b74fb373314a7da2f026939dd56c18c977bb Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 12 Feb 2021 22:14:46 -0700 Subject: [PATCH 28/30] dead code, correct input --- src/wormhole_transit_relay/transit_server.py | 28 ++++---------------- 1 file changed, 5 insertions(+), 23 deletions(-) diff --git a/src/wormhole_transit_relay/transit_server.py b/src/wormhole_transit_relay/transit_server.py index 7bd7d91..a9cdddd 100644 --- a/src/wormhole_transit_relay/transit_server.py +++ b/src/wormhole_transit_relay/transit_server.py @@ -112,41 +112,23 @@ def rawDataReceived(self, data): self._state.got_bytes(data) def _got_handshake(self, token, side): - self._state.please_relay_for_side(token, side) - # self._mood = "lonely" # until buddy connects + if side is not None: + self._state.please_relay_for_side(token, side) + else: + self._state.please_relay(token) self.setRawMode() - def __buddy_connected(self, them): - self._buddy = them - self._mood = "happy" - self.sendLine(b"ok") - self._sent_ok = True - # Connect the two as a producer/consumer pair. We use streaming=True, - # so this expects the IPushProducer interface, and uses - # pauseProducing() to throttle, and resumeProducing() to unthrottle. - self._buddy.transport.registerProducer(self.transport, True) - # The Transit object calls buddy_connected() on both protocols, so - # there will be two producer/consumer pairs. - - def __buddy_disconnected(self): -## if self._log_requests: -## log.msg("buddy_disconnected %s" % self.describeToken()) - self._buddy = None - self._mood = "jilted" - self.transport.loseConnection() - def disconnect_error(self): # we haven't finished the handshake, so there are no tokens tracking # us - self._mood = "errory" self.transport.loseConnection() + # XXX probably should be logged by state? if self.factory._debug_log: log.msg("transitFailed %r" % self) def disconnect_redundant(self): # this is called if a buddy connected and we were found unnecessary. # Any token-tracking cleanup will have been done before we're called. - self._mood = "redundant" self.transport.loseConnection() def connectionLost(self, reason): From 4685b9824738542db256e72b1c2cf5005b5a74fb Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 19 Feb 2021 17:17:42 -0700 Subject: [PATCH 29/30] dead code --- src/wormhole_transit_relay/transit_server.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/src/wormhole_transit_relay/transit_server.py b/src/wormhole_transit_relay/transit_server.py index a9cdddd..0b23a66 100644 --- a/src/wormhole_transit_relay/transit_server.py +++ b/src/wormhole_transit_relay/transit_server.py @@ -57,16 +57,6 @@ def disconnect_partner(self): self._buddy._client.transport.loseConnection() self._buddy = None - def describeToken(self): - d = "-" - if self._got_token: - d = self._got_token[:16].decode("ascii") - if self._got_side: - d += "-" + self._got_side.decode("ascii") - else: - d += "-" - return d - def connectionMade(self): # ideally more like self._reactor.seconds() ... but Twisted # doesn't have a good way to get the reactor for a protocol @@ -136,6 +126,7 @@ def connectionLost(self, reason): # XXX this probably resulted in a log message we've not refactored yet # self.factory.transitFinished(self, self._got_token, self._got_side, # self.describeToken()) +# XXX describeToken -> self._state.get_token() From ed1025d12c7efecc2abe7eb3affafe80817e8d62 Mon Sep 17 00:00:00 2001 From: meejah Date: Fri, 19 Feb 2021 17:18:08 -0700 Subject: [PATCH 30/30] change from review: inline _got_handshake --- src/wormhole_transit_relay/transit_server.py | 26 ++++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/src/wormhole_transit_relay/transit_server.py b/src/wormhole_transit_relay/transit_server.py index 0b23a66..d9dabfd 100644 --- a/src/wormhole_transit_relay/transit_server.py +++ b/src/wormhole_transit_relay/transit_server.py @@ -74,25 +74,32 @@ def connectionMade(self): pass def lineReceived(self, line): + """ + LineReceiver API + """ # old: "please relay {64}\n" + token = None old = re.search(br"^please relay (\w{64})$", line) if old: token = old.group(1) - return self._got_handshake(token, None) + self._state.please_relay(token) # new: "please relay {64} for side {16}\n" new = re.search(br"^please relay (\w{64}) for side (\w{16})$", line) if new: token = new.group(1) side = new.group(2) - return self._got_handshake(token, side) + self._state.please_relay_for_side(token, side) - # we should have been switched to "raw data" mode on the first - # line received (after which rawDataReceived() is called for - # all bytes) so getting here means a bad handshake. - return self._state.bad_token() + if token is None: + self._state.bad_token() + else: + self.setRawMode() def rawDataReceived(self, data): + """ + LineReceiver API + """ # We are an IPushProducer to our buddy's IConsumer, so they'll # throttle us (by calling pauseProducing()) when their outbound # buffer is full (e.g. when their downstream pipe is full). In @@ -101,13 +108,6 @@ def rawDataReceived(self, data): # receiver can handle it. self._state.got_bytes(data) - def _got_handshake(self, token, side): - if side is not None: - self._state.please_relay_for_side(token, side) - else: - self._state.please_relay(token) - self.setRawMode() - def disconnect_error(self): # we haven't finished the handshake, so there are no tokens tracking # us