Skip to content

Commit

Permalink
Urldecode passwords (#22)
Browse files Browse the repository at this point in the history
For example, if a password was `!mysecure:password#`, you'd have to encode it `%21mysecure%3Apassword%23` to get it to parse right (otherwise it'd think it's a port), but then you'd be passing the wrong password in.

fixes #19
  • Loading branch information
crccheck authored Mar 24, 2018
1 parent 73eab13 commit f6ad1e9
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 65 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
.env
*.egg
*.egg-info/
*.eggs
*.pyc
__pycache__
build/
Expand Down
67 changes: 40 additions & 27 deletions postdoc.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,50 +11,64 @@
--postdoc-quiet Don't print debugging output.
"""

from __future__ import unicode_literals
import os
import subprocess
import sys
try:
# Python 3
from urllib.parse import urlparse
from urllib.parse import unquote
except ImportError:
# Python 2
from urlparse import urlparse
from urllib import unquote


__version__ = '0.4.0'


def get_uri(env='DATABASE_URL'):
def get_uri(env_var='DATABASE_URL'):
"""Grab and parse the url from the environment."""
# trick python3's urlparse into raising an exception
return urlparse(os.environ.get(env, 1337))
parsed_result = urlparse(
# Trick python3's urlparse into raising when env var is missing
os.environ.get(env_var, 1337)
)
meta = {
'scheme': parsed_result.scheme,
'username': unquote(parsed_result.username or ''),
'password': unquote(parsed_result.password or ''),
'hostname': parsed_result.hostname,
'port': parsed_result.port,
'path': unquote(parsed_result.path or '/'),
}
return meta


def pg_connect_bits(meta):
"""Turn the url into connection bits."""
bits = []
if meta.username:
bits.extend(['-U', meta.username])
if meta.hostname:
bits.extend(['-h', meta.hostname])
if meta.port:
bits.extend(['-p', str(meta.port)])
if meta['username']:
bits.extend(['-U', meta['username']])
if meta['hostname']:
bits.extend(['-h', meta['hostname']])
if meta['port']:
bits.extend(['-p', str(meta['port'])])
return bits


def mysql_connect_bits(meta):
"""Turn the url into connection bits."""
bits = []
if meta.username:
bits.extend(['-u', meta.username])
if meta.password:
# password is one token
bits.append('-p{0}'.format(meta.password))
if meta.hostname:
bits.extend(['-h', meta.hostname])
if meta.port:
bits.extend(['-P', str(meta.port)])
if meta['username']:
bits.extend(['-u', meta['username']])
if meta['password']:
# `password` is one token for mysql (no whitespace)
bits.append('-p{0}'.format(meta['password']))
if meta['hostname']:
bits.extend(['-h', meta['hostname']])
if meta['port']:
bits.extend(['-P', str(meta['port'])])
return bits


Expand All @@ -65,7 +79,7 @@ def connect_bits(meta):
'postgresql': pg_connect_bits,
'postgis': pg_connect_bits,
}
scheme = getattr(meta, 'scheme', 'postgres') # default to postgres
scheme = meta.get('scheme', 'postgres') # Default to postgres
# TODO raise a better error than KeyError with an unsupported scheme
return bit_makers[scheme](meta)

Expand All @@ -85,32 +99,31 @@ def get_command(command, meta):
bits.append('--dbname')
if command == 'mysql':
bits.append('--database')
bits.append(meta.path[1:])
# outtahere
bits.append(meta['path'][1:])
return bits


def make_tokens_and_env(sys_argv):
"""Get the tokens or quit with help."""
if sys_argv[1].isupper():
environ_key = sys_argv[1]
env_var = sys_argv[1]
args = sys_argv[2:]
else:
environ_key = 'DATABASE_URL'
env_var = 'DATABASE_URL'
args = sys_argv[1:]

try:
meta = get_uri(environ_key)
meta = get_uri(env_var)
# if we need to switch logic based off scheme multiple places, may want
# to normalize it at this point
tokens = get_command(args[0], meta)
except AttributeError:
exit('Usage: phd COMMAND [additional-options]\n\n'
' ERROR: "{0}" is not set in the environment'.format(environ_key))
' ERROR: "{0}" is not set in the environment'.format(env_var))
env = os.environ.copy()
# password as environment variable, set it for non-postgres schemas anyways
if meta.password:
env['PGPASSWORD'] = meta.password
if meta['password']:
env['PGPASSWORD'] = meta['password']
# pass any other flags the user set along
tokens.extend(args[1:])
return tokens, env
Expand Down
106 changes: 68 additions & 38 deletions test_postdoc.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-

from __future__ import unicode_literals
import os
import unittest

Expand All @@ -14,86 +14,92 @@
exit('Re-run tests in an environment without DATABASE_URL')


class ConnectBitsTest(unittest.TestCase):
class ConnectBitsTests(unittest.TestCase):
def test_pg_connect_bits_trivial_case(self):
meta = type('mock', (object, ),
{'username': '', 'hostname': '', 'port': ''})
meta = {'username': '', 'hostname': '', 'port': ''}
result = postdoc.pg_connect_bits(meta)
self.assertEqual(result, [])

def test_pg_connect_bits_works(self):
meta = type('mock', (object, ),
{'scheme': 'postgres', 'username': '1', 'hostname': '2', 'port': 3})
meta = {'scheme': 'postgres', 'username': '1', 'hostname': '2', 'port': 3}
result = postdoc.pg_connect_bits(meta)
self.assertEqual(result, ['-U', '1', '-h', '2', '-p', '3'])
result = postdoc.connect_bits(meta)
self.assertEqual(result, ['-U', '1', '-h', '2', '-p', '3'])

def test_mysql_connect_bits_trivial_case(self):
meta = type('mock', (object, ),
{'username': '', 'password': '', 'hostname': '', 'port': ''})
meta = {'username': '', 'password': '', 'hostname': '', 'port': ''}
result = postdoc.mysql_connect_bits(meta)
self.assertEqual(result, [])

def test_mysql_connect_bits_works(self):
meta = type('mock', (object, ),
{'scheme': 'mysql', 'username': 'u', 'password': 'p',
'hostname': 'h', 'port': '3306'})
meta = {'scheme': 'mysql', 'username': 'u', 'password': 'p',
'hostname': 'h', 'port': '3306'}
result = postdoc.mysql_connect_bits(meta)
self.assertEqual(result, ['-u', 'u', '-pp', '-h', 'h', '-P', '3306'])
result = postdoc.connect_bits(meta)
self.assertEqual(result, ['-u', 'u', '-pp', '-h', 'h', '-P', '3306'])

def test_connect_bits_supported_schemas(self):
meta = type('mock', (object, ),
{'username': '', 'password': '', 'hostname': 'h', 'port': ''})
meta = {'username': '', 'password': '', 'hostname': 'h', 'port': ''}

# assert defaults to postgres
self.assertTrue(postdoc.connect_bits(meta))
meta.scheme = 'mysql'
meta['scheme'] = 'mysql'
self.assertTrue(postdoc.connect_bits(meta))
meta.scheme = 'postgres'
meta['scheme'] = 'postgres'
self.assertTrue(postdoc.connect_bits(meta))
meta.scheme = 'postgresql'
meta['scheme'] = 'postgresql'
self.assertTrue(postdoc.connect_bits(meta))
meta.scheme = 'postgis'
meta['scheme'] = 'postgis'
self.assertTrue(postdoc.connect_bits(meta))
meta.scheme = 'foo'
meta['scheme'] = 'foo'
self.assertRaises(KeyError, postdoc.connect_bits, meta)


class PHDTest(unittest.TestCase):
class PHDTests(unittest.TestCase):
def test_get_uri(self):
with mock.patch('postdoc.os') as mock_os:
mock_os.environ = {
'DATABASE_URL': 'foo',
'FATTYBASE_URL': 'bar',
}
self.assertEqual(postdoc.get_uri().path, 'foo')
self.assertEqual(postdoc.get_uri('FATTYBASE_URL').path, 'bar')
self.assertEqual(postdoc.get_uri()['path'], 'foo')
self.assertEqual(postdoc.get_uri('FATTYBASE_URL')['path'], 'bar')

def test_get_uri_decodes_urlencoded(self):
with mock.patch('postdoc.os') as mock_os:
mock_os.environ = {
'DATABASE_URL': 'mysql://user%3F:%21mysecure%3Apassword%[email protected]:3307/foo',
}
self.assertEqual(postdoc.get_uri(), {
'scheme': 'mysql',
'username': 'user?',
'password': '!mysecure:password#',
'hostname': '127.0.0.1',
'port': 3307,
'path': '/foo',
})

def test_get_command_assembles_bits_in_right_order(self):
meta = type('mock', (object, ),
{'username': '', 'hostname': '', 'port': '', 'password': '',
'path': '/database'})
meta = {'username': '', 'hostname': '', 'port': '', 'password': '',
'path': '/database'}
with mock.patch('postdoc.pg_connect_bits') as mock_bits:
mock_bits.return_value = ['lol']
self.assertEqual(postdoc.get_command('foo', meta),
['foo', 'lol', 'database'])

def test_get_command_ignores_password(self):
meta = type('mock', (object, ),
{'username': '', 'hostname': '', 'port': '', 'password': 'oops',
'path': '/database'})
meta = {'username': '', 'hostname': '', 'port': '', 'password': 'oops',
'path': '/database'}
with mock.patch('postdoc.pg_connect_bits') as mock_bits:
mock_bits.return_value = ['rofl']
self.assertEqual(postdoc.get_command('bar', meta),
['bar', 'rofl', 'database'])

def test_get_commands_can_ignore_database_name(self):
meta = type('mock', (object, ),
{'scheme': 'mysql', 'username': 'u', 'hostname': 'h', 'port': '',
'password': 'oops', 'path': '/database'})
meta = {'scheme': 'mysql', 'username': 'u', 'hostname': 'h', 'port': '',
'password': 'oops', 'path': '/database'}
result = postdoc.get_command('mysqladmin', meta)
# assert database name is not an argument
self.assertNotIn('database', result)
Expand All @@ -102,23 +108,48 @@ def test_get_commands_can_ignore_database_name(self):
['mysqladmin', '-u', 'u', '-poops', '-h', 'h'])

def test_get_command_special_syntax_for_pg_restore(self):
meta = type('mock', (object, ),
{'username': '', 'hostname': '', 'port': '', 'password': 'oops',
'path': '/database'})
meta = {'username': '', 'hostname': '', 'port': '', 'password': 'oops',
'path': '/database'}
with mock.patch('postdoc.pg_connect_bits') as mock_bits:
mock_bits.return_value = ['rofl']
self.assertEqual(postdoc.get_command('pg_restore', meta),
['pg_restore', 'rofl', '--dbname', 'database'])

def test_get_command_special_syntax_for_mysql(self):
meta = type('mock', (object, ),
{'scheme': 'mysql', 'username': '', 'hostname': '', 'port': '',
'password': 'oops', 'path': '/database'})
meta = {'scheme': 'mysql', 'username': '', 'hostname': '', 'port': '',
'password': 'oops', 'path': '/database'}
with mock.patch('postdoc.connect_bits') as mock_bits:
mock_bits.return_value = ['rofl']
self.assertEqual(postdoc.get_command('mysql', meta),
['mysql', 'rofl', '--database', 'database'])

def test_make_tokens_and_env_happy_case(self):
mock_os = mock.MagicMock(environ={
'DATABASE_URL': 'mysql://u:p@h:3306/test',
})

with mock.patch.multiple(postdoc, os=mock_os):
tokens, env = postdoc.make_tokens_and_env(
['argv1', 'mysql', 'extra_arg'])
self.assertEqual(
tokens,
['mysql', '-u', 'u', '-pp', '-h', 'h', '-P', '3306', '--database', 'test', 'extra_arg']
)

@unittest.skip('TODO')
def test_make_tokens_and_env_handles_urlencoded_password(self):
mock_os = mock.MagicMock(environ={
'DATABASE_URL': 'mysql://u:%21mysecure%3Apassword%23@h/test',
})

with mock.patch.multiple(postdoc, os=mock_os):
tokens, env = postdoc.make_tokens_and_env(
['argv1', 'mysql', 'extra_arg'])
self.assertEqual(
tokens,
['mysql', '-u', 'u', '-p!mysecure:password#', '-h', 'h', '--database', 'test', 'extra_arg']
)

def test_make_tokens_and_env_exits_with_bad_command(self):
with self.assertRaises(SystemExit):
postdoc.make_tokens_and_env(['phd', 'fun'])
Expand Down Expand Up @@ -213,8 +244,7 @@ def test_main_command_debug_can_be_quiet(self):

def test_main_passes_password_in_env(self):
my_password = 'hunter2'
meta = type('mock', (object, ),
{'password': my_password})
meta = {'password': my_password}
mock_subprocess = mock.MagicMock()
mock_get_command = mock.MagicMock(return_value=['get_command'])
mock_get_uri = mock.MagicMock(return_value=meta)
Expand Down

0 comments on commit f6ad1e9

Please sign in to comment.