diff --git a/kdcproxy/__init__.py b/kdcproxy/__init__.py index 1493b30..f61e867 100644 --- a/kdcproxy/__init__.py +++ b/kdcproxy/__init__.py @@ -60,6 +60,11 @@ def __init__(self, code, msg, headers=[]): def __str__(self): return "%d %s" % (self.code, httplib.responses[self.code]) +class SocketException(Exception): + + def __init__(self, message, sock): + super(Exception, self).__init__(message) + self.sockfno = sock.fileno() class Application: MAX_LENGTH = 128 * 1024 @@ -68,10 +73,23 @@ class Application: "udp": socket.SOCK_DGRAM, } + def addr2socktypename(self, addr): + ret = None + for name in self.SOCKTYPES: + if self.SOCKTYPES[name] == addr[1]: + ret = name + break + return ret + def __init__(self): self.__resolver = MetaResolver() def __await_reply(self, pr, rsocks, wsocks, timeout): + starting_time = time.time() + send_error = None + recv_error = None + failing_sock = None + reactivations = {} extra = 0 read_buffers = {} while (timeout + extra) > time.time(): @@ -92,6 +110,9 @@ def __await_reply(self, pr, rsocks, wsocks, timeout): pass for sock in w: + (react_n, react_t) = reactivations.get(sock, (-1, 0.0)) + if react_t > time.time(): + continue try: if self.sock_type(sock) == socket.SOCK_DGRAM: # If we proxy over UDP, remove the 4-byte length @@ -101,8 +122,13 @@ def __await_reply(self, pr, rsocks, wsocks, timeout): sock.sendall(pr.request) extra = 10 # New connections get 10 extra seconds except Exception as e: - logging.warning("Conection broken while writing (%s)", e) + send_error = e + failing_sock = sock + reactivations[sock] = (react_n + 1, + time.time() + 2.0**(react_n+1) / 10) continue + if sock in reactivations: + del reactivations[sock] rsocks.append(sock) wsocks.remove(sock) @@ -110,7 +136,8 @@ def __await_reply(self, pr, rsocks, wsocks, timeout): try: reply = self.__handle_recv(sock, read_buffers) except Exception as e: - logging.warning("Connection broken while reading (%s)", e) + recv_error = e + failing_sock = sock if self.sock_type(sock) == socket.SOCK_STREAM: # Remove broken TCP socket from readers rsocks.remove(sock) @@ -118,6 +145,21 @@ def __await_reply(self, pr, rsocks, wsocks, timeout): if reply is not None: return reply + if reactivations: + raise SocketException("Timeout while sending packets after %.2fs " + "and %d tries: %s" % ( + (timeout + extra) - starting_time, + sum(map(lambda r: r[0], + reactivations.values())), + send_error), + failing_sock) + elif recv_error is not None: + raise SocketException("Timeout while receiving packets after " + "%.2fs: %s" % ( + (timeout + extra) - starting_time, + recv_error), + failing_sock) + return None def __handle_recv(self, sock, read_buffers): @@ -215,6 +257,7 @@ def __call__(self, env, start_response): reply = None wsocks = [] rsocks = [] + sockfno2addr = {} for server in map(urlparse.urlparse, servers): # Enforce valid, supported URIs scheme = server.scheme.lower().split("+", 1) @@ -261,6 +304,7 @@ def __call__(self, env, start_response): continue except io.BlockingIOError: pass + sockfno2addr[sock.fileno()] = addr wsocks.append(sock) # Resend packets to UDP servers @@ -271,7 +315,15 @@ def __call__(self, env, start_response): # Call select() timeout = time.time() + (15 if addr is None else 2) - reply = self.__await_reply(pr, rsocks, wsocks, timeout) + try: + reply = self.__await_reply(pr, rsocks, wsocks, timeout) + except SocketException as e: + fail_addr = sockfno2addr[e.sockfno] + fail_socktype = self.addr2socktypename(fail_addr) + fail_ip = fail_addr[4][0] + fail_port = fail_addr[4][1] + logging.warning("Exchange with %s:[%s]:%d failed: %s", + fail_socktype, fail_ip, fail_port, e) if reply is not None: break