Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EmailSettings password encryption improvement #35403

Merged
merged 15 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 12 additions & 5 deletions corehq/apps/email/models.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from django.db import models

from corehq.motech.const import PASSWORD_PLACEHOLDER, ALGO_AES
from corehq.motech.utils import b64_aes_decrypt, b64_aes_encrypt
from corehq.motech.const import PASSWORD_PLACEHOLDER, ALGO_AES, ALGO_AES_CBC
from corehq.motech.utils import (
b64_aes_cbc_decrypt,
b64_aes_cbc_encrypt,
b64_aes_decrypt,
)


class EmailSettings(models.Model):
Expand All @@ -24,13 +28,16 @@ def __str__(self):

@property
def plaintext_password(self):
if self.password.startswith(f'${ALGO_AES}$'):
if self.password.startswith(f'${ALGO_AES_CBC}$'):
ciphertext = self.password.split('$', 2)[2]
return b64_aes_cbc_decrypt(ciphertext)
if self.password.startswith(f'${ALGO_AES}$'): # This will be deleted after migration to cbc is done
ciphertext = self.password.split('$', 2)[2]
return b64_aes_decrypt(ciphertext)
return self.password

@plaintext_password.setter
def plaintext_password(self, plaintext):
if plaintext != PASSWORD_PLACEHOLDER:
ciphertext = b64_aes_encrypt(plaintext)
self.password = f'${ALGO_AES}${ciphertext}'
ciphertext = b64_aes_cbc_encrypt(plaintext)
self.password = f'${ALGO_AES_CBC}${ciphertext}'
2 changes: 2 additions & 0 deletions corehq/motech/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ class OAuth2ApiSettings:
REQUEST_TIMEOUT = (CONNECT_TIMEOUT, READ_TIMEOUT)

ALGO_AES = 'aes'
ALGO_AES_CBC = 'aes-cbc'


IMPORT_FREQUENCY_DAILY = 'daily'
IMPORT_FREQUENCY_WEEKLY = 'weekly'
Expand Down
48 changes: 48 additions & 0 deletions corehq/motech/tests/test_reencryption_migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from django.test import SimpleTestCase

from corehq.apps.email.models import EmailSettings
from corehq.motech.const import ALGO_AES
from corehq.motech.utils import b64_aes_encrypt


from corehq.motech.utils import reencrypt_ecb_to_cbc_mode


class TestReencryptionMigration(SimpleTestCase):
def setUp(self):
self.email_settings = EmailSettings(
domain='example.com',
username='testuser',
)

def plaintext_to_ecb_password(self, plaintext, prefix=True):
ciphertext = b64_aes_encrypt(plaintext)
if prefix:
return f'${ALGO_AES}${ciphertext}'
else:
return ciphertext

def test_reencrypt_ecb_to_cbc_mode_match_plaintext_with_prefix(self):
plaintext_password = 'testpassword'
self.email_settings.password = self.plaintext_to_ecb_password(plaintext_password, True)
reencrypted_password = reencrypt_ecb_to_cbc_mode(self.email_settings.password, f'${ALGO_AES}$')

self.email_settings.password = reencrypted_password

self.assertEqual(plaintext_password, self.email_settings.plaintext_password)

def test_reencrypt_ecb_to_cbc_mode_match_plaintext_without_prefix(self):
plaintext_password = 'testpassword'
self.email_settings.password = self.plaintext_to_ecb_password(plaintext_password, False)
reencrypted_password = reencrypt_ecb_to_cbc_mode(self.email_settings.password, f'${ALGO_AES}$')

self.email_settings.password = reencrypted_password

self.assertEqual(plaintext_password, self.email_settings.plaintext_password)

def test_empty_password_reencrypt_ecb_to_cbc_mode_match_plaintext(self):
plaintext_password = ''
self.email_settings.password = plaintext_password
reencrypted_password = reencrypt_ecb_to_cbc_mode(self.email_settings.password, f'${ALGO_AES}$')
self.email_settings.password = reencrypted_password
self.assertEqual(plaintext_password, self.email_settings.plaintext_password)
24 changes: 21 additions & 3 deletions corehq/motech/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
AES_KEY_MAX_LEN,
b64_aes_decrypt,
b64_aes_encrypt,
b64_aes_cbc_encrypt,
b64_aes_cbc_decrypt,
get_endpoint_url,
pformat_json,
simple_pad,
Expand Down Expand Up @@ -192,7 +194,7 @@ def test_none(self):

class EncryptionTests(SimpleTestCase):

def assert_message_equals_plaintext(self, message):
def assert_message_equals_plaintext_using_ecb(self, message):
assert isinstance(message, str)
ciphertext = b64_aes_encrypt(message)
plaintext = b64_aes_decrypt(ciphertext)
Expand All @@ -202,11 +204,27 @@ def assert_message_equals_plaintext(self, message):

def test_encrypt_decrypt_ascii(self):
message = 'Around you is a forest.'
self.assert_message_equals_plaintext(message)
self.assert_message_equals_plaintext_using_ecb(message)

def test_encrypt_decrypt_utf8(self):
message = 'आपके आसपास एक जंगल है'
self.assert_message_equals_plaintext(message)
self.assert_message_equals_plaintext_using_ecb(message)

def assert_message_equals_plaintext_using_cbc(self, message):
assert isinstance(message, str)
ciphertext = b64_aes_cbc_encrypt(message)
plaintext = b64_aes_cbc_decrypt(ciphertext)
self.assertEqual(plaintext, message)
self.assertIsInstance(ciphertext, str)
self.assertIsInstance(plaintext, str)

def test_encrypt_decrypt_cbc_ascii(self):
message = 'Around you is a forest.'
self.assert_message_equals_plaintext_using_cbc(message)

def test_encrypt_decrypt_cbc_utf8(self):
message = 'आपके आसपास एक जंगल है'
self.assert_message_equals_plaintext_using_cbc(message)


class GetEndpointUrlTests(SimpleTestCase):
Expand Down
87 changes: 86 additions & 1 deletion corehq/motech/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
from Crypto.Util.Padding import pad as crypto_pad
from Crypto.Util.Padding import unpad as crypto_unpad
from Crypto.Util.py3compat import bord
from Crypto.Random import get_random_bytes

from corehq.motech.const import AUTH_PRESETS, OAUTH2_PWD
from corehq.motech.const import AUTH_PRESETS, OAUTH2_PWD, ALGO_AES_CBC, ALGO_AES

AES_BLOCK_SIZE = 16
AES_KEY_MAX_LEN = 32 # AES key must be either 16, 24, or 32 bytes long
Expand Down Expand Up @@ -81,6 +82,90 @@ def b64_aes_decrypt(message):
return plaintext_bytes.decode('utf8')


def b64_aes_cbc_encrypt(message):
"""
AES-encrypt and base64-encode `message` using CBC mode.

Uses Django SECRET_KEY as AES key and generates a random IV.
"""
if isinstance(settings.SECRET_KEY, bytes):
secret_key_bytes = settings.SECRET_KEY
else:
secret_key_bytes = settings.SECRET_KEY.encode('ascii')
aes_key = simple_pad(secret_key_bytes, AES_BLOCK_SIZE)[:AES_KEY_MAX_LEN]
# We never need to unpad the key, so simple_pad() is fine (and
# allows us to decrypt old values).
iv = get_random_bytes(AES_BLOCK_SIZE)
aes = AES.new(aes_key, AES.MODE_CBC, iv)

message_bytes = message if isinstance(message, bytes) else message.encode('utf8')
plaintext_bytes = crypto_pad(message_bytes, AES_BLOCK_SIZE, style='iso7816')
ciphertext_bytes = aes.encrypt(plaintext_bytes)

b64ciphertext_bytes = b64encode(iv + ciphertext_bytes)
return b64ciphertext_bytes.decode('ascii')


def b64_aes_cbc_decrypt(message):
"""
Base64-decode and AES-decrypt ASCII `message` using CBC mode.

Uses Django SECRET_KEY as AES key.

>>> settings.SECRET_KEY = 'xyzzy'
>>> b64_aes_cbc_decrypt('6WbQuezOKqp4AMOCoUOndVnAUDL13e0fl3cpxcgHX/AlcPwN4+poaetdjwgikz0F')
'Around you is a forest.'
"""
if isinstance(settings.SECRET_KEY, bytes):
secret_key_bytes = settings.SECRET_KEY
else:
secret_key_bytes = settings.SECRET_KEY.encode('ascii')
aes_key = simple_pad(secret_key_bytes, AES_BLOCK_SIZE)[:AES_KEY_MAX_LEN]

decoded_bytes = b64decode(message)
iv = decoded_bytes[:AES_BLOCK_SIZE]
ciphertext_bytes = decoded_bytes[AES_BLOCK_SIZE:]

aes = AES.new(aes_key, AES.MODE_CBC, iv)
padded_plaintext_bytes = aes.decrypt(ciphertext_bytes)
plaintext_bytes = unpad(padded_plaintext_bytes)
return plaintext_bytes.decode('utf8')


# Only needed for migration from ECB to CBC mode.
def reencrypt_ecb_to_cbc_mode(encrypted_text, existing_prefix=None):
"""
Re-encrypt a message that was encrypted using ECB mode to CBC mode.
"""
if not encrypted_text:
return encrypted_text

if existing_prefix and encrypted_text.startswith(existing_prefix):
ciphertext = encrypted_text[len(existing_prefix):]
else:
ciphertext = encrypted_text

new_ciphertext = b64_aes_cbc_encrypt(b64_aes_decrypt(ciphertext))
return f'${ALGO_AES_CBC}${new_ciphertext}'


# Only needed for migration revert from CBC to ECB mode.
def reencrypt_cbc_to_ecb_mode(encrypted_text, existing_prefix=None):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is only referenced in the migration, what do you think of moving it into the migration file? (And removing it from this PR since the migration has been removed.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm open to that, but this same function will be needed for migration of other models (in future PRs). Does it make sense to copy this same function into each migration file?

Copy link
Contributor

@millerdev millerdev Dec 9, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I think so. Migration files should avoid importing things from other modules as much as possible because they should be frozen-in-time rather than depending on things that may be refactored and change in other ways over time. Granted, this function is probably unlikely to change over time, but I still think it's safer to include it in each migration file that needs it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That makes sense, how would you advise i unit test the function in that case? I don't think i can import functions defined within a migration file

"""
Re-encrypt a message that was encrypted using CBC mode to ECB mode.
"""
if not encrypted_text:
return encrypted_text

if existing_prefix and encrypted_text.startswith(existing_prefix):
ciphertext = encrypted_text[len(existing_prefix):]
else:
ciphertext = encrypted_text

new_ciphertext = b64_aes_encrypt(b64_aes_cbc_decrypt(ciphertext))
return f'${ALGO_AES}${new_ciphertext}'


def unpad(bytestring):
"""
Unpad will remove padding from the right of a string that has been
Expand Down
Loading