Added more tests and Pep8 fixes (#279)
* added more tests
* coverage increased from 65% to 91%
* pep8 fixes
* fixed a long string to comply with pep8
* changed permissions of 2 test files
* replaced f-strings with string concatenation for Python 3.5 support
* fixed max-line-length to 120 throughout the repo
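
The f-string bullet refers to rewrites of the kind sketched below. This is an illustrative example only (the variable and its value are placeholders, not code from this commit): f-strings need Python 3.6+, while concatenation and str.format() also run on Python 3.5.

# Python 3.6+ only; an f-string is a SyntaxError on Python 3.5:
#     print(f"serving with uuid {snare_uuid}")

# Python 3.5-compatible alternatives (snare_uuid is a placeholder value):
snare_uuid = "9f0ad1f0-0000-0000-0000-000000000000"
print("serving with uuid " + snare_uuid)            # plain concatenation
print("serving with uuid {0}".format(snare_uuid))   # str.format(), the style used in bin/snare below
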
lordlabuckdas authored Feb 3, 2021
1 parent 2593c86 commit 5af7675
Showing 25 changed files with 290 additions and 125 deletions.
24 changes: 4 additions & 20 deletions bin/clone
@@ -27,27 +27,11 @@ from snare.utils.snare_helpers import check_privileges, print_color, str_to_bool
def main():
loop = asyncio.get_event_loop()
parser = argparse.ArgumentParser()
parser.add_argument(
"--target",
help="domain of the site to be cloned",
required=True)
parser.add_argument(
"--max-depth",
help="max depth of the cloning",
required=False,
default=sys.maxsize)
parser.add_argument("--target", help="domain of the site to be cloned", required=True)
parser.add_argument("--max-depth", help="max depth of the cloning", required=False, default=sys.maxsize)
parser.add_argument("--log_path", help="path to the log file")
parser.add_argument(
"--css-validate",
help="set whether css validation is required",
type=str_to_bool,
default=None)
parser.add_argument(
"--path",
help="path to save the page to be cloned",
required=False,
default="/opt/"
)
parser.add_argument("--css-validate", help="set whether css validation is required", type=str_to_bool, default=None)
parser.add_argument("--path", help="path to save the page to be cloned", required=False, default="/opt/")
args = parser.parse_args()
default_path = os.path.join(args.path, 'snare')

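The consolidated --css-validate option above relies on the str_to_bool helper imported from snare.utils.snare_helpers. A minimal sketch of how such an argparse type= converter typically looks is given below; the function body is an assumption for illustration, not necessarily snare's exact implementation.

import argparse


def str_to_bool(value):
    # Hypothetical shape of snare.utils.snare_helpers.str_to_bool; the real helper may differ.
    if isinstance(value, bool):
        return value
    if value.lower() in ('true', 't', 'yes', 'y', '1'):
        return True
    if value.lower() in ('false', 'f', 'no', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError('Boolean value expected, got {!r}'.format(value))


parser = argparse.ArgumentParser()
parser.add_argument("--css-validate", help="set whether css validation is required", type=str_to_bool, default=None)
print(parser.parse_args(["--css-validate", "true"]).css_validate)  # True
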
55 changes: 17 additions & 38 deletions bin/snare
@@ -37,7 +37,7 @@ from snare.utils.snare_helpers import check_privileges, check_meta_file, print_c
def create_initial_config(base_path):
cfg = configparser.ConfigParser()
cfg['WEB-TOOLS'] = dict(google='', bing='')
with open(os.path.join(base_path,'snare.cfg'), 'w') as configfile:
with open(os.path.join(base_path, 'snare.cfg'), 'w') as configfile:
cfg.write(configfile)


@@ -49,16 +49,16 @@ def snare_setup(base_path):
print_color(err, 'WARNING')
sys.exit(1)

if not os.path.exists(os.path.join(base_path,'pages')):
os.makedirs(os.path.join(base_path,'pages'))
if not os.path.exists(os.path.join(base_path, 'pages')):
os.makedirs(os.path.join(base_path, 'pages'))
# Write pid to pid file
with open(os.path.join(base_path,'snare.pid'), 'wb') as pid_fh:
with open(os.path.join(base_path, 'snare.pid'), 'wb') as pid_fh:
pid_fh.write(str(os.getpid()).encode('utf-8'))
# Config file
if not os.path.exists(os.path.join(base_path,'snare.cfg')):
if not os.path.exists(os.path.join(base_path, 'snare.cfg')):
create_initial_config(base_path)
# Read or create the sensor id
uuid_file_path = os.path.join(base_path,'snare.uuid')
uuid_file_path = os.path.join(base_path, 'snare.uuid')
if os.path.exists(uuid_file_path):
with open(uuid_file_path, 'rb') as uuid_fh:
snare_uuid = uuid_fh.read()
@@ -79,10 +79,7 @@ def drop_privileges():
os.setuid(wanted_user.pw_uid)
new_user = pwd.getpwuid(os.getuid())
new_group = grp.getgrgid(os.getgid())
print_color(
'privileges dropped, running as "{}:{}"'.format(
new_user.pw_name,
new_group.gr_name), 'INFO')
print_color('privileges dropped, running as "{}:{}"'.format(new_user.pw_name, new_group.gr_name), 'INFO')


def compare_version_info(timeout):
@@ -96,8 +93,7 @@ def compare_version_info(timeout):
print_color('timeout fetching the repository version', 'ERROR')
else:
if diff_list:
print_color(
'you are running an outdated version, SNARE will be updated and restarted', 'INFO')
print_color('you are running an outdated version, SNARE will be updated and restarted', 'INFO')
repo.git.reset('--hard')
repo.heads.master.checkout()
repo.git.clean('-xdf')
@@ -120,9 +116,7 @@ async def check_tanner():
version = result["version"]
vm.check_compatibility(version)
except aiohttp.ClientOSError:
print_color(
"Can't connect to tanner host {}".format(req_url),
'ERROR')
print_color("Can't connect to tanner host {}".format(req_url), 'ERROR')
exit(1)
else:
await resp.release()
@@ -137,16 +131,9 @@ if __name__ == '__main__':
""")
parser = argparse.ArgumentParser()
page_group = parser.add_mutually_exclusive_group(required=True)
page_group.add_argument("--page-dir",
help="name of the folder to be served")
page_group.add_argument(
"--list-pages",
help="list available pages",
action='store_true')
parser.add_argument(
"--index-page",
help="file name of the index page",
default='index.html')
page_group.add_argument("--page-dir", help="name of the folder to be served")
page_group.add_argument("--list-pages", help="list available pages", action='store_true')
parser.add_argument("--index-page", help="file name of the index page", default='index.html')
parser.add_argument("--port", help="port to listen on", default='8080')
parser.add_argument("--host-ip", help="host ip to bind to", default='127.0.0.1')
parser.add_argument("--debug", help="run web server in debug mode", default=False)
@@ -185,8 +172,7 @@ if __name__ == '__main__':
args_dict = vars(args)
args_dict['full_page_path'] = os.path.realpath(full_page_path)
if not os.path.exists(full_page_path):
print_color(
"--page-dir: {0} does not exist".format(args.page_dir), 'ERROR')
print_color("--page-dir: {0} does not exist".format(args.page_dir), 'ERROR')
exit()
args.index_page = os.path.join("/", args.index_page)

@@ -202,8 +188,7 @@ if __name__ == '__main__':
print_color("Error found in meta.json. Please clone the pages again.", "ERROR")
exit()

if not os.path.exists(os.path.join(full_page_path,
os.path.join(meta_info[args.index_page]['hash']))):
if not os.path.exists(os.path.join(full_page_path, os.path.join(meta_info[args.index_page]['hash']))):
print_color('can\'t create meta tag', 'WARNING')
else:
snare_helpers.add_meta_tag(args.page_dir, meta_info[args.index_page]['hash'], config, base_path)
@@ -214,15 +199,9 @@ if __name__ == '__main__':
compare_version_fut = None
if args.auto_update is True:
timeout = snare_helpers.parse_timeout(args.update_timeout)
compare_version_fut = loop.run_in_executor(
pool, compare_version_info, timeout)

app = HttpRequestHandler(
meta_info,
args,
snare_uuid,
debug=args.debug,
keep_alive=75)
compare_version_fut = loop.run_in_executor(pool, compare_version_info, timeout)

app = HttpRequestHandler(meta_info, args, snare_uuid, debug=args.debug, keep_alive=75)

print_color('serving with uuid {0}'.format(snare_uuid.decode('utf-8')), 'INFO')
print_color("Debug logs will be stored in {}".format(log_debug), 'INFO')
5 changes: 2 additions & 3 deletions snare/tanner_handler.py
@@ -58,9 +58,8 @@ async def submit_data(self, data):
event_result = await r.json()
except (json.decoder.JSONDecodeError, aiohttp.client_exceptions.ContentTypeError) as e:
self.logger.error('Error submitting data: {} {}'.format(e, data))
event_result = {'version': '0.6.0', 'response': {'message': {'detection':
{'name': 'index', 'order': 1, 'type': 1, 'version': '0.6.0'},
'sess_uuid': data['uuid']}}}
event_result = {'version': '0.6.0', 'response': {'message': {'detection': {
'name': 'index', 'order': 1, 'type': 1, 'version': '0.6.0'}, 'sess_uuid': data['uuid']}}}
finally:
await r.release()
except Exception as e:
17 changes: 5 additions & 12 deletions snare/tests/test_cloner_get_body.py
@@ -20,8 +20,7 @@ def setUp(self):
self.loop = asyncio.new_event_loop()
self.css_validate = 'false'
self.handler = Cloner(self.root, self.max_depth, self.css_validate)
self.target_path = '/opt/snare/pages/{}'.format(
yarl.URL(self.root).host)
self.target_path = '/opt/snare/pages/{}'.format(yarl.URL(self.root).host)
self.return_content = None
self.expected_content = None
self.filename = None
@@ -51,8 +50,7 @@ def test_get_body(self):

aiohttp.ClientResponse._headers = {'Content-Type': 'text/html'}
aiohttp.ClientResponse.read = AsyncMock(return_value=self.content)
self.filename, self.hashname = self.handler._make_filename(
yarl.URL(self.root))
self.filename, self.hashname = self.handler._make_filename(yarl.URL(self.root))
self.expected_content = '<html><body><a href="/test"></a></body></html>'

self.meta = {
@@ -72,17 +70,13 @@ async def test():

with self.assertLogs(level='DEBUG') as log:
self.loop.run_until_complete(test())
self.assertIn(
'DEBUG:snare.cloner:Cloned file: /test',
''.join(
log.output))
self.assertIn('DEBUG:snare.cloner:Cloned file: /test', ''.join(log.output))

with open(os.path.join(self.target_path, self.hashname)) as f:
self.return_content = f.read()

self.assertEqual(self.return_content, self.expected_content)
self.assertEqual(
self.handler.visited_urls[-2:], ['http://example.com/', 'http://example.com/test'])
self.assertEqual(self.handler.visited_urls[-2:], ['http://example.com/', 'http://example.com/test'])
self.assertEqual(self.handler.meta, self.meta)

def test_get_body_css_validate(self):
@@ -142,8 +136,7 @@ async def test():
self.loop.run_until_complete(test())
self.assertEqual(self.return_size, self.q_size)
self.assertEqual(self.handler.meta, self.meta)
self.assertEqual(
self.handler.visited_urls[-1], self.expected_content)
self.assertEqual(self.handler.visited_urls[-1], self.expected_content)

def test_client_error(self):
self.session.get = AsyncMock(side_effect=aiohttp.ClientError)
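
The tests in this commit stub coroutines with snare.utils.asyncmock.AsyncMock. A minimal shim of that kind, the common pre-Python-3.8 pattern of making a MagicMock awaitable, might look like the sketch below; this is an assumed shape, not necessarily the repo's exact implementation.

import asyncio
from unittest import mock


class AsyncMock(mock.MagicMock):
    # A MagicMock whose calls are awaitable, so it can stand in for coroutines.
    async def __call__(self, *args, **kwargs):
        # Delegate to MagicMock so return_value and side_effect keep working.
        return super(AsyncMock, self).__call__(*args, **kwargs)


# Usage sketch, mirroring how the tests above drive coroutines in a private event loop.
async def _demo():
    fetch = AsyncMock(return_value={'dorks': ['/test']})
    return await fetch('http://tanner.mushmush.org:8090/dorks')


loop = asyncio.new_event_loop()
assert loop.run_until_complete(_demo()) == {'dorks': ['/test']}
loop.close()
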
43 changes: 43 additions & 0 deletions snare/tests/test_cloner_get_root_host.py
@@ -0,0 +1,43 @@
import unittest
from unittest import mock
import sys
from snare.cloner import Cloner
import shutil
from yarl import URL
import asyncio
import aiohttp
from snare.utils.asyncmock import AsyncMock


class TestClonerGetRootHost(unittest.TestCase):
    def setUp(self):
        self.loop = asyncio.new_event_loop()

    def test_moved_root(self):
        self.root = 'http://example.com'
        self.max_depth = sys.maxsize
        self.css_validate = 'false'
        self.handler = Cloner(self.root, self.max_depth, self.css_validate)
        self.expected_moved_root = URL('http://www.example.com')

        async def test():
            await self.handler.get_root_host()

        self.loop.run_until_complete(test())

        self.assertEqual(self.handler.moved_root, self.expected_moved_root)

    @mock.patch('aiohttp.ClientSession')
    def test_clienterror(self, session):
        self.root = 'http://example.com'
        self.max_depth = sys.maxsize
        self.css_validate = 'false'
        self.handler = Cloner(self.root, self.max_depth, self.css_validate)

        aiohttp.ClientSession = mock.Mock(side_effect=aiohttp.ClientError)

        async def test():
            await self.handler.get_root_host()

        with self.assertRaises(SystemExit):
            self.loop.run_until_complete(test())
18 changes: 18 additions & 0 deletions snare/tests/test_cloner_init.py
@@ -0,0 +1,18 @@
import unittest
import sys
from snare.cloner import Cloner
import shutil


class TestClonerInitialization(unittest.TestCase):
    def setUp(self):
        self.root = 'http://example.com'
        self.max_depth = sys.maxsize
        self.css_validate = 'false'
        self.handler = Cloner(self.root, self.max_depth, self.css_validate, default_path='/tmp')

    def test_cloner_init(self):
        self.assertIsInstance(self.handler, Cloner)

    def tearDown(self):
        shutil.rmtree(self.handler.target_path)
3 changes: 1 addition & 2 deletions snare/tests/test_cloner_make_filename.py
@@ -27,8 +27,7 @@ def test_make_filename(self):
self.assertEqual(self.hashname, '167a0418dd8ce3bf0ef00dfb6195f038')

def test_make_filename_same_host(self):
self.filename, self.hashname = self.handler._make_filename(
yarl.URL(self.root))
self.filename, self.hashname = self.handler._make_filename(yarl.URL(self.root))
self.assertEqual(self.filename, '/index.html')
self.assertEqual(self.hashname, 'd1546d731a9f30cc80127d57142a482b')

13 changes: 9 additions & 4 deletions snare/tests/test_cloner_process_links.py
@@ -48,10 +48,15 @@ async def test():

self.loop.run_until_complete(test())
self.assertEqual(self.return_content, '/foo/путь/')
self.assertEqual(
yarl.URL(
self.return_url).human_repr(),
self.expected_content)
self.assertEqual(yarl.URL(self.return_url).human_repr(), self.expected_content)
self.assertEqual(self.return_level, self.level + 1)

self.handler.moved_root = yarl.URL('http://example2.com')
self.expected_content = 'http://example2.com/foo/путь/'

self.loop.run_until_complete(test())
self.assertEqual(self.return_content, '/foo/путь/')
self.assertEqual(yarl.URL(self.return_url).human_repr(), self.expected_content)
self.assertEqual(self.return_level, self.level + 1)

def test_process_link_absolute(self):
Expand Down
17 changes: 15 additions & 2 deletions snare/tests/test_cloner_replace_links.py
@@ -34,8 +34,7 @@ async def test():

self.loop.run_until_complete(test())
self.assertEqual(str(self.return_content), self.expected_content)
self.handler.process_link.assert_called_with(
self.root, self.level, check_host=True)
self.handler.process_link.assert_called_with(self.root, self.level, check_host=True)

def test_replace_image_links(self):
self.handler.process_link = AsyncMock(return_value="/smiley.png")
@@ -65,5 +64,19 @@ async def test():
self.assertEqual(str(self.return_content), self.expected_content)
self.handler.process_link.assert_called_with(self.root, self.level)

def test_replace_redirects(self):
self.root = "http://example.com"
self.content = ('\n<html>\n<body>\n<p name="redirect" value="http://example.com/home.html">Redirecting...</p>\n'
'</body>\n</html>\n')

self.expected_content = ('\n<html>\n<body>\n<p name="redirect" value="/home.html">Redirecting...</p>\n</body>\n'
'</html>\n')

async def test():
self.return_content = await self.handler.replace_links(self.content, self.level)

self.loop.run_until_complete(test())
self.assertEqual(str(self.return_content), self.expected_content)

def tearDown(self):
shutil.rmtree(self.main_page_path)
17 changes: 17 additions & 0 deletions snare/tests/test_cloner_run.py
@@ -0,0 +1,17 @@
import unittest
import sys
from snare.cloner import Cloner
import shutil
import asyncio


class TestClonerRun(unittest.TestCase):
    def setUp(self):
        self.root = 'http://example.com'
        self.max_depth = sys.maxsize
        self.css_validate = 'false'
        self.handler = Cloner(self.root, self.max_depth, self.css_validate, default_path='/tmp')
        self.loop = asyncio.new_event_loop()

    def test_run(self):
        self.loop.run_until_complete(self.handler.run())
13 changes: 4 additions & 9 deletions snare/tests/test_html_handler_get_dorks.py
@@ -33,15 +33,13 @@ def setUp(self):
self.data = None

def test_get_dorks(self):
aiohttp.ClientResponse.json = AsyncMock(
return_value=dict(response={'dorks': "test_dorks"}))
aiohttp.ClientResponse.json = AsyncMock(return_value=dict(response={'dorks': "test_dorks"}))

async def test():
self.data = await self.handler.get_dorks()

self.loop.run_until_complete(test())
aiohttp.ClientSession.get.assert_called_with(
'http://tanner.mushmush.org:8090/dorks', timeout=10.0)
aiohttp.ClientSession.get.assert_called_with('http://tanner.mushmush.org:8090/dorks', timeout=10.0)

def test_return_dorks(self):
aiohttp.ClientResponse.json = AsyncMock(return_value=self.dorks)
@@ -53,17 +51,14 @@ async def test():
self.assertEqual(self.data, self.dorks['response']['dorks'])

def test_logging_error(self):
aiohttp.ClientResponse.json = AsyncMock(
side_effect=JSONDecodeError('ERROR', '', 0))
aiohttp.ClientResponse.json = AsyncMock(side_effect=JSONDecodeError('ERROR', '', 0))

async def test():
self.data = await self.handler.get_dorks()

with self.assertLogs(level='ERROR') as log:
self.loop.run_until_complete(test())
self.assertIn(
'Error getting dorks: ERROR: line 1 column 1 (char 0)',
log.output[0])
self.assertIn('Error getting dorks: ERROR: line 1 column 1 (char 0)', log.output[0])

def test_logging_timeout(self):
aiohttp.ClientResponse.json = AsyncMock(
