diff --git a/reloading/test_reloading.py b/reloading/test_reloading.py index 8ad81ee..64c1b1e 100644 --- a/reloading/test_reloading.py +++ b/reloading/test_reloading.py @@ -6,7 +6,8 @@ from reloading import reloading SRC_FILE_NAME = 'temporary_testing_file.py' -SRC_FILE_CONTENT = ''' + +TEST_CHANGING_SOURCE_CONTENT = ''' from reloading import reloading from time import sleep @@ -15,6 +16,57 @@ print('INITIAL_FILE_CONTENTS') ''' +TEST_KEEP_LOCAL_VARIABLES_CONTENT = ''' +from reloading import reloading +from time import sleep + +fpath = "DON'T CHANGE ME" +for epoch in reloading(range(1)): + assert fpath == "DON'T CHANGE ME" +''' + +TEST_PERSIST_AFTER_LOOP = ''' +from reloading import reloading +from time import sleep + +state = 'INIT' +for epoch in reloading(range(1)): + state = 'CHANGED' + +assert state == 'CHANGED' +''' + + +def run_and_update_source(init_src, updated_src=None, update_after=0.2): + '''Runs init_src in a subprocess and updates source to updated_src after + update_after seconds. Returns the standard output of the subprocess and + whether the subprocess produced an uncaught exception. + ''' + with open(SRC_FILE_NAME, 'w') as f: + f.write(init_src) + + cmd = ['python', SRC_FILE_NAME] + with sp.Popen(cmd, stdout=sp.PIPE, stderr=sp.PIPE) as proc: + if updated_src != None: + time.sleep(update_after) + with open(SRC_FILE_NAME, 'w') as f: + f.write(updated_src) + + try: + stdout, _ = proc.communicate(timeout=2) + stdout = stdout.decode('utf-8') + has_error = False + except: + stdout = '' + has_error = True + proc.terminate() + + if os.path.isfile(SRC_FILE_NAME): + os.remove(SRC_FILE_NAME) + + return stdout, has_error + + class TestReloading(unittest.TestCase): def test_simple_looping(self): @@ -23,29 +75,21 @@ def test_simple_looping(self): iters += 1 def test_changing_source(self): - with open(SRC_FILE_NAME, 'w') as f: - f.write(SRC_FILE_CONTENT) + stdout, _ = run_and_update_source( + init_src=TEST_CHANGING_SOURCE_CONTENT, + updated_src=TEST_CHANGING_SOURCE_CONTENT.replace('INITIAL', 'CHANGED').rstrip('\n')) - cmd = ['python', SRC_FILE_NAME] - with sp.Popen(cmd, stdout=sp.PIPE) as proc: - # wait for first loop iterations to run before changing source file - time.sleep(0.2) - with open(SRC_FILE_NAME, 'w') as f: - f.write(SRC_FILE_CONTENT.replace('INITIAL', 'CHANGED').rstrip('\n')) + self.assertTrue('INITIAL_FILE_CONTENTS' in stdout and + 'CHANGED_FILE_CONTENTS' in stdout) - # check if output contains results from before and after change - stdout = proc.stdout.read().decode('utf-8') - self.assertTrue('INITIAL_FILE_CONTENTS' in stdout and - 'CHANGED_FILE_CONTENTS' in stdout) + def test_keep_local_variables(self): + _, has_error = run_and_update_source(init_src=TEST_KEEP_LOCAL_VARIABLES_CONTENT) + self.assertFalse(has_error) - def test_local_vars_are_not_overwritten(self): - fpath = "test" - for _ in reloading(range(1)): - self.assertEqual(fpath, "test") + def test_persist_after_loop(self): + _, has_error = run_and_update_source(init_src=TEST_PERSIST_AFTER_LOOP) + self.assertFalse(has_error) - def tearDown(self): - if os.path.isfile(SRC_FILE_NAME): - os.remove(SRC_FILE_NAME) if __name__ == '__main__': unittest.main()