From f6ad1e94b92a55970bc0eb1648f4cb945cbf8976 Mon Sep 17 00:00:00 2001 From: Chris Chang Date: Sat, 24 Mar 2018 11:00:42 -0500 Subject: [PATCH] Urldecode passwords (#22) For example, if a password was `!mysecure:password#`, you'd have to encode it `%21mysecure%3Apassword%23` to get it to parse right (otherwise it'd think it's a port), but then you'd be passing the wrong password in. fixes #19 --- .gitignore | 1 + postdoc.py | 67 ++++++++++++++++++------------ test_postdoc.py | 106 +++++++++++++++++++++++++++++++----------------- 3 files changed, 109 insertions(+), 65 deletions(-) diff --git a/.gitignore b/.gitignore index eaf7061..2092535 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ .env *.egg *.egg-info/ +*.eggs *.pyc __pycache__ build/ diff --git a/postdoc.py b/postdoc.py index 878e0c5..2b87cba 100755 --- a/postdoc.py +++ b/postdoc.py @@ -11,50 +11,64 @@ --postdoc-quiet Don't print debugging output. """ +from __future__ import unicode_literals import os import subprocess import sys try: # Python 3 from urllib.parse import urlparse + from urllib.parse import unquote except ImportError: # Python 2 from urlparse import urlparse + from urllib import unquote __version__ = '0.4.0' -def get_uri(env='DATABASE_URL'): +def get_uri(env_var='DATABASE_URL'): """Grab and parse the url from the environment.""" - # trick python3's urlparse into raising an exception - return urlparse(os.environ.get(env, 1337)) + parsed_result = urlparse( + # Trick python3's urlparse into raising when env var is missing + os.environ.get(env_var, 1337) + ) + meta = { + 'scheme': parsed_result.scheme, + 'username': unquote(parsed_result.username or ''), + 'password': unquote(parsed_result.password or ''), + 'hostname': parsed_result.hostname, + 'port': parsed_result.port, + 'path': unquote(parsed_result.path or '/'), + } + return meta def pg_connect_bits(meta): """Turn the url into connection bits.""" bits = [] - if meta.username: - bits.extend(['-U', meta.username]) - if meta.hostname: - bits.extend(['-h', meta.hostname]) - if meta.port: - bits.extend(['-p', str(meta.port)]) + if meta['username']: + bits.extend(['-U', meta['username']]) + if meta['hostname']: + bits.extend(['-h', meta['hostname']]) + if meta['port']: + bits.extend(['-p', str(meta['port'])]) return bits def mysql_connect_bits(meta): """Turn the url into connection bits.""" bits = [] - if meta.username: - bits.extend(['-u', meta.username]) - if meta.password: - # password is one token - bits.append('-p{0}'.format(meta.password)) - if meta.hostname: - bits.extend(['-h', meta.hostname]) - if meta.port: - bits.extend(['-P', str(meta.port)]) + if meta['username']: + bits.extend(['-u', meta['username']]) + if meta['password']: + # `password` is one token for mysql (no whitespace) + bits.append('-p{0}'.format(meta['password'])) + if meta['hostname']: + bits.extend(['-h', meta['hostname']]) + if meta['port']: + bits.extend(['-P', str(meta['port'])]) return bits @@ -65,7 +79,7 @@ def connect_bits(meta): 'postgresql': pg_connect_bits, 'postgis': pg_connect_bits, } - scheme = getattr(meta, 'scheme', 'postgres') # default to postgres + scheme = meta.get('scheme', 'postgres') # Default to postgres # TODO raise a better error than KeyError with an unsupported scheme return bit_makers[scheme](meta) @@ -85,32 +99,31 @@ def get_command(command, meta): bits.append('--dbname') if command == 'mysql': bits.append('--database') - bits.append(meta.path[1:]) - # outtahere + bits.append(meta['path'][1:]) return bits def make_tokens_and_env(sys_argv): """Get the tokens or quit with help.""" if sys_argv[1].isupper(): - environ_key = sys_argv[1] + env_var = sys_argv[1] args = sys_argv[2:] else: - environ_key = 'DATABASE_URL' + env_var = 'DATABASE_URL' args = sys_argv[1:] try: - meta = get_uri(environ_key) + meta = get_uri(env_var) # if we need to switch logic based off scheme multiple places, may want # to normalize it at this point tokens = get_command(args[0], meta) except AttributeError: exit('Usage: phd COMMAND [additional-options]\n\n' - ' ERROR: "{0}" is not set in the environment'.format(environ_key)) + ' ERROR: "{0}" is not set in the environment'.format(env_var)) env = os.environ.copy() # password as environment variable, set it for non-postgres schemas anyways - if meta.password: - env['PGPASSWORD'] = meta.password + if meta['password']: + env['PGPASSWORD'] = meta['password'] # pass any other flags the user set along tokens.extend(args[1:]) return tokens, env diff --git a/test_postdoc.py b/test_postdoc.py index 02a427a..e34a21b 100644 --- a/test_postdoc.py +++ b/test_postdoc.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- - +from __future__ import unicode_literals import os import unittest @@ -14,86 +14,92 @@ exit('Re-run tests in an environment without DATABASE_URL') -class ConnectBitsTest(unittest.TestCase): +class ConnectBitsTests(unittest.TestCase): def test_pg_connect_bits_trivial_case(self): - meta = type('mock', (object, ), - {'username': '', 'hostname': '', 'port': ''}) + meta = {'username': '', 'hostname': '', 'port': ''} result = postdoc.pg_connect_bits(meta) self.assertEqual(result, []) def test_pg_connect_bits_works(self): - meta = type('mock', (object, ), - {'scheme': 'postgres', 'username': '1', 'hostname': '2', 'port': 3}) + meta = {'scheme': 'postgres', 'username': '1', 'hostname': '2', 'port': 3} result = postdoc.pg_connect_bits(meta) self.assertEqual(result, ['-U', '1', '-h', '2', '-p', '3']) result = postdoc.connect_bits(meta) self.assertEqual(result, ['-U', '1', '-h', '2', '-p', '3']) def test_mysql_connect_bits_trivial_case(self): - meta = type('mock', (object, ), - {'username': '', 'password': '', 'hostname': '', 'port': ''}) + meta = {'username': '', 'password': '', 'hostname': '', 'port': ''} result = postdoc.mysql_connect_bits(meta) self.assertEqual(result, []) def test_mysql_connect_bits_works(self): - meta = type('mock', (object, ), - {'scheme': 'mysql', 'username': 'u', 'password': 'p', - 'hostname': 'h', 'port': '3306'}) + meta = {'scheme': 'mysql', 'username': 'u', 'password': 'p', + 'hostname': 'h', 'port': '3306'} result = postdoc.mysql_connect_bits(meta) self.assertEqual(result, ['-u', 'u', '-pp', '-h', 'h', '-P', '3306']) result = postdoc.connect_bits(meta) self.assertEqual(result, ['-u', 'u', '-pp', '-h', 'h', '-P', '3306']) def test_connect_bits_supported_schemas(self): - meta = type('mock', (object, ), - {'username': '', 'password': '', 'hostname': 'h', 'port': ''}) + meta = {'username': '', 'password': '', 'hostname': 'h', 'port': ''} # assert defaults to postgres self.assertTrue(postdoc.connect_bits(meta)) - meta.scheme = 'mysql' + meta['scheme'] = 'mysql' self.assertTrue(postdoc.connect_bits(meta)) - meta.scheme = 'postgres' + meta['scheme'] = 'postgres' self.assertTrue(postdoc.connect_bits(meta)) - meta.scheme = 'postgresql' + meta['scheme'] = 'postgresql' self.assertTrue(postdoc.connect_bits(meta)) - meta.scheme = 'postgis' + meta['scheme'] = 'postgis' self.assertTrue(postdoc.connect_bits(meta)) - meta.scheme = 'foo' + meta['scheme'] = 'foo' self.assertRaises(KeyError, postdoc.connect_bits, meta) -class PHDTest(unittest.TestCase): +class PHDTests(unittest.TestCase): def test_get_uri(self): with mock.patch('postdoc.os') as mock_os: mock_os.environ = { 'DATABASE_URL': 'foo', 'FATTYBASE_URL': 'bar', } - self.assertEqual(postdoc.get_uri().path, 'foo') - self.assertEqual(postdoc.get_uri('FATTYBASE_URL').path, 'bar') + self.assertEqual(postdoc.get_uri()['path'], 'foo') + self.assertEqual(postdoc.get_uri('FATTYBASE_URL')['path'], 'bar') + + def test_get_uri_decodes_urlencoded(self): + with mock.patch('postdoc.os') as mock_os: + mock_os.environ = { + 'DATABASE_URL': 'mysql://user%3F:%21mysecure%3Apassword%23@127.0.0.1:3307/foo', + } + self.assertEqual(postdoc.get_uri(), { + 'scheme': 'mysql', + 'username': 'user?', + 'password': '!mysecure:password#', + 'hostname': '127.0.0.1', + 'port': 3307, + 'path': '/foo', + }) def test_get_command_assembles_bits_in_right_order(self): - meta = type('mock', (object, ), - {'username': '', 'hostname': '', 'port': '', 'password': '', - 'path': '/database'}) + meta = {'username': '', 'hostname': '', 'port': '', 'password': '', + 'path': '/database'} with mock.patch('postdoc.pg_connect_bits') as mock_bits: mock_bits.return_value = ['lol'] self.assertEqual(postdoc.get_command('foo', meta), ['foo', 'lol', 'database']) def test_get_command_ignores_password(self): - meta = type('mock', (object, ), - {'username': '', 'hostname': '', 'port': '', 'password': 'oops', - 'path': '/database'}) + meta = {'username': '', 'hostname': '', 'port': '', 'password': 'oops', + 'path': '/database'} with mock.patch('postdoc.pg_connect_bits') as mock_bits: mock_bits.return_value = ['rofl'] self.assertEqual(postdoc.get_command('bar', meta), ['bar', 'rofl', 'database']) def test_get_commands_can_ignore_database_name(self): - meta = type('mock', (object, ), - {'scheme': 'mysql', 'username': 'u', 'hostname': 'h', 'port': '', - 'password': 'oops', 'path': '/database'}) + meta = {'scheme': 'mysql', 'username': 'u', 'hostname': 'h', 'port': '', + 'password': 'oops', 'path': '/database'} result = postdoc.get_command('mysqladmin', meta) # assert database name is not an argument self.assertNotIn('database', result) @@ -102,23 +108,48 @@ def test_get_commands_can_ignore_database_name(self): ['mysqladmin', '-u', 'u', '-poops', '-h', 'h']) def test_get_command_special_syntax_for_pg_restore(self): - meta = type('mock', (object, ), - {'username': '', 'hostname': '', 'port': '', 'password': 'oops', - 'path': '/database'}) + meta = {'username': '', 'hostname': '', 'port': '', 'password': 'oops', + 'path': '/database'} with mock.patch('postdoc.pg_connect_bits') as mock_bits: mock_bits.return_value = ['rofl'] self.assertEqual(postdoc.get_command('pg_restore', meta), ['pg_restore', 'rofl', '--dbname', 'database']) def test_get_command_special_syntax_for_mysql(self): - meta = type('mock', (object, ), - {'scheme': 'mysql', 'username': '', 'hostname': '', 'port': '', - 'password': 'oops', 'path': '/database'}) + meta = {'scheme': 'mysql', 'username': '', 'hostname': '', 'port': '', + 'password': 'oops', 'path': '/database'} with mock.patch('postdoc.connect_bits') as mock_bits: mock_bits.return_value = ['rofl'] self.assertEqual(postdoc.get_command('mysql', meta), ['mysql', 'rofl', '--database', 'database']) + def test_make_tokens_and_env_happy_case(self): + mock_os = mock.MagicMock(environ={ + 'DATABASE_URL': 'mysql://u:p@h:3306/test', + }) + + with mock.patch.multiple(postdoc, os=mock_os): + tokens, env = postdoc.make_tokens_and_env( + ['argv1', 'mysql', 'extra_arg']) + self.assertEqual( + tokens, + ['mysql', '-u', 'u', '-pp', '-h', 'h', '-P', '3306', '--database', 'test', 'extra_arg'] + ) + + @unittest.skip('TODO') + def test_make_tokens_and_env_handles_urlencoded_password(self): + mock_os = mock.MagicMock(environ={ + 'DATABASE_URL': 'mysql://u:%21mysecure%3Apassword%23@h/test', + }) + + with mock.patch.multiple(postdoc, os=mock_os): + tokens, env = postdoc.make_tokens_and_env( + ['argv1', 'mysql', 'extra_arg']) + self.assertEqual( + tokens, + ['mysql', '-u', 'u', '-p!mysecure:password#', '-h', 'h', '--database', 'test', 'extra_arg'] + ) + def test_make_tokens_and_env_exits_with_bad_command(self): with self.assertRaises(SystemExit): postdoc.make_tokens_and_env(['phd', 'fun']) @@ -213,8 +244,7 @@ def test_main_command_debug_can_be_quiet(self): def test_main_passes_password_in_env(self): my_password = 'hunter2' - meta = type('mock', (object, ), - {'password': my_password}) + meta = {'password': my_password} mock_subprocess = mock.MagicMock() mock_get_command = mock.MagicMock(return_value=['get_command']) mock_get_uri = mock.MagicMock(return_value=meta)