From 5095a7e8013cfb980ee7ba0d3aabb3a742dacf31 Mon Sep 17 00:00:00 2001 From: Roman Belozerov <110590013+RomanBelozerov@users.noreply.github.com> Date: Fri, 28 Apr 2023 11:53:00 +0400 Subject: [PATCH] tcp segmentation tests (#443) * deproxy_server.py: - remove `send_response` method - it is duplicate logic; - now logic of sending response is in `initiate_send` method; - added sending response with tcp segmentation. * added running tests with TSP segmentation. Works for deproxy client/server. * added second file for disabled tests. It needed for running test with TCP segmentation. * updated tests_disabled_2.json. Removed `\r\n` from response body for some tests, because it is response splitting attack. Removed `connection` arg for `receive_request` method, because it is not used. * deproxy_server and deproxy_client: updated sending data with TCP segmentation. Now they do not send data if they received TCP RST. * tests_disabled_2.json new name - tests_disabled_tcpseg.json --- cache/test_cache.py | 5 +- cache/test_cache_methods.py | 2 +- cache/test_purge.py | 10 +- forwarding/test_forwarded_hdr.py | 3 +- framework/deproxy_client.py | 22 +- framework/deproxy_server.py | 56 ++--- http2_general/test_h2_frame.py | 32 --- nonidempotent/test_nonidempotent.py | 4 +- pipelining/test_pipelining.py | 11 +- run_config.py | 2 + run_tests.py | 28 ++- tests_disabled_tcpseg.json | 349 ++++++++++++++++++++++++++++ 12 files changed, 438 insertions(+), 86 deletions(-) create mode 100644 tests_disabled_tcpseg.json diff --git a/cache/test_cache.py b/cache/test_cache.py index a9b18c318..00d37afc7 100644 --- a/cache/test_cache.py +++ b/cache/test_cache.py @@ -9,7 +9,7 @@ from framework.deproxy_client import DeproxyClient from framework.deproxy_server import StaticDeproxyServer from helpers import checks_for_tests as checks -from helpers import tf_cfg +from helpers import deproxy, tf_cfg from helpers.control import Tempesta MIXED_CONFIG = ( @@ -71,7 +71,7 @@ def _test(self, uri: str, cache_mode: int, should_be_cached: bool, tempesta_conf + "Content-Length: 13\r\n" + "Content-Type: text/html\r\n" + "\r\n" - + "\r\n", + + "" ) client: DeproxyClient = self.get_client("deproxy") @@ -399,7 +399,6 @@ def test(self): class TestChunkedResponse(tester.TempestaTest): - backends = [ { "id": "chunked", diff --git a/cache/test_cache_methods.py b/cache/test_cache_methods.py index bf19c385f..841d91338 100644 --- a/cache/test_cache_methods.py +++ b/cache/test_cache_methods.py @@ -283,7 +283,7 @@ class TestMultipleMethods(TempestaTest): + "Last-Modified: Mon, 12 Dec 2016 13:59:39 GMT\r\n" + f"Date: {HttpMessage.date_time_string()}\r\n" + "\r\n" - + "<>/html\r\n" + + "<>/html" ), }, ] diff --git a/cache/test_purge.py b/cache/test_purge.py index 52347fdcc..cb08786b5 100644 --- a/cache/test_purge.py +++ b/cache/test_purge.py @@ -49,7 +49,7 @@ class TestPurge(TempestaTest): + "Last-Modified: Mon, 12 Dec 2016 13:59:39 GMT\r\n" + f"Date: {HttpMessage.date_time_string()}\r\n" + "\r\n" - + "\r\n" + + "" ), }, ] @@ -116,10 +116,7 @@ def test_purge(self): self.assertIn("age", client.last_response.headers) checks.check_tempesta_cache_stats( - self.get_tempesta(), - cache_hits=4, - cache_misses=4, - cl_msg_served_from_cache=4 + self.get_tempesta(), cache_hits=4, cache_misses=4, cl_msg_served_from_cache=4 ) self.assertEqual(len(self.get_server("deproxy").requests), 4) @@ -421,7 +418,7 @@ def test_purge_get_uncacheable(self): + f"Date: {HttpMessage.date_time_string()}\r\n" + "Cache-Control: private\r\n" + "\r\n" - + "\r\n", + + "" ) client.send_request( @@ -446,7 +443,6 @@ def test_purge_get_uncacheable(self): class TestPurgeGet(TempestaTest): - backends = [ # /server-1: default transfer encoding { diff --git a/forwarding/test_forwarded_hdr.py b/forwarding/test_forwarded_hdr.py index e5493699f..184797da4 100644 --- a/forwarding/test_forwarded_hdr.py +++ b/forwarding/test_forwarded_hdr.py @@ -10,7 +10,6 @@ class TestForwardedBase(tester.TempestaTest, base=True): - backends = [ { "id": "backend1", @@ -59,7 +58,7 @@ def prepare_request(self, req_param): def client_send_req(self, client, req): curr_responses = len(client.responses) client.make_requests(req) - client.wait_for_response(timeout=1) + client.wait_for_response(timeout=3) self.assertEqual(curr_responses + 1, len(client.responses)) return client.last_response diff --git a/framework/deproxy_client.py b/framework/deproxy_client.py index ef4c0f01e..9f4b96b87 100644 --- a/framework/deproxy_client.py +++ b/framework/deproxy_client.py @@ -17,6 +17,7 @@ from hpack import Encoder from hyperframe.frame import WindowUpdateFrame +import run_config from helpers import deproxy, selfproxy, stateful, tf_cfg __author__ = "Tempesta Technologies, Inc." @@ -180,6 +181,10 @@ def handle_write(self): reqs = self.request_buffers tf_cfg.dbg(4, "\tDeproxy: Client: Send request to Tempesta.") tf_cfg.dbg(5, reqs[self.cur_req_num]) + + if run_config.TCP_SEGMENTATION and self.segment_size == 0: + self.segment_size = run_config.TCP_SEGMENTATION + if self.segment_size != 0 and not self.selfproxy_present: sent = self.send(reqs[self.cur_req_num][: self.segment_size].encode()) else: @@ -606,12 +611,19 @@ def handle_write(self): reqs = self.request_buffers tf_cfg.dbg(4, "\tDeproxy: Client: Send request to Tempesta.") tf_cfg.dbg(5, reqs[self.cur_req_num]) + + if run_config.TCP_SEGMENTATION and self.segment_size == 0: + self.segment_size = run_config.TCP_SEGMENTATION + if self.segment_size != 0: - for chunk in [ - reqs[self.cur_req_num][i : (i + self.segment_size)] - for i in range(0, len(reqs[self.cur_req_num]), self.segment_size) - ]: - sent = self.send(chunk) + try: + for chunk in [ + reqs[self.cur_req_num][i : (i + self.segment_size)] + for i in range(0, len(reqs[self.cur_req_num]), self.segment_size) + ]: + sent = self.socket.send(chunk) + except BrokenPipeError as e: + tf_cfg.dbg(4, f"\tDeproxy: Client: Received error - {e}.") else: sent = self.send(reqs[self.cur_req_num]) if sent < 0: diff --git a/framework/deproxy_server.py b/framework/deproxy_server.py index d01f3d5a0..fbe63821b 100644 --- a/framework/deproxy_server.py +++ b/framework/deproxy_server.py @@ -7,6 +7,7 @@ import framework.port_checks as port_checks import framework.tester +import run_config from helpers import deproxy, error, remote, stateful, tempesta, tf_cfg from .templates import fill_template @@ -19,6 +20,7 @@ class ServerConnection(asyncore.dispatcher_with_send): def __init__(self, server, sock=None, keep_alive=None): asyncore.dispatcher_with_send.__init__(self, sock) + self.out_buffer = b"" self.server = server self.keep_alive = keep_alive self.last_segment_time = 0 @@ -31,18 +33,29 @@ def initiate_send(self): data with too small chunks of 512 bytes. However if server.segment_size is set (!=0), use this value. """ - num_sent = 0 - num_sent = asyncore.dispatcher.send( - self, - self.out_buffer[: self.server.segment_size if self.server.segment_size > 0 else 4096*2], - ) - self.out_buffer = self.out_buffer[num_sent:] + if run_config.TCP_SEGMENTATION and self.server.segment_size == 0: + self.server.segment_size = run_config.TCP_SEGMENTATION + + if self.server.segment_size: + for chunk in [ + self.out_buffer[i : (i + self.server.segment_size)] + for i in range(0, len(self.out_buffer), self.server.segment_size) + ]: + try: + self.socket.send(chunk) + except ConnectionResetError as e: + tf_cfg.dbg(4, f"\tDeproxy: Server: Connection: Received error - {e}") + break + else: + self.socket.sendall(self.out_buffer) + + self.out_buffer = b"" + self.last_segment_time = time.time() + self.responses_done += 1 - def send_pending_and_close(self): - while len(self.out_buffer): - self.initiate_send() - self.handle_close() + if self.responses_done == self.keep_alive and self.keep_alive: + self.handle_close() def writable(self): if ( @@ -52,18 +65,6 @@ def writable(self): return False return asyncore.dispatcher_with_send.writable(self) - def send_response(self, response): - if response: - tf_cfg.dbg(4, "\tDeproxy: SrvConnection: Send response.") - tf_cfg.dbg(5, response) - self.socket.sendall(response.encode()) - else: - tf_cfg.dbg(4, "\tDeproxy: SrvConnection: Don't have response") - if self.keep_alive: - self.responses_done += 1 - if self.responses_done == self.keep_alive: - self.send_pending_and_close() - def handle_error(self): _, v, _ = sys.exc_info() error.bug("\tDeproxy: SrvConnection: %s" % v) @@ -98,10 +99,13 @@ def handle_read(self): return tf_cfg.dbg(4, "\tDeproxy: SrvConnection: Receive request.") tf_cfg.dbg(5, self.request_buffer) - response, need_close = self.server.receive_request(request, self) + response, need_close = self.server.receive_request(request) self.request_buffer = "" if response: - self.send_response(response) + tf_cfg.dbg(4, "\tDeproxy: SrvConnection: Send response.") + tf_cfg.dbg(5, response) + self.out_buffer += response.encode() + self.initiate_send() if need_close: self.close() @@ -186,7 +190,7 @@ def wait_for_connections(self, timeout=1): return True @abc.abstractmethod - def receive_request(self, request, connection): + def receive_request(self, request): raise NotImplementedError("Not implemented 'receive_request()'") @@ -205,7 +209,7 @@ def run_start(self): def set_response(self, response): self.response = response - def receive_request(self, request, connection): + def receive_request(self, request): self.requests.append(request) self.last_request = request return self.response, False diff --git a/http2_general/test_h2_frame.py b/http2_general/test_h2_frame.py index 2074e3ee4..14fa168cc 100644 --- a/http2_general/test_h2_frame.py +++ b/http2_general/test_h2_frame.py @@ -59,38 +59,6 @@ def test_empty_data_frame(self): self.__assert_test(client=deproxy_cl, request_body=request_body, request_number=1) - def test_tcp_framing_for_request_headers(self): - """Client sends PRI+SETTING+HEADERS frames by 1-byte chunks.""" - client = self.get_client("deproxy") - client.segment_size = 1 - self.start_all_services() - client.parsing = False - - client.make_request(self.post_request) - - self.__assert_test(client=client, request_body="", request_number=1) - - def test_tcp_framing_for_request(self): - """Client sends request by n-byte chunks.""" - client = self.get_client("deproxy") - self.start_all_services() - client.parsing = False - - chunk_sizes = [1, 2, 3, 4, 8, 16] - for chunk_size in chunk_sizes: - with self.subTest(chunk_size=chunk_size): - client.segment_size = chunk_size - client.make_request(self.post_request, False) - - request_body = "0123456789" - client.make_request(request_body, True) - - self.__assert_test( - client=client, - request_body=request_body, - request_number=chunk_sizes.index(chunk_size) + 1, - ) - def test_settings_frame(self): """ Create tls connection and send preamble + correct settings frame. diff --git a/nonidempotent/test_nonidempotent.py b/nonidempotent/test_nonidempotent.py index 1550708e5..cb84094fc 100644 --- a/nonidempotent/test_nonidempotent.py +++ b/nonidempotent/test_nonidempotent.py @@ -91,9 +91,9 @@ class DeproxyDropServer(deproxy_server.StaticDeproxyServer): do_drop = True - def receive_request(self, request, connection): + def receive_request(self, request): uri = request.uri - r, close = deproxy_server.StaticDeproxyServer.receive_request(self, request, connection) + r, close = deproxy_server.StaticDeproxyServer.receive_request(self, request) if "/drop/" in uri and self.do_drop: self.do_drop = False return "", True diff --git a/pipelining/test_pipelining.py b/pipelining/test_pipelining.py index 8c2db2834..b68fd857e 100644 --- a/pipelining/test_pipelining.py +++ b/pipelining/test_pipelining.py @@ -10,9 +10,9 @@ class DeproxyEchoServer(deproxy_server.StaticDeproxyServer): - def receive_request(self, request, connection): + def receive_request(self, request): id = request.uri - r, close = deproxy_server.StaticDeproxyServer.receive_request(self, request, connection) + r, close = deproxy_server.StaticDeproxyServer.receive_request(self, request) resp = deproxy.Response(r) resp.body = id resp.headers["Content-Length"] = len(resp.body) @@ -32,10 +32,10 @@ def run_start(self): self.nka = 0 DeproxyEchoServer.run_start(self) - def receive_request(self, request, connection): + def receive_request(self, request): self.nka += 1 tf_cfg.dbg(5, "\trequests = %i of %i" % (self.nka, self.ka)) - r, close = DeproxyEchoServer.receive_request(self, request, connection) + r, close = DeproxyEchoServer.receive_request(self, request) if self.nka < self.ka and not close: return r, False resp = deproxy.Response(r) @@ -93,7 +93,6 @@ def build_tempesta_fault(tempesta): class PipeliningTest(tester.TempestaTest): - backends = [ { "id": "deproxy", @@ -261,7 +260,6 @@ def test_failovering(self): class PipeliningTestFI(tester.TempestaTest): - backends = [ { "id": "deproxy", @@ -386,7 +384,6 @@ def test_failovering(self): class H2MultiplexedTest(tester.TempestaTest): - backends = [ { "id": "deproxy", diff --git a/run_config.py b/run_config.py index 461302195..d3aee2b25 100644 --- a/run_config.py +++ b/run_config.py @@ -10,3 +10,5 @@ REQUESTS_COUNT = int(tf_cfg.cfg.get("General", "stress_requests_count")) DURATION = int(tf_cfg.cfg.get("General", "duration")) + +TCP_SEGMENTATION = 0 diff --git a/run_tests.py b/run_tests.py index 2772d7ce2..6750d2232 100755 --- a/run_tests.py +++ b/run_tests.py @@ -49,6 +49,16 @@ def usage(): -D, --debug-files - Don't remove generated config files -Z, --run-disabled - Run only tests from list of disabled -I, --ignore-errors - Don't exit on import/syntax errors in tests +-s, --save-tcpdump - Enable tcpdump for test. Results is saved to + file with name of test. Works with -R option. + Default path: /var/tcpdump//