diff --git a/tests/test_tlsfuzzer_analysis.py b/tests/test_tlsfuzzer_analysis.py index 558f2b5be..ed3f34c0c 100644 --- a/tests/test_tlsfuzzer_analysis.py +++ b/tests/test_tlsfuzzer_analysis.py @@ -13,6 +13,7 @@ import sys import os import tempfile +from collections import defaultdict failed_import = False try: @@ -362,6 +363,51 @@ def test_report_bit_size(self, mock_skilling_mack, mock_bit_sizes, mock_write_summary.assert_called() self.assertEqual(ret, 0) + with mock.patch("tlsfuzzer.analysis.os.path.exists") as mock_exists: + with mock.patch("__main__.__builtins__.open") as mock_open: + mock_open.side_effect = mock.mock_open(read_data="2000,2") + mock_exists.return_value = True + analysis._total_bit_size_data = 0 + + ret = analysis.generate_report(bit_size=True) + + self.assertEqual(ret, 0) + self.assertEqual(analysis._total_bit_size_data, 2000) + + @mock.patch("tlsfuzzer.analysis.Analysis.analyse_hamming_weights") + @mock.patch("__main__.__builtins__.open") + def test_report_hamming_weights(self, mock_open, mock_hamming_weights): + report_text = "testing_hamming_weight_report" + self.writen_text = "" + mock_hamming_weights.return_value = 0 + + def add_to_written(x): + self.writen_text += x + + def file_selector(*args, **kwargs): + file_name = args[0] + try: + mode = args[1] + except IndexError: + mode = "r" + + r = mock.mock_open()(file_name, mode) + + if "w" in mode: + r.write.side_effect = lambda s: (add_to_written(s)) + + return r + + mock_open.side_effect = file_selector + + analysis = Analysis("/tmp", bit_size_analysis=True) + analysis._hamming_weight_report = report_text + ret = analysis.generate_report(hamming_weight=True) + + mock_hamming_weights.assert_called_once_with() + self.assertEqual(ret, 0) + self.assertEqual(self.writen_text, report_text) + def test_setting_alpha(self): with mock.patch( "tlsfuzzer.analysis.Analysis.load_data", self.mock_read_csv @@ -551,6 +597,50 @@ def test__desc_stats(self): 'IQR': 0.45029303825, 'MAD': 0.250156351}) + @mock.patch("tlsfuzzer.analysis.Analysis.load_data") + @mock.patch("builtins.open") + def test__write_summary(self, mock_open, mock_load_data): + mock_open.side_effect = mock.mock_open() + + fake_conf_ints = { + 'mean': (0, 0, 0), + 'median': (0, 0, 0), + 'trim_mean_05': (0, 0, 0), + 'trim_mean_25': (0, 0, 0), + 'trim_mean_45': (0, 0, 0), + 'trimean': (0, 0, 0) + } + + tests = [ + (None, (0, 0, 0), 0, "Definite side-channel detected"), + (1e-10, (0, 0, 0), 1, "Definite side-channel detected"), + (1e-6, (0, 0, 0), 1, "Results suggesting side-channel found"), + (1, (0, 0, 0), 0, "ERROR"), + (1, (1e-11, 0, 2e-11), 0, "Implementation verified as not"), + (1, (1e-10, 0, 6e-10), 0, "Implementation most likely not"), + (1, (1e-3, 0, 2e-3), 0, "Large confidence intervals detected"), + (1, (1, 0, 2), 0, "Very large confidence intervals detected"), + ] + + analysis = Analysis("/tmp") + analysis.class_names = {"0":"0", "1":"1"} + analysis.alpha = 1e-5 + + for test in tests: + with mock.patch("builtins.print") as mock_print: + fake_conf_ints['mean'] = test[1] + difference = analysis._write_summary( + 0, None, [1e-4, 1e-10, 1e-5, 1], ["0", "1"], + test[0], fake_conf_ints + ) + self.assertEqual(difference, test[2]) + for i in mock_print.mock_calls: + if test[3] in str(i): + break + else: + self.assertTrue(False) + + @unittest.skipIf(failed_import, "Could not import analysis. 
Skipping related tests.") @@ -1024,18 +1114,14 @@ def test_call_with_Hamming_weight(self): 'tlsfuzzer.analysis.Analysis.generate_report' ) as mock_report: with mock.patch('tlsfuzzer.analysis.Analysis.__init__', mock_init): - with mock.patch( - 'tlsfuzzer.analysis.Analysis.analyse_hamming_weights' - ) as mock_hamming: - with mock.patch("sys.argv", args): - main() - mock_init.assert_called_once_with( - output, True, True, True, False, False, None, None, - None, None, None, True, True, - 1e-9, 4, - 'measurements.csv', False, True, True, True) - mock_report.assert_not_called() - mock_hamming.assert_called_once_with() + with mock.patch("sys.argv", args): + main() + mock_init.assert_called_once_with( + output, True, True, True, False, False, None, None, + None, None, None, True, True, 1e-9, 4, + 'measurements.csv', False, True, True, True) + mock_report.assert_called_once_with( + bit_size=False, hamming_weight=True) def test_call_Hamming_weight_with_minimal_analysis(self): output = "/tmp" @@ -1047,18 +1133,14 @@ def test_call_Hamming_weight_with_minimal_analysis(self): 'tlsfuzzer.analysis.Analysis.generate_report' ) as mock_report: with mock.patch('tlsfuzzer.analysis.Analysis.__init__', mock_init): - with mock.patch( - 'tlsfuzzer.analysis.Analysis.analyse_hamming_weights' - ) as mock_hamming: - with mock.patch("sys.argv", args): - main() - mock_init.assert_called_once_with( - output, True, True, True, False, False, None, None, - None, None, None, True, True, - 1e-9, 4, - 'measurements.csv', False, False, False, False) - mock_report.assert_not_called() - mock_hamming.assert_called_once_with() + with mock.patch("sys.argv", args): + main() + mock_init.assert_called_once_with( + output, True, True, True, False, False, None, None, + None, None, None, True, True, 1e-9, 4, + 'measurements.csv', False, False, False, False) + mock_report.assert_called_once_with( + bit_size=False, hamming_weight=True) def test_help(self): args = ["analysis.py", "--help"] @@ -1358,6 +1440,76 @@ class dotDict(dict): wilcoxon_test_mock.assert_called() calc_values_mock.assert_called() + @mock.patch("tlsfuzzer.analysis.Analysis._figure_out_analysis_data_size") + @mock.patch("tlsfuzzer.analysis.Analysis._calc_exact_values") + @mock.patch("tlsfuzzer.analysis.Analysis.conf_plot_for_all_k") + @mock.patch("tlsfuzzer.analysis.Analysis.graph_worst_pair") + @mock.patch("tlsfuzzer.analysis.Analysis.diff_scatter_plot") + @mock.patch("tlsfuzzer.analysis.Analysis.diff_ecdf_plot") + @mock.patch("tlsfuzzer.analysis.Analysis.conf_interval_plot") + @mock.patch("tlsfuzzer.analysis.Analysis.wilcoxon_test") + @mock.patch("tlsfuzzer.analysis.Analysis.rel_t_test") + @mock.patch("tlsfuzzer.analysis.Analysis.load_data") + @mock.patch("tlsfuzzer.analysis.Analysis.create_k_specific_dirs") + @mock.patch("tlsfuzzer.analysis.shutil.rmtree") + @mock.patch("builtins.open") + def test_bit_size_measurement_analysis_main_2_samples(self, open_mock, + rmtree_mock, dir_creation_mock, load_data_mock, rel_t_test_mock, + wilcoxon_test_mock, interval_plot_mock, ecdf_plot_mock, + scatter_plot_mock, worst_pair_mock, conf_plot_mock, + calc_values_mock, figure_out_mock): + + def file_selector(*args, **kwargs): + file_name = args[0] + try: + mode = args[1] + except IndexError: + mode = "r" + + if "w" in mode: + return mock.mock_open()(file_name, mode) + + if "timing.csv" in file_name: + k_size = file_name.split("/")[-2] + return mock.mock_open( + read_data="256,{0}".format(k_size) + + ("\n0.5,0.4\n0.4,0.5") + )(file_name, mode) + + return mock.mock_open( + 
read_data="0,256,3\n0,255,102\n0,254,103\n1,256,4\n" + + "1,254,104\n1,253,105" + )(file_name, mode) + + open_mock.side_effect = file_selector + dir_creation_mock.return_value = [256, 255, 254, 253] + + class dotDict(dict): + __getattr__ = dict.__getitem__ + + binomtest_mock = mock.Mock() + + calc_values_mock.return_value = { + "mean": 0.5, "median": 0.5, "trim_mean_05": 0.5, + "trim_mean_25": 0.5, "trim_mean_45": 0.5, "trimean": 0.5 + } + + try: + with mock.patch( + "tlsfuzzer.analysis.stats.binomtest", binomtest_mock + ): + self.analysis.analyze_bit_sizes() + except AttributeError: + with mock.patch( + "tlsfuzzer.analysis.stats.binom_test", binomtest_mock + ): + self.analysis.analyze_bit_sizes() + + binomtest_mock.assert_not_called() + rel_t_test_mock.assert_not_called() + wilcoxon_test_mock.assert_not_called() + calc_values_mock.assert_called() + @mock.patch("tlsfuzzer.analysis.Analysis._figure_out_analysis_data_size") @mock.patch("tlsfuzzer.analysis.Analysis._calc_exact_values") @mock.patch("tlsfuzzer.analysis.Analysis.conf_plot_for_all_k") @@ -1421,23 +1573,52 @@ class dotDict(dict): "trim_mean_25": 0.5, "trim_mean_45": 0.5, "trimean": 0.5 } + old_alpha = self.analysis.alpha + self.analysis.alpha = 10 + try: with mock.patch( "tlsfuzzer.analysis.stats.binomtest", binomtest_mock ): binomtest_mock.return_value = dotDict(binomtest_result) - self.analysis.analyze_bit_sizes() + ret_val = self.analysis.analyze_bit_sizes() except AttributeError: with mock.patch( "tlsfuzzer.analysis.stats.binom_test", binomtest_mock ): binomtest_mock.return_value = binomtest_result["pvalue"] - self.analysis.analyze_bit_sizes() + ret_val = self.analysis.analyze_bit_sizes() + + self.analysis.alpha = old_alpha binomtest_mock.assert_called() rel_t_test_mock.assert_called() wilcoxon_test_mock.assert_called() calc_values_mock.assert_called() + self.assertEqual(ret_val, 1) + + with mock.patch("builtins.print"): + self.analysis.verbose = True + old_bit_size_data_used = self.analysis._bit_size_data_used + self.analysis._bit_size_data_used = 1000 + old_total_bit_size_data_used = self.analysis._bit_size_data_used + + try: + with mock.patch( + "tlsfuzzer.analysis.stats.binomtest", binomtest_mock + ): + binomtest_mock.return_value = dotDict(binomtest_result) + self.analysis.analyze_bit_sizes() + except AttributeError: + with mock.patch( + "tlsfuzzer.analysis.stats.binom_test", binomtest_mock + ): + binomtest_mock.return_value = binomtest_result["pvalue"] + self.analysis.analyze_bit_sizes() + + self.analysis.verbose = False + self.analysis._bit_size_data_used = old_bit_size_data_used + self.analysis._bit_size_data_used = old_total_bit_size_data_used @mock.patch("tlsfuzzer.analysis.Analysis._figure_out_analysis_data_size") @mock.patch("tlsfuzzer.analysis.Analysis._calc_exact_values") @@ -1498,6 +1679,7 @@ class dotDict(dict): } self.analysis.verbose = True + self.analysis.smart_bit_size_analysis = False try: with mock.patch( @@ -1513,6 +1695,7 @@ class dotDict(dict): self.analysis.analyze_bit_sizes() self.analysis.verbose = False + self.analysis.smart_bit_size_analysis = True binomtest_mock.assert_called() rel_t_test_mock.assert_called() @@ -1558,6 +1741,7 @@ def file_selector(*args, **kwargs): @mock.patch("builtins.open") def test_bit_size_measurement_analysis_create_k_dirs(self, open_mock, makedirs_mock, thread_start_mock, thread_join_mock): + self.k_by_size = defaultdict(list) def file_selector(*args, **kwargs): file_name = args[0] @@ -1566,36 +1750,53 @@ def file_selector(*args, **kwargs): except IndexError: mode = "r" - 
if "w" in mode: - return mock.mock_open()(file_name, mode) + if type(file_name) == int: + return self.builtin_open(*args, **kwargs) + + if "k-by-size" in file_name: + if "w" in mode: + r = mock.mock_open()(*args, **kwargs) + r.write.side_effect = lambda s: ( + self.k_by_size[file_name].append(s) + ) + return r + else: + print(self.k_by_size[file_name]) + print() + return mock.mock_open( + read_data = "".join(self.k_by_size[file_name]) + )(*args, **kwargs) return mock.mock_open( - read_data= "0,256,1\n0,255,102\n0,254,103\n0,256,2\n" + - "1,256,3\n1,254,104\n1,253,105" - )(file_name, mode) + read_data="0,256,1\n0,255,102\n0,254,103\n0,256,2\n" + + "1,256,3\n1,254,104\n1,253,105" + )(*args, **kwargs) open_mock.side_effect = file_selector ret_value = self.analysis.create_k_specific_dirs() self.assertEqual(ret_value, [256, 255, 254, 253]) + self.k_by_size.clear() self.analysis.clock_frequency = 10000000 ret_value = self.analysis.create_k_specific_dirs() self.assertEqual(ret_value, [256, 255, 254, 253]) self.analysis.clock_frequency = None + self.k_by_size.clear() self.analysis.skip_sanity = True ret_value = self.analysis.create_k_specific_dirs() self.analysis.skip_sanity = False self.assertEqual(ret_value, [255, 254, 253]) with mock.patch("builtins.print"): + self.k_by_size.clear() self.analysis.verbose = True ret_value = self.analysis.create_k_specific_dirs() self.analysis.verbose = False @mock.patch("builtins.open") - def test_check_data_for_rel_t_test_all_zeros(self, open_mock): + def test_check_data_for_zero_all_zeros(self, open_mock): def file_selector(*args, **kwargs): file_name = args[0] @@ -1610,12 +1811,12 @@ def file_selector(*args, **kwargs): open_mock.side_effect = file_selector - ret_value = self.analysis._check_data_for_rel_t_test() + ret_value = self.analysis._check_data_for_zero() self.assertEqual(ret_value, False) @mock.patch("builtins.open") - def test_check_data_for_rel_t_test_two_non_zero(self, open_mock): + def test_check_data_for_zero_two_non_zero(self, open_mock): def file_selector(*args, **kwargs): file_name = args[0] @@ -1630,12 +1831,12 @@ def file_selector(*args, **kwargs): open_mock.side_effect = file_selector - ret_value = self.analysis._check_data_for_rel_t_test() + ret_value = self.analysis._check_data_for_zero() self.assertEqual(ret_value, False) @mock.patch("builtins.open") - def test_check_data_for_rel_t_test_five_non_zero(self, open_mock): + def test_check_data_for_zero_five_non_zero(self, open_mock): def file_selector(*args, **kwargs): file_name = args[0] @@ -1650,7 +1851,7 @@ def file_selector(*args, **kwargs): open_mock.side_effect = file_selector - ret_value = self.analysis._check_data_for_rel_t_test() + ret_value = self.analysis._check_data_for_zero() self.assertEqual(ret_value, True) @@ -1734,9 +1935,18 @@ def test_bit_size_come_to_verdict(self): difference, verdict = \ self.analysis._bit_size_come_to_verdict(test[2], test[0]) - self.assertEqual(difference, test[3]) + self.assertEqual(test[3], difference) self.assertIn(test[4], verdict) + # Final test with no k-sizes in it + self.analysis._bit_size_bootstraping = {} + difference, verdict = \ + self.analysis._bit_size_come_to_verdict(0, 1) + self.assertEqual(2, difference) + self.assertIn("Not enough", verdict) + + self.analysis._bit_size_bootstraping = None + @mock.patch("builtins.open") def test_bit_size_write_summary(self, open_mock): _summary = [] @@ -1768,28 +1978,31 @@ def file_selector(*args, **kwargs): } self.analysis._bit_size_sign_test = {255: 0.3, 254: 0.7, 253: 0.4} 
self.analysis._bit_size_wilcoxon_test = {255: 0.2, 254: 0.8, 253: 0.6} - self.analysis._bit_size_data_used = 1000 + self.analysis._total_bit_size_data = 100000 + self.analysis._total_bit_size_data_used = 10000 self.analysis._bit_size_write_summary("passed", 0.5) self.assertEqual( - _summary[0], "Skilling-Mack test p-value: 5.000000e-01" + _summary[2], "Skilling-Mack test p-value: 5.000000e-01" ) self.assertEqual( - _summary[1], + _summary[3], "Sign test p-values (min, average, max): " + "3.00e-01, 4.67e-01, 7.00e-01" ) self.assertEqual( - _summary[2], + _summary[4], "Wilcoxon test p-values (min, average, max): " + "2.00e-01, 5.33e-01, 8.00e-01" ) self.assertEqual( - _summary[3], - "Used 1,000 data observations for results" + _summary[5], + "Used 10,000 out of 100,000 available " + + "data observations for results." ) - self.assertEqual(_summary[4], "passed") + self.assertEqual(_summary[6], "passed") + @mock.patch("tlsfuzzer.analysis.Analysis.calc_diff_conf_int") @mock.patch("builtins.print") @@ -1797,52 +2010,46 @@ def test_figure_out_analysis_data_size(self, print_mock, calc_diff_conf_int_mock): k_sizes = [256, 255, 254, 253, 252] old_bit_recall_size = self.analysis.bit_recognition_size - self.analysis.verbose = True - - calc_diff_conf_int_mock.return_value = { - 'mean': (-5.0e-07, -9.9e-08, 2.5e-07), - 'median': (-8.0e-08, -3.8e-08, 1.1e-08), - 'trim_mean_05': (-1.6e-07, 1.5e-08, 2.1e-07), - 'trim_mean_25': (-6.9e-08, -2.4e-08, 2.5e-08), - 'trim_mean_45': (-7.9e-08, -3.5e-08, 1.3e-08), - 'trimean': (-7.9e-08, -2.2e-08, 3.86e-08) - } - # first test run + def custom_calc_conf_int(pair): + self.analysis._bit_size_data_used = 1000 + if self._all_cis_zeros: + return { + 'mean': (0, 0, 0), + 'median': (0, 0, 0), + 'trim_mean_05': (0, 0, 0), + 'trim_mean_25': (0, 0, 0), + 'trim_mean_45': (0, 0, 0), + 'trimean': (0, 0, 0) + } + else: + return { + 'mean': (-5.0e-07, -9.9e-08, 2.5e-07), + 'median': (-8.0e-08, -3.8e-08, 1.1e-08), + 'trim_mean_05': (-1.6e-07, 1.5e-08, 2.1e-07), + 'trim_mean_25': (-6.9e-08, -2.4e-08, 2.5e-08), + 'trim_mean_45': (-7.9e-08, -3.5e-08, 1.3e-08), + 'trimean': (-7.9e-08, -2.2e-08, 3.86e-08) + } + + calc_diff_conf_int_mock.side_effect = custom_calc_conf_int + + self._all_cis_zeros = True self.analysis._bit_size_data_limit = 10000 - self.analysis._bit_size_data_used = 1000 self.analysis.bit_recognition_size = 1 self.analysis._figure_out_analysis_data_size(k_sizes) - self.assertEqual(self.analysis._bit_size_data_limit, 8281000) - - # second test run - self.analysis._bit_size_data_limit = 10000 - self.analysis._bit_size_data_used = 1000 - self.analysis.bit_recognition_size = 2 - self.analysis._figure_out_analysis_data_size(k_sizes) - self.assertEqual(self.analysis._bit_size_data_limit, 8281000) - - # third test run - self.analysis._bit_size_data_limit = 10000 - self.analysis._bit_size_data_used = 1000 - self.analysis.bit_recognition_size = 3 - self.analysis._figure_out_analysis_data_size(k_sizes) - self.assertEqual(self.analysis._bit_size_data_limit, 8281000) - - # forth test run - self.analysis._bit_size_data_limit = 10000 - self.analysis._bit_size_data_used = 1000 - self.analysis.bit_recognition_size = 30 - self.analysis._figure_out_analysis_data_size(k_sizes) - self.assertEqual(self.analysis._bit_size_data_limit, 8281000) + print_mock.assert_called_once_with( + "[W] There is not enough data on recognition size to " + + "calculate desired sample size. Using all available samples." 
+ ) - # forth test run - self.analysis.verbose = False - self.analysis._bit_size_data_limit = 10000 - self.analysis._bit_size_data_used = 1000 - self.analysis.bit_recognition_size = 4 - self.analysis._figure_out_analysis_data_size(k_sizes) - self.assertEqual(self.analysis._bit_size_data_limit, 8281000) + self._all_cis_zeros = False + for size in [1, 2, 3, 30, 4]: + self.analysis.verbose = not (size == 30) + self.analysis._bit_size_data_limit = 10000 + self.analysis.bit_recognition_size = size + self.analysis._figure_out_analysis_data_size(k_sizes) + self.assertEqual(self.analysis._bit_size_data_limit, 8281000) # restore of changed variables self._bit_size_data_limit = 10000 @@ -1870,7 +2077,7 @@ def test_hamming_analysis_negative(self, mock_print): data.write("{0},{1},{2}\n".format(tuple_num, i, j)) analysis = Analysis( - tmpdirname, verbose=False, draw_conf_interval_plot=False, + tmpdirname, verbose=True, draw_conf_interval_plot=False, bit_size_analysis=True, run_wilcoxon_test=False, run_t_test=False, run_sign_test=False, draw_ecdf_plot=False ) @@ -1883,7 +2090,8 @@ def test_hamming_analysis_negative(self, mock_print): self.assertEqual(ret, 0) self.assertIn( - mock.call('Sample large enough to detect 1 ns difference: True'), + mock.call( + '[i] Sample large enough to detect 1 ns difference: True'), mock_print.mock_calls ) for i in mock_print.mock_calls: @@ -1910,7 +2118,7 @@ def test_hamming_analysis_positive(self, mock_print): data.write("{0},{1},{2}\n".format(tuple_num, i, j)) analysis = Analysis( - tmpdirname, verbose=False, draw_conf_interval_plot=False, + tmpdirname, verbose=True, draw_conf_interval_plot=False, bit_size_analysis=True, run_wilcoxon_test=False, run_t_test=False, run_sign_test=False, draw_ecdf_plot=False ) @@ -1923,11 +2131,13 @@ def test_hamming_analysis_positive(self, mock_print): self.assertEqual(ret, 1) self.assertNotIn( - mock.call('Sample large enough to detect 1 ns difference: True'), + mock.call( + '[i] Sample large enough to detect 1 ns difference: True'), mock_print.mock_calls ) self.assertNotIn( - mock.call('Sample large enough to detect 1 ns difference: False'), + mock.call( + '[i] Sample large enough to detect 1 ns difference: False'), mock_print.mock_calls ) for i in mock_print.mock_calls: @@ -1980,7 +2190,8 @@ def test_hamming_analysis_quick( self.assertEqual(ret, 0) self.assertIn( - mock.call('Sample large enough to detect 1 ns difference: True'), + mock.call( + '[i] Sample large enough to detect 1 ns difference: True'), mock_print.mock_calls ) for i in mock_print.mock_calls: @@ -1988,3 +2199,29 @@ def test_hamming_analysis_quick( break else: self.assertFalse(True) + + def test_hamming_analysis_not_verbose(self): + with tempfile.TemporaryDirectory() as tmpdirname: + with open(os.path.join(tmpdirname, "measurements.csv"), "w") \ + as data: + for tuple_num in range(100): + groups = sorted(list(set( + np.random.binomial(256, 0.5, 20) + ))) + values = np.random.normal(1e-3, 1e-10, size=len(groups)) + + for i, j in zip(groups, values): + data.write("{0},{1},{2}\n".format(tuple_num, i, j)) + + analysis = Analysis( + tmpdirname, verbose=False, draw_conf_interval_plot=False, + bit_size_analysis=True, run_wilcoxon_test=False, + run_t_test=False, run_sign_test=True, draw_ecdf_plot=False + ) + + ret = analysis.analyse_hamming_weights() + + self.assertLess(1e-6, analysis.skillings_mack_test( + os.path.join(tmpdirname, "measurements.bin"))) + + self.assertEqual(ret, 0) \ No newline at end of file diff --git a/tlsfuzzer/analysis.py b/tlsfuzzer/analysis.py index 
fffb05ad1..12aa43732 100644 --- a/tlsfuzzer/analysis.py +++ b/tlsfuzzer/analysis.py @@ -41,7 +41,7 @@ mpl.use('Agg') -VERSION = 6 +VERSION = 8 _diffs = None @@ -206,10 +206,12 @@ def main(): smart_analysis, bit_size_desired_ci, bit_recognition_size, measurements_filename, skip_sanity, wilcoxon_test, t_test, sign_test) - if hamming_weight_analysis: - ret = analysis.analyse_hamming_weights() - else: - ret = analysis.generate_report(bit_size=bit_size_analysis) + + ret = analysis.generate_report( + bit_size=bit_size_analysis, + hamming_weight=hamming_weight_analysis + ) + return ret else: raise ValueError("Missing -o option!") @@ -250,12 +252,16 @@ def __init__(self, output, draw_ecdf_plot=True, draw_scatter_plot=True, if bit_size_analysis and smart_bit_size_analysis: self._bit_size_data_limit = 10000 # staring amount of samples self._bit_size_data_used = None + self._total_bit_size_data = 0 + self._total_bit_size_data_used = 0 self.bit_size_desired_ci = bit_size_desired_ci self.bit_recognition_size = \ bit_recognition_size if bit_recognition_size >= 0 else 1 else: self._bit_size_data_limit = None self._bit_size_data_used = None + self._total_bit_size_data = 0 + self._total_bit_size_data_used = 0 if not bit_size_analysis: data = self.load_data() @@ -264,6 +270,7 @@ def __init__(self, output, draw_ecdf_plot=True, draw_scatter_plot=True, self._bit_size_sign_test = {} self._bit_size_wilcoxon_test = {} self._bit_size_bootstraping = {} + self._hamming_weight_report = "" self._bit_size_methods = { "mean": "Mean", @@ -372,12 +379,16 @@ def load_data(self): data = pd.DataFrame(timing_bin, columns=columns, copy=False) if self._bit_size_data_limit: + len_data = len(data) if not self._bit_size_data_used: - len_data = len(data) self._bit_size_data_used = min( len_data, self._bit_size_data_limit ) - data = data.iloc[:self._bit_size_data_limit] + start = 0 + data_diff = len_data - self._bit_size_data_limit + if data_diff > 0: + start = np.random.randint(0, data_diff) + data = data.iloc[start:start + self._bit_size_data_limit] else: if not self._bit_size_data_used: self._bit_size_data_used = len(data) @@ -1436,34 +1447,38 @@ def _write_summary(self, difference, p_vals, sign_p_vals, worst_pair, ["mean", "median", "trim_mean_05", "trim_mean_25", "trim_mean_45"]) if max(small_cis) == 0: - print("WARNING: all 95% CIs are equal 0. Too small sammple" - " or too low clock resolution for the measurement.") - # when measuring values below clock frequency - # or very small pieces of code with high resolution clock - # it may cause the 95% CI to equal 0.0; that's not a realistic - # value so ignore it - # (for median it would be nice to actually check if we're not - # in the vicinity of the clock resolution, and ignore median - # then, but that's much more complex so don't do it for now) - small_ci = min(i for i in small_cis if i != 0) - if small_ci < 1e-10: - explanation = ( - "Implementation verified as not " - "providing a timing side-channel signal") - elif small_ci < 1e-9: - explanation = ( - "Implementation most likely not " - "providing a timing side-channel signal") - elif small_ci < 1e-2: explanation = ( - "Large confidence intervals detected, " - "collecting more data necessary. Side channel " - "leakage smaller than {0:.3e}s is possible".format( - small_ci)) + "All 95% CIs are equal 0. Too small sammple" + " or too low clock resolution for the measurement.") + print("ERROR: " + explanation) else: - explanation = ( - "Very large confidence intervals detected. 
" - "Incorrect or missing --clock-frequency option?") + # when measuring values below clock frequency + # or very small pieces of code with high resolution clock + # it may cause the 95% CI to equal 0.0; that's not a + # realistic value so ignore it + # (for median it would be nice to actually check if we're + # not in the vicinity of the clock resolution, and ignore + # median then, but that's much more complex so don't do it + # for now) + small_ci = min(i for i in small_cis if i != 0) + if small_ci < 1e-10: + explanation = ( + "Implementation verified as not " + "providing a timing side-channel signal") + elif small_ci < 1e-9: + explanation = ( + "Implementation most likely not " + "providing a timing side-channel signal") + elif small_ci < 1e-2: + explanation = ( + "Large confidence intervals detected, " + "collecting more data necessary. Side channel " + "leakage smaller than {0:.3e}s is possible".format( + small_ci)) + else: + explanation = ( + "Very large confidence intervals detected. " + "Incorrect or missing --clock-frequency option?") txt = "Layperson explanation: {0}".format(explanation) print(txt) @@ -1517,7 +1532,7 @@ def _long_format_to_binary(self, name, name_bin): os.path.isfile(measurements_bin_shape_path) and \ os.path.isfile(measurements_csv_path) and \ os.path.getmtime(measurements_csv_path) < \ - os.path.getmtime(measurements_bin_path): + os.path.getmtime(measurements_bin_path): # pragma: no cover return if self.verbose: @@ -1567,6 +1582,8 @@ def _long_format_to_binary(self, name, name_bin): row_written += len(chunk.index) del measurements_bin + self._total_bit_size_data = row_written + with open(measurements_bin_shape_path, "w") as shape_f: shape_f.write("{0},3\n".format(row_written)) @@ -1590,14 +1607,12 @@ def _remove_suffix(self, string, suffix): return new_string - def skillings_mack_test(self, name): + def skillings_mack_test(self, measurements_bin_path): """ Calculate the p-value of the Skillings-Mack test for the Hamming weight data. """ - measurements_bin_path = name - data = np.memmap(measurements_bin_path, dtype=[('block', np.dtype('i8')), ('group', np.dtype('i2')), @@ -1710,6 +1725,8 @@ def _bit_size_write_summary(self, verdict, skillings_mack_pvalue): all_wilcoxon_values = list(self._bit_size_wilcoxon_test.values()) with open(join(self.output, "analysis_results/report.txt"), "w") as fp: fp.write( + "tlsfuzzer analyse.py version {0} bit size analysis\n\n" + .format(VERSION) + "Skilling-Mack test p-value: {0:.6e}\n" .format(skillings_mack_pvalue) + "Sign test p-values (min, average, max): " + @@ -1726,8 +1743,12 @@ def _bit_size_write_summary(self, verdict, skillings_mack_pvalue): np.average(all_wilcoxon_values), max(all_wilcoxon_values), ) + - "Used {0:,} data observations for results\n" - .format(self._bit_size_data_used) + + "Used {0:,} out of {1:,} available data observations " + .format( + self._total_bit_size_data_used, + self._total_bit_size_data + ) + + "for results.\n" + verdict + "\n\n" + ("-" * 88) + "\n" + "| size | Sign test | Wilcoxon test " + "| {0} | {1} |\n" @@ -1773,16 +1794,26 @@ def _bit_size_write_summary(self, verdict, skillings_mack_pvalue): fp.write(("-" * 88) + "\n") - def generate_report(self, bit_size=False): + def generate_report(self, bit_size=False, hamming_weight=False): """ Compiles a report consisting of statistical tests and plots. 
:return: int 0 if no difference was detected, 1 otherwise """ - if bit_size: + if hamming_weight: + difference = self.analyse_hamming_weights() + with open(join( + self.output, "analysis_results/report.Hamming_weight.txt" + ), "w") as fp: + fp.write(self._hamming_weight_report) + elif bit_size: name = join(self.output, self.measurements_filename) name_bin = self._remove_suffix(name, '.csv') + '.bin' self._long_format_to_binary(name, name_bin) + if (self._total_bit_size_data == 0 + and os.path.exists(name_bin + ".shape")): + with open(name_bin + ".shape") as fp: + self._total_bit_size_data = int(fp.read().split(',')[0]) skillings_mack_pvalue = self.skillings_mack_test(name_bin) ret_val = self.analyze_bit_sizes() @@ -1790,7 +1821,6 @@ def generate_report(self, bit_size=False): ret_val, skillings_mack_pvalue ) self._bit_size_write_summary(verdict, skillings_mack_pvalue) - else: # the Friedman test is fairly long running, non-multithreadable # and with fairly limited memory use, so run it in background @@ -1957,7 +1987,7 @@ def create_k_specific_dirs(self): progress = Thread(target=progress_report, args=(status,), kwargs=kwargs) progress.start() - except FileNotFoundError: + except FileNotFoundError: # pragma: no cover pass measurement_iter = self._read_bit_size_measurement_file(status=status) @@ -1993,25 +2023,10 @@ def create_k_specific_dirs(self): k_sizes = list(k_size_process_pipe.keys()) k_sizes = sorted(k_sizes, reverse=True) - if self.skip_sanity and max_k_size in k_sizes: - k_sizes.remove(max_k_size) - if self.verbose: print("[i] Max K size detected: {0}".format(max_k_size)) print("[i] Min K size detected: {0}".format(k_sizes[-1])) - if not self.skip_sanity: - max_k_folder_path = join( - self.output, - "analysis_results/k-by-size/{0}".format(max_k_size) - ) - with open(join(max_k_folder_path, "timing.csv"), 'r') as fp: - for count, line in enumerate(fp): - pass - if count < 2: - shutil.rmtree(max_k_folder_path) - k_sizes.remove(max_k_size) - return k_sizes def conf_plot_for_all_k(self, k_sizes): @@ -2084,7 +2099,7 @@ def conf_plot_for_all_k(self, k_sizes): ), bbox_inches="tight" ) - def _check_data_for_rel_t_test(self): + def _check_data_for_zero(self): non_zero_diffs = 0 ret_val = False @@ -2107,37 +2122,45 @@ def _check_data_for_rel_t_test(self): def _figure_out_analysis_data_size(self, k_sizes): pair = TestPair(0, 1) old_output = self.output + old_vebose = self.verbose + self.verbose = False + max_limit = 0 if self.bit_recognition_size >= len(k_sizes): self.bit_recognition_size = len(k_sizes) - 1 - k_size = k_sizes[self.bit_recognition_size] - self.output = join( - self.output, "analysis_results/k-by-size/{0}".format(k_size) - ) - - recognition_results = self.calc_diff_conf_int(pair) - recognition_cis = [ - recognition_results[method][2] - recognition_results[method][0] - for method in recognition_results - ] - non_zero_recognition_cis = [x for x in recognition_cis if x > 0] - - if len(non_zero_recognition_cis) == 0: - print("[W] There is not enough data on recognicion size to " + - "calulate desired sample size. 
Using all available samples.") - self._bit_size_data_limit = None + for index in range(self.bit_recognition_size - 1, -1, -1): + k_size = k_sizes[index] + self.output = join( + old_output, "analysis_results/k-by-size/{0}".format(k_size)) + + recognition_results = self.calc_diff_conf_int(pair) + recognition_cis = [ + recognition_results[method][2] - recognition_results[method][0] + for method in recognition_results + ] + non_zero_recognition_cis = [x for x in recognition_cis if x > 0] + + if len(non_zero_recognition_cis) == 0: + print("[W] There is not enough data on recognition size to " + + "calculate desired sample size. " + + "Using all available samples.") + self._bit_size_data_limit = None + self._bit_size_data_used = None + self.verbose = old_vebose + self.output = old_output + return + + smaller_recognition_ci = min( + x for x in non_zero_recognition_cis if x > 0) + magnitude_diff = smaller_recognition_ci / self.bit_size_desired_ci + max_limit = max(max_limit, round( + (magnitude_diff ** 2) * self._bit_size_data_used)) self._bit_size_data_used = None - return - smaller_recognition_ci = min( - x for x in non_zero_recognition_cis if x > 0 - ) - magnitude_diff = smaller_recognition_ci / self.bit_size_desired_ci - self._bit_size_data_limit = round( - (magnitude_diff ** 2) * self._bit_size_data_used - ) - self._bit_size_data_used = None + self._bit_size_data_limit = max_limit + self.verbose = old_vebose + self.output = old_output if self.verbose: if self.bit_recognition_size == 1: @@ -2156,8 +2179,6 @@ def _figure_out_analysis_data_size(self, k_sizes): .format(self.bit_size_desired_ci, size_text) ) - self.output = old_output - def analyze_bit_sizes(self): """ Analyses K bit-sizes and creates the plots and the test result files @@ -2247,29 +2268,33 @@ def analyze_bit_sizes(self): ) # Paired t-test - if self._check_data_for_rel_t_test(): + if self._check_data_for_zero(): results = self.rel_t_test() output_files['paired_t_test'].write( "K size of {0}: {1}\n".format(k_size, results[(0, 1)]) ) + + results = self.wilcoxon_test() + pvalue = results[(0, 1)] + output_files['wilcoxon_test'].write( + "K size of {0}: {1}\n".format(k_size, pvalue) + ) + self._bit_size_wilcoxon_test[k_size] = pvalue + if pvalue < alpha_with_correction: + ret_val = 1 else: if self.verbose: print("[i] Not enough data to perform reliable " "paired t-test.") + print("[i] Not enough data to perform reliable " + "Wilcoxon signed-rank test.") output_files['paired_t_test'].write( "K size of {0}: Too few points\n".format(k_size) ) - - # Wilcoxon test - results = self.wilcoxon_test() - pvalue = results[(0, 1)] - output_files['wilcoxon_test'].write( - "K size of {0}: {1}\n".format(k_size, pvalue) - ) - self._bit_size_wilcoxon_test[k_size] = pvalue - if pvalue < alpha_with_correction: - ret_val = 1 + output_files['wilcoxon_test'].write( + "K size of {0}: Too few points\n".format(k_size) + ) # Creating graphs self.conf_interval_plot() @@ -2277,7 +2302,7 @@ def analyze_bit_sizes(self): self.diff_scatter_plot() try: self.graph_worst_pair(testPair) - except AssertionError: + except AssertionError: # pragma: no cover if self.verbose: print( "[i] Couldn't create worst pair graph.".format( @@ -2349,6 +2374,10 @@ def analyze_bit_sizes(self): ) output_files['bootstrap_test'].write("\n") + if self._bit_size_data_used: + self._total_bit_size_data_used += self._bit_size_data_used + self._bit_size_data_used = None + for key in output_files: output_files[key].close() @@ -2412,13 +2441,9 @@ def _split_data_to_pairwise(self, name): key=lambda x: 
x[1]) most_common = set(i for i, j in group_counts[-5:]) - slope_path = join( - self.output, - "analysis_results/by-pair-sizes/slope") - try: - os.makedirs(slope_path) - except FileExistsError: - pass + slope_path = join(self.output, + "analysis_results/by-pair-sizes/slope") + os.makedirs(slope_path, exist_ok=True) pair_writers['slope'] = open( join(slope_path, "timing.csv"), "w") @@ -2534,17 +2559,28 @@ def _analyse_weight_pairs(self, pairs): self.class_names = list(data) self.run_sign_test = True - results = self.sign_test() - print("Slope sign test: {0}".format(results[(0, 1)])) + sign_test_results = self.sign_test() + sign_test_text = "Slope sign test: {0}".format( + sign_test_results[(0, 1)]) self.run_wilcoxon_test = True - results = self.wilcoxon_test() - print("Slope Wilcoxon signed rank test: {0}".format( - results[(0, 1)])) + wilcoxon_test_results = self.wilcoxon_test() + wilcoxon_test_text = "Slope Wilcoxon signed rank test: {0}"\ + .format(wilcoxon_test_results[(0, 1)]) self.run_t_test = True - results = self.rel_t_test() - print("Slope t-test: {0}".format(results[(0, 1)])) + rel_t_test_results = self.rel_t_test() + rel_t_test_text = "Slope t-test: {0}".format( + rel_t_test_results[(0, 1)]) + + self._hamming_weight_report += '\n' + self._hamming_weight_report += sign_test_text + '\n' + self._hamming_weight_report += wilcoxon_test_text + '\n' + self._hamming_weight_report += rel_t_test_text + '\n' + if self.verbose: + print("[i] " + sign_test_text) + print("[i] " + wilcoxon_test_text) + print("[i] " + rel_t_test_text) # conf_interval_plot is disabled by the draw_conf_interval_plot old_conf_interval = self.draw_conf_interval_plot @@ -2638,7 +2674,12 @@ def _analyse_weight_pairs(self, pairs): "analysis_results/by-pair-sizes/slope") boots = dict() - print("Bootstrapped confidence intervals for the time/weight slope") + self._hamming_weight_report += ("\nBootstrapped confidence " + + "intervals for the time/weight " + + "slope\n") + if self.verbose: + print("[i] Bootstrapped confidence intervals " + + "for the time/weight slope") for method, method_name in methods.items(): with open(join(in_dir, "bootstrapped_{0}.csv".format(method)), "r", encoding='utf-8') as fp: @@ -2647,15 +2688,23 @@ def _analyse_weight_pairs(self, pairs): ] quantile = np.quantile(boots[method], [0.025, 0.975, 0.5]) - print("{0} of differences: {4:.5e} s/bit, 95% CI: " - "{1:.5e} s/bit, {2:.5e} s/bit (±{3:.3e} s/bit)".format( - method_name, quantile[0], quantile[1], - (quantile[1] - quantile[0])/2, quantile[2])) + quantile_text = "{0} of differences: ".format(method_name) + quantile_text += "{0:.5e} s/bit, 95% CI: {1:.5e} s/bit, ".format( + quantile[2], quantile[0]) + quantile_text += "{0:.5e} s/bit (±{1:.3e} s/bit)".format( + quantile[1], (quantile[1] - quantile[0])/2) - def analyse_hamming_weights(self, name=None): - if name is None: - name = "measurements.csv" - name = join(self.output, name) + self._hamming_weight_report += quantile_text + '\n' + if self.verbose: + print("[i] " + quantile_text) + + def analyse_hamming_weights(self): + name = join(self.output, self.measurements_filename) + + self._hamming_weight_report += "tlsfuzzer analyse.py version {0} "\ + .format(VERSION) + self._hamming_weight_report += "Hamming weight analysis " + self._hamming_weight_report += "(experimental)\n\n" # first make sure the binary file exists name_bin = self._remove_suffix(name, '.csv') + ".bin" @@ -2663,36 +2712,38 @@ def analyse_hamming_weights(self, name=None): skillings_mack_p_value = self.skillings_mack_test(name_bin) 
-        print("Skillings-Mack test p-value: {0}".format(
-            skillings_mack_p_value))
+        self._hamming_weight_report += "Skillings-Mack test p-value: {0}\n"\
+            .format(skillings_mack_p_value)

         most_common, pairs = self._split_data_to_pairwise(name_bin)

-        ns_sm_p_value = None
-        hundred_ps_sm_p_value = None
+        sm_p_values = {}

         if skillings_mack_p_value > 1e-5:
             tmp_file = name_bin + ".tmp"
-            shutil.copyfile(name_bin, tmp_file)
-            self._add_value_to_group(tmp_file, most_common[0], 1e-9)
-            ns_sm_p_value = self.skillings_mack_test(tmp_file)
-            print("1ns: {}".format(ns_sm_p_value))
-            os.remove(tmp_file)
-
-            shutil.copyfile(name_bin, tmp_file)
-            self._add_value_to_group(tmp_file, most_common[0], 1e-10)
-            hundred_ps_sm_p_value = self.skillings_mack_test(tmp_file)
-            print("0.1ns: {}".format(hundred_ps_sm_p_value))
-            os.remove(tmp_file)
+            self._hamming_weight_report += "Skillings-Mack test p-value after "
+            self._hamming_weight_report += "introducing a side-channel of:\n"
+
+            for time in [10, 1, 0.1]:
+                shutil.copyfile(name_bin, tmp_file)
+                self._add_value_to_group(tmp_file, most_common[0], time * 1e-9)
+                p_value = self.skillings_mack_test(tmp_file)
+                sm_p_values[time] = p_value
+                self._hamming_weight_report += "\t{0}ns: {1}\n".format(
+                    time, p_value)
+                if self.verbose:
+                    print("[i] {0}ns: {1}".format(time, p_value))
+                os.remove(tmp_file)

         self._analyse_weight_pairs(pairs)

-        print("Skillings-Mack test p-value: {0}".format(
-            skillings_mack_p_value))
-        if ns_sm_p_value is not None:
-            print("Sample large enough to detect 1 ns difference: {}"
-                  .format(ns_sm_p_value < 1e-9))
-            print("Sample large enough to detect 0.1 ns difference: {}"
-                  .format(hundred_ps_sm_p_value < 1e-9))
+        if self.verbose:
+            print("[i] Skillings-Mack test p-value: {0}".format(
+                skillings_mack_p_value))
+            if sm_p_values:
+                for time in sm_p_values:
+                    print(("[i] Sample large enough to detect {0} ns "
+                           "difference: {1}").format(
+                               time, sm_p_values[time] < 1e-9))

         if skillings_mack_p_value < self.alpha:
             return 1
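
Reviewer note (illustrative, not part of the patch): the reshaped analyse_hamming_weights() now loops over injected delays of 10, 1 and 0.1 ns; for each it copies the binary measurement file, adds the delay to the most common Hamming-weight group, and re-runs the Skillings-Mack test. A resulting p-value below 1e-9 is then reported as "sample large enough to detect" a side channel of that magnitude. The sketch below shows the same power-check idea on in-memory data; scipy's Friedman test stands in for tlsfuzzer's own Skillings-Mack implementation (which additionally tolerates missing observations), and all names are illustrative:

    import numpy as np
    from scipy import stats


    def injected_delay_detectable(samples_by_group, group, delay_s, alpha=1e-9):
        """Add a known delay to one group and check whether the test flags it."""
        shifted = {k: np.asarray(v, dtype=float).copy()
                   for k, v in samples_by_group.items()}
        shifted[group] += delay_s
        # Friedman test over groups measured on the same blocks (tuples)
        _, p_value = stats.friedmanchisquare(*shifted.values())
        return p_value < alpha


    rng = np.random.default_rng(1234)
    # ~1.2 ms operations with about 1 ns of measurement noise per tuple
    measurements = {w: rng.normal(1.2e-3, 1e-9, size=200) for w in (127, 128, 129)}
    for delay in (1e-8, 1e-9, 1e-10):  # 10 ns, 1 ns, 0.1 ns
        print(delay, injected_delay_detectable(measurements, 128, delay))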
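
Reviewer note (illustrative, not part of the patch): the reworked _figure_out_analysis_data_size() keeps the same sizing rule as before, only now it takes the maximum over the smaller recognition sizes. The rule is that bootstrap confidence intervals shrink roughly with 1/sqrt(n), so the sample count needed to reach the desired CI width is the squared ratio of the observed width to the desired width, times the samples already used. A standalone sketch of that arithmetic (the function name is made up):

    def estimate_required_samples(observed_ci_width, desired_ci_width, samples_used):
        """CI width scales ~1/sqrt(n), so n must grow with the squared width ratio."""
        magnitude_diff = observed_ci_width / desired_ci_width
        return round(magnitude_diff ** 2 * samples_used)


    # The narrowest non-zero CI in the mocked results above is the median one,
    # 1.1e-08 - (-8.0e-08) = 9.1e-08 s wide, obtained from 1000 samples.
    # With the desired CI of 1e-9 s used in these tests this gives
    # round((9.1e-08 / 1e-09) ** 2 * 1000) == 8281000,
    # matching the _bit_size_data_limit value asserted in the test.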
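
Reviewer note (illustrative, not part of the patch): several of the new tests share one technique for mocking file I/O: a file_selector function installed as the side_effect of a patched open(), which dispatches on the file name and mode, hands back a fresh mock_open() handle, and captures writes so that later reads in the same test can observe them (the k-by-size test above keeps them in a defaultdict). A minimal self-contained sketch of that pattern using only unittest.mock; the class and file names are made up:

    import unittest
    from unittest import mock


    class FileSelectorPatternExample(unittest.TestCase):
        """Sketch of the file_selector side_effect pattern used in the new tests."""

        def test_write_then_read(self):
            written = {}

            def file_selector(*args, **kwargs):
                file_name = args[0]
                mode = args[1] if len(args) > 1 else "r"
                # every call gets its own handle, pre-loaded with whatever
                # was written to that file name earlier in the test
                handle = mock.mock_open(
                    read_data="".join(written.get(file_name, []))
                )(*args, **kwargs)
                if "w" in mode:
                    handle.write.side_effect = \
                        lambda s: written.setdefault(file_name, []).append(s)
                return handle

            with mock.patch("builtins.open", side_effect=file_selector):
                with open("report.txt", "w") as fp:
                    fp.write("all ok\n")
                with open("report.txt") as fp:
                    self.assertEqual(fp.read(), "all ok\n")


    if __name__ == "__main__":
        unittest.main()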