From b0817a04f8074388e1615ef7f00850863f7c4527 Mon Sep 17 00:00:00 2001 From: George Pantelakis Date: Thu, 4 Apr 2024 16:06:33 +0200 Subject: [PATCH] tlsfuzzer: add Hamming weight extraction and more tlsfuzzer/extract.py: Added support to create a Hamming weight measurements file and invert file. Also implemented --skip-invert flag to skip the invert nonce analysis. Finally, prevented recalculation of invert in Hamming weight invert analysis. tlsfuzzer/analysis.py: Fixed typo, status report on Skillings-Mack and fixed _bit_size_data_used to reflect the correct value and write it to report.txt --- tests/test_tlsfuzzer_analysis.py | 9 +- tests/test_tlsfuzzer_extract.py | 409 +++++++++++++++++++------------ tlsfuzzer/analysis.py | 27 +- tlsfuzzer/extract.py | 247 ++++++++++++++++--- 4 files changed, 485 insertions(+), 207 deletions(-) diff --git a/tests/test_tlsfuzzer_analysis.py b/tests/test_tlsfuzzer_analysis.py index f6ea95ecb..558f2b5be 100644 --- a/tests/test_tlsfuzzer_analysis.py +++ b/tests/test_tlsfuzzer_analysis.py @@ -998,7 +998,7 @@ def test_call_with_parametrized_smart_analysis(self): bit_size_desire_ci = 5 bit_recognition_size = 2 args = ["analysis.py", "-o", output, "--bit-size", - "--bit-size-desire-ci", bit_size_desire_ci, + "--bit-size-desired-ci", bit_size_desire_ci, "--bit-recognition-size", bit_recognition_size,] mock_init = mock.Mock() mock_init.return_value = None @@ -1797,7 +1797,6 @@ def test_figure_out_analysis_data_size(self, print_mock, calc_diff_conf_int_mock): k_sizes = [256, 255, 254, 253, 252] old_bit_recall_size = self.analysis.bit_recognition_size - self.analysis._bit_size_data_used = 1000 self.analysis.verbose = True calc_diff_conf_int_mock.return_value = { @@ -1811,24 +1810,28 @@ def test_figure_out_analysis_data_size(self, print_mock, # first test run self.analysis._bit_size_data_limit = 10000 + self.analysis._bit_size_data_used = 1000 self.analysis.bit_recognition_size = 1 self.analysis._figure_out_analysis_data_size(k_sizes) self.assertEqual(self.analysis._bit_size_data_limit, 8281000) # second test run self.analysis._bit_size_data_limit = 10000 + self.analysis._bit_size_data_used = 1000 self.analysis.bit_recognition_size = 2 self.analysis._figure_out_analysis_data_size(k_sizes) self.assertEqual(self.analysis._bit_size_data_limit, 8281000) # third test run self.analysis._bit_size_data_limit = 10000 + self.analysis._bit_size_data_used = 1000 self.analysis.bit_recognition_size = 3 self.analysis._figure_out_analysis_data_size(k_sizes) self.assertEqual(self.analysis._bit_size_data_limit, 8281000) # forth test run self.analysis._bit_size_data_limit = 10000 + self.analysis._bit_size_data_used = 1000 self.analysis.bit_recognition_size = 30 self.analysis._figure_out_analysis_data_size(k_sizes) self.assertEqual(self.analysis._bit_size_data_limit, 8281000) @@ -1836,6 +1839,7 @@ def test_figure_out_analysis_data_size(self, print_mock, # forth test run self.analysis.verbose = False self.analysis._bit_size_data_limit = 10000 + self.analysis._bit_size_data_used = 1000 self.analysis.bit_recognition_size = 4 self.analysis._figure_out_analysis_data_size(k_sizes) self.assertEqual(self.analysis._bit_size_data_limit, 8281000) @@ -1843,7 +1847,6 @@ def test_figure_out_analysis_data_size(self, print_mock, # restore of changed variables self._bit_size_data_limit = 10000 self.analysis.bit_recognition_size = old_bit_recall_size - self.analysis._bit_size_data_used = None self.assertEqual(self.analysis.output, "/tmp") diff --git a/tests/test_tlsfuzzer_extract.py b/tests/test_tlsfuzzer_extract.py index 456926b38..4ab600619 100644 --- a/tests/test_tlsfuzzer_extract.py +++ b/tests/test_tlsfuzzer_extract.py @@ -15,6 +15,7 @@ from socket import inet_aton from os.path import join, dirname, abspath import hashlib +from random import choice from tlsfuzzer.utils.log import Log @@ -836,6 +837,51 @@ def test_workers_option(self, mock_parse, mock_process, mock_write, mock_log.assert_not_called() mock_process.assert_called_once() + @mock.patch('tlsfuzzer.extract.Log') + @mock.patch('tlsfuzzer.extract.Extract._write_pkts') + @mock.patch('tlsfuzzer.extract.Extract._write_csv') + @mock.patch( + 'tlsfuzzer.extract.Extract.process_and_create_multiple_csv_files' + ) + @mock.patch('tlsfuzzer.extract.Extract.parse') + def test_skip_invert_option(self, mock_parse, mock_process, mock_write, + mock_write_pkt, mock_log): + output = "/tmp" + raw_data = "/tmp/data" + data_size = 32 + raw_sigs = "/tmp/sigs" + raw_times = "/tmp/times" + priv_key = "/tmp/key" + args = ["extract.py", + "-o", output, + "--raw-data", raw_data, + "--data-size", data_size, + "--raw-sigs", raw_sigs, + "--raw-times", raw_times, + "--priv-key-ecdsa", priv_key, + "--skip-invert"] + mock_init = mock.Mock() + mock_init.return_value = None + with mock.patch('tlsfuzzer.extract.Extract.__init__', mock_init): + with mock.patch("sys.argv", args): + main() + mock_init.assert_called_once_with( + mock.ANY, None, output, None, None, + raw_times, None, binary=None, endian="little", + no_quickack=False, delay=None, carriage_return=None, + data=raw_data, data_size=data_size, sigs=raw_sigs, + priv_key=priv_key, key_type="ecdsa", + frequency=None, hash_func=hashlib.sha256, + workers=None, verbose=False, rsa_keys=None) + mock_write.assert_not_called() + mock_write_pkt.assert_not_called() + mock_log.assert_not_called() + mock_process.assert_called_once() + + files_passes_in_process = mock_process.call_args[0][0] + for mode in files_passes_in_process.values(): + self.assertNotIn("invert", mode) + def test_specify_to_private_keys(self): args = [ "extract.py", "-o", "/tmp", "--raw-data", "/tmp/data", @@ -1334,14 +1380,34 @@ class TestMeasurementCreation(unittest.TestCase): def setUp(self): self.builtin_open = open self.times_used_write = 0 + self.times_used_write_on_hamming = 0 self.k_time_map = [] + out_dir = join(dirname(abspath(__file__)), "measurements_test_files") + raw_times = join(dirname(abspath(__file__)), + "measurements_test_files", "times.bin") + raw_sigs = join(dirname(abspath(__file__)), + "measurements_test_files", "sigs.bin") + raw_data = join(dirname(abspath(__file__)), + "measurements_test_files", "data.bin") + priv_key = join(dirname(abspath(__file__)), + "measurements_test_files", "priv_key.pem") + + self.extract = Extract( + output=out_dir, raw_times=raw_times, binary=8, + sigs=raw_sigs, data=raw_data, data_size=32, priv_key=priv_key, + key_type="ecdsa" + ) + def custom_generator(self, data): for item in data: yield item - def add_to_times_used_write (self, i): - self.times_used_write += i + def add_to_times_used_write (self, i, hamming=False): + if hamming: + self.times_used_write_on_hamming += i + else: + self.times_used_write += i def file_emulator(self, *args, **kwargs): name = args[0] @@ -1365,10 +1431,14 @@ def file_emulator(self, *args, **kwargs): ) return r + if "tmp_HWI_values.csv" in name: + r = mock.mock_open()(name, mode) + return r + if "w" in mode and "measurements" in name: r = mock.mock_open()(name, mode) r.write.side_effect = lambda s: ( - self.add_to_times_used_write(1) + self.add_to_times_used_write(1, hamming=("hamming" in name)) ) return r elif "w" in mode: @@ -1383,27 +1453,14 @@ def file_emulator(self, *args, **kwargs): def test_measurement_creation_with_verbose_and_frequency( self, mock_print, mock_file ): - out_dir = join(dirname(abspath(__file__)), "measurements_test_files") - raw_times = join(dirname(abspath(__file__)), - "measurements_test_files", "times.bin") - raw_sigs = join(dirname(abspath(__file__)), - "measurements_test_files", "sigs.bin") - raw_data = join(dirname(abspath(__file__)), - "measurements_test_files", "data.bin") - priv_key = join(dirname(abspath(__file__)), - "measurements_test_files", "priv_key.pem") + self.extract.frequency = 1 + self.extract.verbose = True mock_file.side_effect = self.file_emulator self.times_used_write = 0 - extract = Extract( - output=out_dir, raw_times=raw_times, binary=8, - sigs=raw_sigs, data=raw_data, data_size=32, priv_key=priv_key, - key_type="ecdsa", frequency=1, verbose=True - ) - - extract.process_measurements_and_create_csv_file( - extract.ecdsa_iter(), extract.ecdsa_max_value() + self.extract.process_measurements_and_create_csv_file( + self.extract.ecdsa_iter(), self.extract.ecdsa_max_value() ) self.assertGreater( @@ -1411,32 +1468,22 @@ def test_measurement_creation_with_verbose_and_frequency( "At least one measurement should have been written." ) + self.times_used_write = 0 + self.extract.frequency = None + self.extract.verbose = False + @mock.patch('__main__.__builtins__.open') - def test_measurement_creation_with_invert_k_size( + def test_measurement_creation_with_k_size_invert( self, mock_file ): - out_dir = join(dirname(abspath(__file__)), "measurements_test_files") - raw_times = join(dirname(abspath(__file__)), - "measurements_test_files", "times.bin") - raw_sigs = join(dirname(abspath(__file__)), - "measurements_test_files", "sigs.bin") - raw_data = join(dirname(abspath(__file__)), - "measurements_test_files", "data.bin") - priv_key = join(dirname(abspath(__file__)), - "measurements_test_files", "priv_key.pem") + self.extract._temp_HWI_name = "tmp_HWI_values.csv" mock_file.side_effect = self.file_emulator self.times_used_write = 0 - extract = Extract( - output=out_dir, raw_times=raw_times, binary=8, - sigs=raw_sigs, data=raw_data, data_size=32, priv_key=priv_key, - key_type="ecdsa" - ) - - extract.process_measurements_and_create_csv_file( - extract.ecdsa_iter(return_type="invert-k-size"), - extract.ecdsa_max_value() + self.extract.process_measurements_and_create_csv_file( + self.extract.ecdsa_iter(return_type="k-size-invert"), + self.extract.ecdsa_max_value() ) self.assertGreater( @@ -1444,68 +1491,95 @@ def test_measurement_creation_with_invert_k_size( "At least one measurement should have been written." ) + self.times_used_write = 0 + self.extract._temp_HWI_name = None + @mock.patch('__main__.__builtins__.open') def test_measurement_creation_with_hamming_weight( self, mock_file ): - out_dir = join(dirname(abspath(__file__)), "measurements_test_files") - raw_times = join(dirname(abspath(__file__)), - "measurements_test_files", "times.bin") - raw_sigs = join(dirname(abspath(__file__)), - "measurements_test_files", "sigs.bin") - raw_data = join(dirname(abspath(__file__)), - "measurements_test_files", "data.bin") - priv_key = join(dirname(abspath(__file__)), - "measurements_test_files", "priv_key.pem") - mock_file.side_effect = self.file_emulator self.times_used_write = 0 - extract = Extract( - output=out_dir, raw_times=raw_times, binary=8, - sigs=raw_sigs, data=raw_data, data_size=32, priv_key=priv_key, - key_type="ecdsa" + self.extract.process_measurements_and_create_hamming_csv_file( + self.extract.ecdsa_iter(return_type="hamming-weight") ) - extract.process_measurements_and_create_csv_file( - extract.ecdsa_iter(return_type="hamming-weight"), 128 + self.assertGreater( + self.times_used_write, 0, + "At least one measurement should have been written." ) + self.times_used_write = 0 + + @mock.patch('tlsfuzzer.extract.Extract._check_for_iter_left_overs') + @mock.patch('__main__.__builtins__.print') + @mock.patch('__main__.__builtins__.open') + def test_measurement_creation_with_hamming_weight_non_exact_multiple( + self, mock_file, mock_print, mock_left_overs + ): + self.extract.verbose = True + + def custom_ecdsa_iter(): + counter = 0 + even_list = [127, 128, 129] + odd_list = [125, 126, 130, 130] + + while counter < 106: + if counter % 2 == 0: + yield choice(even_list) + else: + yield choice(odd_list) + counter += 1 + + mock_file.side_effect = self.file_emulator + self.times_used_write = 0 + + self.extract.process_measurements_and_create_hamming_csv_file( + custom_ecdsa_iter()) + self.assertGreater( self.times_used_write, 0, "At least one measurement should have been written." ) + mock_print.assert_called() + + self.times_used_write = 0 + self.extract.verbose = False @mock.patch('__main__.__builtins__.open') - def test_measurement_creation_with_invalid_iter_option( + def test_measurement_creation_with_hamming_weight_invert( self, mock_file ): - out_dir = join(dirname(abspath(__file__)), "measurements_test_files") - raw_times = join(dirname(abspath(__file__)), - "measurements_test_files", "times.bin") - raw_sigs = join(dirname(abspath(__file__)), - "measurements_test_files", "sigs.bin") - raw_data = join(dirname(abspath(__file__)), - "measurements_test_files", "data.bin") - priv_key = join(dirname(abspath(__file__)), - "measurements_test_files", "priv_key.pem") - mock_file.side_effect = self.file_emulator + self.times_used_write = 0 - extract = Extract( - output=out_dir, raw_times=raw_times, binary=8, - sigs=raw_sigs, data=raw_data, data_size=32, priv_key=priv_key, - key_type="ecdsa" + self.extract.process_measurements_and_create_hamming_csv_file( + self.extract.ecdsa_iter(return_type="hamming-weight-invert") ) + self.assertGreater( + self.times_used_write, 0, + "At least one measurement should have been written." + ) + + self.times_used_write = 0 + + @mock.patch('__main__.__builtins__.open') + def test_measurement_creation_with_invalid_iter_option( + self, mock_file + ): + mock_file.side_effect = self.file_emulator + with self.assertRaises(ValueError) as e: - extract.process_measurements_and_create_csv_file( - extract.ecdsa_iter(return_type="not-an-option"), - extract.ecdsa_max_value() + self.extract.process_measurements_and_create_csv_file( + self.extract.ecdsa_iter(return_type="not-an-option"), + self.extract.ecdsa_max_value() ) self.assertIn( - "Iterator return must be k-size, invert-k-size or hamming-weight.", + "Iterator return must be k-size[-invert] " + "or hamming-weight[-invert]", str(e.exception) ) @@ -1513,144 +1587,167 @@ def test_measurement_creation_with_invalid_iter_option( def test_measurement_creation_with_wrong_hash_func( self, mock_file ): - out_dir = join(dirname(abspath(__file__)), "measurements_test_files") - raw_times = join(dirname(abspath(__file__)), - "measurements_test_files", "times.bin") - raw_sigs = join(dirname(abspath(__file__)), - "measurements_test_files", "sigs.bin") - raw_data = join(dirname(abspath(__file__)), - "measurements_test_files", "data.bin") - priv_key = join(dirname(abspath(__file__)), - "measurements_test_files", "priv_key.pem") + self.extract.hash_func = hashlib.sha384 mock_file.side_effect = self.file_emulator - extract = Extract( - output=out_dir, raw_times=raw_times, binary=8, - sigs=raw_sigs, data=raw_data, data_size=32, priv_key=priv_key, - key_type="ecdsa", hash_func=hashlib.sha384 - ) - with self.assertRaises(ValueError) as e: - extract.process_measurements_and_create_csv_file( - extract.ecdsa_iter(), extract.ecdsa_max_value() + self.extract.process_measurements_and_create_csv_file( + self.extract.ecdsa_iter(), self.extract.ecdsa_max_value() ) self.assertIn("Failed to calculate k from given signatures.", str(e.exception)) + self.extract.hash_func = hashlib.sha256 + @mock.patch('__main__.__builtins__.open') def test_measurement_creation_with_non_existing_data_file( self, mock_file ): - out_dir = join(dirname(abspath(__file__)), "measurements_test_files") - raw_times = join(dirname(abspath(__file__)), - "measurements_test_files", "times.bin") - raw_sigs = join(dirname(abspath(__file__)), - "measurements_test_files", "sigs.bin") - raw_data = join(dirname(abspath(__file__)), - "measurements_test_files", "data2.bin") - priv_key = join(dirname(abspath(__file__)), - "measurements_test_files", "priv_key.pem") + self.extract.data = self.extract.data.replace("data", "data2") mock_file.side_effect = self.file_emulator - extract = Extract( - output=out_dir, raw_times=raw_times, binary=8, - sigs=raw_sigs, data=raw_data, data_size=32, priv_key=priv_key, - key_type="ecdsa" - ) - with self.assertRaises(FileNotFoundError) as e: - extract.process_measurements_and_create_csv_file( - extract.ecdsa_iter(), extract.ecdsa_max_value() + self.extract.process_measurements_and_create_csv_file( + self.extract.ecdsa_iter(), self.extract.ecdsa_max_value() ) self.assertIn("No such file or directory", str(e.exception)) + self.extract.data = self.extract.data.replace("data2", "data") + @mock.patch('builtins.print') @mock.patch('__main__.__builtins__.open') def test_measurement_creation_with_incomplete_times( self, mock_file, mock_print ): - raw_times = join(dirname(abspath(__file__)), - "measurements_test_files", "times.bin") - raw_sigs = join(dirname(abspath(__file__)), - "measurements_test_files", "sigs.bin") - raw_data = join(dirname(abspath(__file__)), - "measurements_test_files", "data.bin") - priv_key = join(dirname(abspath(__file__)), - "measurements_test_files", "priv_key.pem") + original_output = self.extract.output + self.extract.output = "/tmp/minerva" mock_file.side_effect = self.file_emulator times = self.custom_generator( [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 ] ) - extract = Extract( - output="/tmp/minerva", raw_times=raw_times, binary=8, - sigs=raw_sigs, data=raw_data, data_size=32, priv_key=priv_key, - key_type="ecdsa", verbose=True - ) - with self.assertRaises(ValueError) as e: - extract.process_measurements_and_create_csv_file( - times, extract.ecdsa_max_value() + self.extract.process_measurements_and_create_csv_file( + times, self.extract.ecdsa_max_value() ) self.assertIn("There are some extra values that are not used.", str(e.exception)) + self.extract.output = original_output + + @mock.patch('__main__.__builtins__.print') @mock.patch('__main__.__builtins__.open') def test_multiple_measurement_creation( - self, mock_file + self, mock_file, mock_print ): - out_dir = join(dirname(abspath(__file__)), "measurements_test_files") - raw_times = join(dirname(abspath(__file__)), - "measurements_test_files", "times.bin") - raw_sigs = join(dirname(abspath(__file__)), - "measurements_test_files", "sigs.bin") - raw_data = join(dirname(abspath(__file__)), - "measurements_test_files", "data.bin") - priv_key = join(dirname(abspath(__file__)), - "measurements_test_files", "priv_key.pem") + self.extract.verbose = True mock_file.side_effect = self.file_emulator self.times_used_write = 0 - extract = Extract( - output=out_dir, raw_times=raw_times, binary=8, - sigs=raw_sigs, data=raw_data, data_size=32, priv_key=priv_key, - key_type="ecdsa" + self.extract.process_and_create_multiple_csv_files({ + "measurements.csv": "k-size" + }) + + self.assertGreater( + self.times_used_write, 0, + "At least one measurement should have been written." ) + mock_print.assert_called() + + self.times_used_write = 0 + self.extract.verbose = False + + @mock.patch('__main__.__builtins__.print') + @mock.patch('__main__.__builtins__.open') + def test_multiple_measurement_creation_hamming_weight( + self, mock_file, mock_print + ): + self.extract.verbose = True - extract.process_and_create_multiple_csv_files({ - "measurements.csv": "k-size", + mock_file.side_effect = self.file_emulator + self.times_used_write = 0 + + self.extract.process_and_create_multiple_csv_files({ + "measurements-hamming.csv": "hamming-weight" }) self.assertGreater( - self.times_used_write, 0, + self.times_used_write_on_hamming, 0, "At least one measurement should have been written." ) + mock_print.assert_called() - def test_k_extractions(self): - out_dir = join(dirname(abspath(__file__)), "measurements_test_files") - raw_times = join(dirname(abspath(__file__)), - "measurements_test_files", "times.bin") - raw_sigs = join(dirname(abspath(__file__)), - "measurements_test_files", "sigs.bin") - raw_data = join(dirname(abspath(__file__)), - "measurements_test_files", "data.bin") - priv_key = join(dirname(abspath(__file__)), - "measurements_test_files", "priv_key.pem") + self.times_used_write_on_hamming = 0 + self.extract.verbose = False - extract = Extract( - output=out_dir, raw_times=raw_times, binary=8, - sigs=raw_sigs, data=raw_data, data_size=32, priv_key=priv_key, - key_type="ecdsa" + @mock.patch('__main__.__builtins__.print') + @mock.patch('tlsfuzzer.extract.remove') + @mock.patch('tlsfuzzer.extract.Extract.ecdsa_iter') + @mock.patch('__main__.__builtins__.open') + def test_multiple_measurement_creation_invert( + self, mock_file, mock_ecdsa_iter, mock_remove, mock_print + ): + def custom_ecdsa_iter(return_type): + counter = 0 + + even_list = None + odd_list = None + if return_type == "k-size-invert": + even_list = [256] + odd_list = [255, 254, 253, 252] + elif return_type == "hamming-weight-invert": + even_list = [127, 128, 129] + odd_list = [125, 126, 130, 130] + + while counter < 500: + if counter % 2 == 0: + yield choice(even_list) + else: + yield choice(odd_list) + counter += 1 + + mock_file.side_effect = self.file_emulator + mock_ecdsa_iter.side_effect = custom_ecdsa_iter + self.times_used_write = 0 + self.times_used_write_on_hamming = 0 + + self.extract.process_and_create_multiple_csv_files({ + "measurements-hamming-invert.csv": "hamming-weight-invert", + "measurements-invert.csv": "k-size-invert", + }) + + mock_print.assert_not_called() + self.assertGreater( + self.times_used_write, 0, + "At least one measurement should have been written." + ) + self.assertGreater( + self.times_used_write_on_hamming, 0, + "At least one measurement should have been written." ) - k_value = extract._ecdsa_calculate_k(( + self.extract.verbose = True + + self.extract.process_and_create_multiple_csv_files({ + "measurements-hamming-invert.csv": "hamming-weight-invert", + "measurements-invert.csv": "k-size-invert", + }) + + mock_print.assert_called() + + self.times_used_write = 0 + self.times_used_write_on_hamming = 0 + self.extract.verbose = False + + def test_k_extractions(self): + k_value = self.extract._ecdsa_calculate_k(( b'0F\x02!\x00\xbe.W"U\t9\x88\xe1o\xbbJ_\x03\x91\xf8+F\t\x08\xdc' b'\xd3\x99\x14(\x96\xe4\x8f\xb0\xc0\xcc7\x02!\x00\xbcd+\x80\xf7' b'\x19\xed\xee&\xdd!\'\xcd3\xb3\x05\xb5\x824q\x05\xcb\x95A\xe9f' diff --git a/tlsfuzzer/analysis.py b/tlsfuzzer/analysis.py index 2800efb23..511b39c30 100644 --- a/tlsfuzzer/analysis.py +++ b/tlsfuzzer/analysis.py @@ -124,7 +124,7 @@ def main(): sign_test = True bit_size_analysis = False smart_analysis = True - bit_size_desire_ci = 1e-9 + bit_size_desired_ci = 1e-9 bit_recognition_size = 4 measurements_filename = "measurements.csv" skip_sanity = False @@ -144,7 +144,7 @@ def main(): "status-newline", "bit-size", "no-smart-analysis", - "bit-size-desire-ci=", + "bit-size-desired-ci=", "bit-recognition-size=", "measurements=", "skip-sanity", @@ -189,8 +189,8 @@ def main(): hamming_weight_analysis = True elif opt == "--no-smart-analysis": smart_analysis = False - elif opt == "--bit-size-desire-ci": - bit_size_desire_ci = int(arg) * 1e-9 + elif opt == "--bit-size-desired-ci": + bit_size_desired_ci = float(arg) * 1e-9 elif opt == "--bit-recognition-size": bit_recognition_size = int(arg) elif opt == "--measurements": @@ -203,7 +203,7 @@ def main(): multithreaded_graph, verbose, clock_freq, alpha, workers, delay, carriage_return, bit_size_analysis or hamming_weight_analysis, - smart_analysis, bit_size_desire_ci, + smart_analysis, bit_size_desired_ci, bit_recognition_size, measurements_filename, skip_sanity, wilcoxon_test, t_test, sign_test) if hamming_weight_analysis: @@ -223,7 +223,7 @@ def __init__(self, output, draw_ecdf_plot=True, draw_scatter_plot=True, verbose=False, clock_frequency=None, alpha=None, workers=None, delay=None, carriage_return=None, bit_size_analysis=False, smart_bit_size_analysis=True, - bit_size_desire_ci=1e-9, bit_recognition_size=4, + bit_size_desired_ci=1e-9, bit_recognition_size=4, measurements_filename="measurements.csv", skip_sanity=False, run_wilcoxon_test=True, run_t_test=True, run_sign_test=True): self.verbose = verbose @@ -250,7 +250,7 @@ def __init__(self, output, draw_ecdf_plot=True, draw_scatter_plot=True, if bit_size_analysis and smart_bit_size_analysis: self._bit_size_data_limit = 10000 # staring amount of samples self._bit_size_data_used = None - self.bit_size_desire_ci = bit_size_desire_ci + self.bit_size_desired_ci = bit_size_desired_ci self.bit_recognition_size = \ bit_recognition_size if bit_recognition_size >= 0 else 1 else: @@ -1635,13 +1635,13 @@ def skillings_mack_test(self, name): status=status) finally: if self.verbose: + status[2].set() + progress.join() + print() print("[i] Skillings-Mack test done in {:.3}s".format( time.time() - start_time)) print("[i] Skillings-Mack p-value: {0:.6e}".format( sm_test.p_value)) - status[2].set() - progress.join() - print() finally: del data @@ -1709,7 +1709,6 @@ def _bit_size_write_summary(self, verdict, skillings_mack_pvalue): all_sign_test_values = list(self._bit_size_sign_test.values()) all_wilcoxon_values = list(self._bit_size_wilcoxon_test.values()) with open(join(self.output, "analysis_results/report.txt"), "w") as fp: - print(self._bit_size_data_used) fp.write( "Skilling-Mack test p-value: {0:.6e}\n" .format(skillings_mack_pvalue) + @@ -2094,15 +2093,17 @@ def _figure_out_analysis_data_size(self, k_sizes): print("[W] There is not enough data on recognicion size to " + "calulate desired sample size. Using all available samples.") self._bit_size_data_limit = None + self._bit_size_data_used = None return smaller_recognition_ci = min( x for x in non_zero_recognition_cis if x > 0 ) - magnitude_diff = smaller_recognition_ci / self.bit_size_desire_ci + magnitude_diff = smaller_recognition_ci / self.bit_size_desired_ci self._bit_size_data_limit = round( (magnitude_diff ** 2) * self._bit_size_data_used ) + self._bit_size_data_used = None if self.verbose: if self.bit_recognition_size == 1: @@ -2118,7 +2119,7 @@ def _figure_out_analysis_data_size(self, k_sizes): "[i] Calculated that {0:,} samples are needed for " .format(self._bit_size_data_limit) + "{0:.3}s CI in the {1} larger bit size." - .format(self.bit_size_desire_ci, size_text) + .format(self.bit_size_desired_ci, size_text) ) self.output = old_output diff --git a/tlsfuzzer/extract.py b/tlsfuzzer/extract.py index 70fc2391e..db49a49dc 100644 --- a/tlsfuzzer/extract.py +++ b/tlsfuzzer/extract.py @@ -82,13 +82,15 @@ def help_msg(): print(" --priv-key-ecdsa FILE Read the ecdsa private key from PEM file.") print(" --clock-frequency freq Assume that the times in the file are not") print(" specified in seconds but rather in clock cycles of") - print(" a clock running at requency 'freq' specified in") + print(" a clock running at frequency 'freq' specified in") print(" MHz. Use when the clock source are the raw reads") print(" from the Time Stamp Counter register or similar.") print(" --hash-func func Specifies the hash function to use for") print(" extracting the k value. The function should be") print(" available in hashlib module. The default function") print(" is sha256.") + print(" --skip-invert Skipping the creation of inverse of K value") + print(" measurement files.") print(" --rsa-keys FILE Analyse the times based on RSA private keys") print(" values. Creates separate measurements.csv files") print(" for the d, p, q, dP, dQ, and qInv values.") @@ -127,15 +129,16 @@ def main(): carriage_return = None data = None data_size = None + prehashed = False sigs = None priv_key = None key_type = None freq = None hash_func_name = None - workers = None + invert = True rsa_keys = None + workers = None verbose = False - prehashed = False argv = sys.argv[1:] @@ -148,8 +151,9 @@ def main(): "no-quickack", "status-delay=", "status-newline", "raw-data=", "data-size=", "prehashed", "raw-sigs=", "priv-key-ecdsa=", - "clock-frequency=", "hash-func=", "workers=", - "verbose", "rsa-keys="]) + "clock-frequency=", "hash-func=", + "skip-invert", "workers=", "rsa-keys=", + "verbose"]) for opt, arg in opts: if opt == '-l': logfile = arg @@ -198,6 +202,8 @@ def main(): freq = float(arg) * 1e6 elif opt == "--hash-func": hash_func_name = arg + elif opt == "--skip-invert": + invert = False elif opt == "--workers": workers = int(arg) elif opt == "--help": @@ -266,10 +272,19 @@ def main(): extract.parse() if all([raw_times, data, data_size, sigs, priv_key]): - extract.process_and_create_multiple_csv_files({ + files = { "measurements.csv": "k-size", - "measurements-invert.csv": "invert-k-size", - }) + "measurements-hamming-weight.csv": "hamming-weight" + } + + if invert: + file_list = list(files.keys()) + + for file in file_list: + invert_file_name = file.split(".")[0] + '-invert.csv' + files[invert_file_name] = "invert-" + files[file] + + extract.process_and_create_multiple_csv_files(files) if rsa_keys: extract.process_rsa_keys() @@ -362,6 +377,7 @@ def __init__(self, log=None, capture=None, output=None, ip_address=None, self._max_value = None self._fin_as_resp = fin_as_resp self.rsa_keys = rsa_keys + self._temp_HWI_name = None if data and data_size: try: @@ -851,8 +867,10 @@ def _get_data_from_csv_file(self, filename, col_name=None, column = 0 if len(columns) > 1 and col_name is None: - raise ValueError("Multiple columns in raw_times file and " - "no column name specified!") + raise ValueError( + "Multiple columns in {0} and ".format(filename) + + "no column name specified!" + ) if col_name: column = columns.index(col_name) @@ -975,8 +993,20 @@ def _convert_to_hamming_weight(self, value_iter): def _calculate_invert_k(self, value_iter): """Iterator. It will calculate the invert K.""" n_value = self.priv_key.curve.order - for value in value_iter: - yield ecdsa.ecdsa.numbertheory.inverse_mod(value, n_value) + + if self._temp_HWI_name and not exists(self._temp_HWI_name): + with open(self._temp_HWI_name, "w") as fp: + fp.write("{0}\n".format("invert_K_bit_count")) + for value in value_iter: + invert = ecdsa.ecdsa.numbertheory.inverse_mod( + value, n_value) + invert_bit_count = bit_count(invert) + fp.write("{0}\n".format(invert_bit_count)) + yield invert + else: + for value in value_iter: + invert = ecdsa.ecdsa.numbertheory.inverse_mod(value, n_value) + yield invert def ecdsa_iter(self, return_type="k-size"): """ @@ -1027,18 +1057,23 @@ def ecdsa_iter(self, return_type="k-size"): k_map_filename, col_name="k_value", convert_to_int=True ) - if return_type == "k-size": + if "invert" in return_type: + if ("hamming-weight" in return_type and self._temp_HWI_name + and exists(self._temp_HWI_name)): + return self._get_data_from_csv_file( + filename=self._temp_HWI_name + ) + else: + k_iter = self._calculate_invert_k(k_iter) + + if "k-size" in return_type: k_wrap_iter = self._convert_to_bit_size(k_iter) - elif return_type == "invert-k-size": - k_wrap_iter = self._convert_to_bit_size( - self._calculate_invert_k(k_iter) - ) - elif return_type == "hamming-weight": + elif "hamming-weight" in return_type: k_wrap_iter = self._convert_to_hamming_weight(k_iter) else: raise ValueError( "Iterator return must be " - "k-size, invert-k-size or hamming-weight." + "k-size[-invert] or hamming-weight[-invert]" ) return k_wrap_iter @@ -1149,7 +1184,7 @@ def _create_and_write_sanity_entries(self): ) if len(row) / 2 > self._max_tuple_size: - self._max_tuple_size = len(row) / 2 + self._max_tuple_size = len(row) // 2 if state != WAIT_FOR_SECOND_BARE_MAX_VALUE and len(row) == 2: if state == WAIT_FOR_NON_BARE_MAX_VALUE: @@ -1183,27 +1218,27 @@ def _create_and_write_sanity_entries(self): self._max_value, sanity_entries_count )) - def _check_for_iter_left_overs(self, iterator, desc=''): - left_overs = [] + def _check_for_iter_left_overs( + self, iterator, desc='Left-overs on iterator:'): + left_overs_counter = 0 for item in iterator: - left_overs.append(item) - if len(left_overs) > 0 and self.verbose: - if desc: - print(desc) - else: - print("Left-overs on iterator:") - for item in left_overs: + left_overs_counter += 1 + if self.verbose: print(item) + if left_overs_counter > 0: + if self.verbose: + print(desc + " {0}".format(left_overs_counter)) + raise ValueError("There are some extra values that are not used.") def process_measurements_and_create_csv_file( self, values_iter, comparing_value ): """ - Processing all the measurements from the given files and - creates a randomized measurement file with tuples associating - the max values with non max values. + Processing all the nonces and associated time measurements from the + given files and creates a randomized measurement file with tuples + associating the max values with non max values. """ self._measurements_fp = open( join(self.output, self.measurements_csv), "w" @@ -1319,26 +1354,168 @@ def process_measurements_and_create_csv_file( ) print( '[i] Biggest tuple size in file: {0}\n' - .format(int(self._max_tuple_size)) + + .format(self._max_tuple_size) + '[i] Written rows: {0:,}'.format(self._row) ) self._measurements_fp.close() + def _write_hamming_weight_line(self, row, line_to_write): + """ + Takes one or more possible Hamming weight values for each key value, + selecting one in random for each key and writes the created line into + the measurements file. + """ + measurements_dropped = 0 + + for key in line_to_write: + measurements_dropped += len(line_to_write[key]) - 1 + line_to_write[key] = choice(line_to_write[key]) + self._measurements_fp.write("{0},{1},{2}\n".format( + row, key, line_to_write[key])) + + return measurements_dropped + + def process_measurements_and_create_hamming_csv_file( + self, values_iter, items_in_tuple = 20): + """ + Processing all the nonces and associated time measurements from the + given files and creates a file with tuples associating the Hamming + weight of the nonces. + """ + self._measurements_fp = open( + join(self.output, self.measurements_csv), "w" + ) + + line_to_write = defaultdict(list) + items_buffered = 0 + row = 0 + measurements_dropped = 0 + min_tuple_size = items_in_tuple + last_tuple_size = 0 + + time_iter = self._get_time_from_file() + + if self.verbose: + print("[i] Creating {0} file...".format(self.measurements_csv)) + + progress = None + status = [0] + if self.verbose and self._total_measurements: + status = [0, self._total_measurements, Event()] + kwargs = {} + kwargs['unit'] = ' signatures' + kwargs['prefix'] = 'decimal' + kwargs['delay'] = self.delay + kwargs['end'] = self.carriage_return + progress = Thread(target=progress_report, args=(status,), + kwargs=kwargs) + progress.start() + + try: + for value, time_value in izip(values_iter, time_iter): + status[0] += 1 + + line_to_write[value].append(time_value) + items_buffered += 1 + + if items_buffered == items_in_tuple: + line_measurements_drop = self._write_hamming_weight_line( + row, line_to_write + ) + + min_tuple_size = min( + len(line_to_write.keys()), min_tuple_size) + measurements_dropped += line_measurements_drop + row += 1 + line_to_write = defaultdict(list) + items_buffered = 0 + finally: + if progress: + status[2].set() + progress.join() + print() + + if items_buffered > 0: + line_measurements_drop = self._write_hamming_weight_line( + row, line_to_write + ) + + last_tuple_size = len(line_to_write.keys()) + measurements_dropped += line_measurements_drop + row += 1 + + self._check_for_iter_left_overs(values_iter) + self._check_for_iter_left_overs(time_iter) + + if self.verbose: + if self._total_measurements: + print( + '[i] Measurements that have been dropped: {0:,} ({1:.2f}%)' + .format( + measurements_dropped, + (measurements_dropped * 100) + / self._total_measurements + ) + ) + + if last_tuple_size > 0: + print( + '[i] Smallest non-last tuple size in file: {0}\n' + .format(min_tuple_size) + + '[i] Last tuple size in file: {0}'.format(last_tuple_size) + ) + else: + print( + '[i] Smallest tuple size in file: {0}' + .format(min_tuple_size) + ) + + print('[i] Written rows: {0:,}'.format(row)) + + self._measurements_fp.close() + def process_and_create_multiple_csv_files(self, files = { "measurements.csv": "k-size" }): original_measuremments_csv = self.measurements_csv + skipped_h_weight_invert = False + h_weight_invert_file = None + h_weight_invert_mode = None if exists(join(self.output, "ecdsa-k-time-map.csv")): remove(join(self.output, "ecdsa-k-time-map.csv")) + for file, mode in files.items(): + if "hamming-weight" in mode and "invert" in mode: + skipped_h_weight_invert = True + h_weight_invert_file = file + h_weight_invert_mode = mode + self._temp_HWI_name = join(self.output, "tmp_HWI_values.csv") + for file in files: self.measurements_csv = file - self.process_measurements_and_create_csv_file( - self.ecdsa_iter(return_type=files[file]), self.ecdsa_max_value() + if "hamming-weight" in files[file]: + if "invert" in files[file]: + continue + + self.process_measurements_and_create_hamming_csv_file( + self.ecdsa_iter(return_type=files[file]) + ) + else: + self.process_measurements_and_create_csv_file( + self.ecdsa_iter(return_type=files[file]), + self.ecdsa_max_value() + ) + + if skipped_h_weight_invert: + self.measurements_csv = h_weight_invert_file + self.process_measurements_and_create_hamming_csv_file( + self.ecdsa_iter(return_type=h_weight_invert_mode) ) + remove(self._temp_HWI_name) + self._temp_HWI_name = None self.measurements_csv = original_measuremments_csv