Skip to content

Commit

Permalink
Merge pull request #35403 from dimagi/jt+jc/encryption_improvement
Browse files Browse the repository at this point in the history
EmailSettings password encryption improvement
  • Loading branch information
Jtang-1 authored Dec 9, 2024
2 parents 6df8883 + b260ec9 commit 8a72efa
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 9 deletions.
17 changes: 12 additions & 5 deletions corehq/apps/email/models.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
from django.db import models

from corehq.motech.const import PASSWORD_PLACEHOLDER, ALGO_AES
from corehq.motech.utils import b64_aes_decrypt, b64_aes_encrypt
from corehq.motech.const import PASSWORD_PLACEHOLDER, ALGO_AES, ALGO_AES_CBC
from corehq.motech.utils import (
b64_aes_cbc_decrypt,
b64_aes_cbc_encrypt,
b64_aes_decrypt,
)


class EmailSettings(models.Model):
Expand All @@ -24,13 +28,16 @@ def __str__(self):

@property
def plaintext_password(self):
if self.password.startswith(f'${ALGO_AES}$'):
if self.password.startswith(f'${ALGO_AES_CBC}$'):
ciphertext = self.password.split('$', 2)[2]
return b64_aes_cbc_decrypt(ciphertext)
if self.password.startswith(f'${ALGO_AES}$'): # This will be deleted after migration to cbc is done
ciphertext = self.password.split('$', 2)[2]
return b64_aes_decrypt(ciphertext)
return self.password

@plaintext_password.setter
def plaintext_password(self, plaintext):
if plaintext != PASSWORD_PLACEHOLDER:
ciphertext = b64_aes_encrypt(plaintext)
self.password = f'${ALGO_AES}${ciphertext}'
ciphertext = b64_aes_cbc_encrypt(plaintext)
self.password = f'${ALGO_AES_CBC}${ciphertext}'
2 changes: 2 additions & 0 deletions corehq/motech/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ class OAuth2ApiSettings:
REQUEST_TIMEOUT = (CONNECT_TIMEOUT, READ_TIMEOUT)

ALGO_AES = 'aes'
ALGO_AES_CBC = 'aes-cbc'


IMPORT_FREQUENCY_DAILY = 'daily'
IMPORT_FREQUENCY_WEEKLY = 'weekly'
Expand Down
48 changes: 48 additions & 0 deletions corehq/motech/tests/test_reencryption_migration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
from django.test import SimpleTestCase

from corehq.apps.email.models import EmailSettings
from corehq.motech.const import ALGO_AES
from corehq.motech.utils import b64_aes_encrypt


from corehq.motech.utils import reencrypt_ecb_to_cbc_mode


class TestReencryptionMigration(SimpleTestCase):
def setUp(self):
self.email_settings = EmailSettings(
domain='example.com',
username='testuser',
)

def plaintext_to_ecb_password(self, plaintext, prefix=True):
ciphertext = b64_aes_encrypt(plaintext)
if prefix:
return f'${ALGO_AES}${ciphertext}'
else:
return ciphertext

def test_reencrypt_ecb_to_cbc_mode_match_plaintext_with_prefix(self):
plaintext_password = 'testpassword'
self.email_settings.password = self.plaintext_to_ecb_password(plaintext_password, True)
reencrypted_password = reencrypt_ecb_to_cbc_mode(self.email_settings.password, f'${ALGO_AES}$')

self.email_settings.password = reencrypted_password

self.assertEqual(plaintext_password, self.email_settings.plaintext_password)

def test_reencrypt_ecb_to_cbc_mode_match_plaintext_without_prefix(self):
plaintext_password = 'testpassword'
self.email_settings.password = self.plaintext_to_ecb_password(plaintext_password, False)
reencrypted_password = reencrypt_ecb_to_cbc_mode(self.email_settings.password, f'${ALGO_AES}$')

self.email_settings.password = reencrypted_password

self.assertEqual(plaintext_password, self.email_settings.plaintext_password)

def test_empty_password_reencrypt_ecb_to_cbc_mode_match_plaintext(self):
plaintext_password = ''
self.email_settings.password = plaintext_password
reencrypted_password = reencrypt_ecb_to_cbc_mode(self.email_settings.password, f'${ALGO_AES}$')
self.email_settings.password = reencrypted_password
self.assertEqual(plaintext_password, self.email_settings.plaintext_password)
24 changes: 21 additions & 3 deletions corehq/motech/tests/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
AES_KEY_MAX_LEN,
b64_aes_decrypt,
b64_aes_encrypt,
b64_aes_cbc_encrypt,
b64_aes_cbc_decrypt,
get_endpoint_url,
pformat_json,
simple_pad,
Expand Down Expand Up @@ -192,7 +194,7 @@ def test_none(self):

class EncryptionTests(SimpleTestCase):

def assert_message_equals_plaintext(self, message):
def assert_message_equals_plaintext_using_ecb(self, message):
assert isinstance(message, str)
ciphertext = b64_aes_encrypt(message)
plaintext = b64_aes_decrypt(ciphertext)
Expand All @@ -202,11 +204,27 @@ def assert_message_equals_plaintext(self, message):

def test_encrypt_decrypt_ascii(self):
message = 'Around you is a forest.'
self.assert_message_equals_plaintext(message)
self.assert_message_equals_plaintext_using_ecb(message)

def test_encrypt_decrypt_utf8(self):
message = 'आपके आसपास एक जंगल है'
self.assert_message_equals_plaintext(message)
self.assert_message_equals_plaintext_using_ecb(message)

def assert_message_equals_plaintext_using_cbc(self, message):
assert isinstance(message, str)
ciphertext = b64_aes_cbc_encrypt(message)
plaintext = b64_aes_cbc_decrypt(ciphertext)
self.assertEqual(plaintext, message)
self.assertIsInstance(ciphertext, str)
self.assertIsInstance(plaintext, str)

def test_encrypt_decrypt_cbc_ascii(self):
message = 'Around you is a forest.'
self.assert_message_equals_plaintext_using_cbc(message)

def test_encrypt_decrypt_cbc_utf8(self):
message = 'आपके आसपास एक जंगल है'
self.assert_message_equals_plaintext_using_cbc(message)


class GetEndpointUrlTests(SimpleTestCase):
Expand Down
87 changes: 86 additions & 1 deletion corehq/motech/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
from Crypto.Util.Padding import pad as crypto_pad
from Crypto.Util.Padding import unpad as crypto_unpad
from Crypto.Util.py3compat import bord
from Crypto.Random import get_random_bytes

from corehq.motech.const import AUTH_PRESETS, OAUTH2_PWD
from corehq.motech.const import AUTH_PRESETS, OAUTH2_PWD, ALGO_AES_CBC, ALGO_AES

AES_BLOCK_SIZE = 16
AES_KEY_MAX_LEN = 32 # AES key must be either 16, 24, or 32 bytes long
Expand Down Expand Up @@ -81,6 +82,90 @@ def b64_aes_decrypt(message):
return plaintext_bytes.decode('utf8')


def b64_aes_cbc_encrypt(message):
"""
AES-encrypt and base64-encode `message` using CBC mode.
Uses Django SECRET_KEY as AES key and generates a random IV.
"""
if isinstance(settings.SECRET_KEY, bytes):
secret_key_bytes = settings.SECRET_KEY
else:
secret_key_bytes = settings.SECRET_KEY.encode('ascii')
aes_key = simple_pad(secret_key_bytes, AES_BLOCK_SIZE)[:AES_KEY_MAX_LEN]
# We never need to unpad the key, so simple_pad() is fine (and
# allows us to decrypt old values).
iv = get_random_bytes(AES_BLOCK_SIZE)
aes = AES.new(aes_key, AES.MODE_CBC, iv)

message_bytes = message if isinstance(message, bytes) else message.encode('utf8')
plaintext_bytes = crypto_pad(message_bytes, AES_BLOCK_SIZE, style='iso7816')
ciphertext_bytes = aes.encrypt(plaintext_bytes)

b64ciphertext_bytes = b64encode(iv + ciphertext_bytes)
return b64ciphertext_bytes.decode('ascii')


def b64_aes_cbc_decrypt(message):
"""
Base64-decode and AES-decrypt ASCII `message` using CBC mode.
Uses Django SECRET_KEY as AES key.
>>> settings.SECRET_KEY = 'xyzzy'
>>> b64_aes_cbc_decrypt('6WbQuezOKqp4AMOCoUOndVnAUDL13e0fl3cpxcgHX/AlcPwN4+poaetdjwgikz0F')
'Around you is a forest.'
"""
if isinstance(settings.SECRET_KEY, bytes):
secret_key_bytes = settings.SECRET_KEY
else:
secret_key_bytes = settings.SECRET_KEY.encode('ascii')
aes_key = simple_pad(secret_key_bytes, AES_BLOCK_SIZE)[:AES_KEY_MAX_LEN]

decoded_bytes = b64decode(message)
iv = decoded_bytes[:AES_BLOCK_SIZE]
ciphertext_bytes = decoded_bytes[AES_BLOCK_SIZE:]

aes = AES.new(aes_key, AES.MODE_CBC, iv)
padded_plaintext_bytes = aes.decrypt(ciphertext_bytes)
plaintext_bytes = unpad(padded_plaintext_bytes)
return plaintext_bytes.decode('utf8')


# Only needed for migration from ECB to CBC mode.
def reencrypt_ecb_to_cbc_mode(encrypted_text, existing_prefix=None):
"""
Re-encrypt a message that was encrypted using ECB mode to CBC mode.
"""
if not encrypted_text:
return encrypted_text

if existing_prefix and encrypted_text.startswith(existing_prefix):
ciphertext = encrypted_text[len(existing_prefix):]
else:
ciphertext = encrypted_text

new_ciphertext = b64_aes_cbc_encrypt(b64_aes_decrypt(ciphertext))
return f'${ALGO_AES_CBC}${new_ciphertext}'


# Only needed for migration revert from CBC to ECB mode.
def reencrypt_cbc_to_ecb_mode(encrypted_text, existing_prefix=None):
"""
Re-encrypt a message that was encrypted using CBC mode to ECB mode.
"""
if not encrypted_text:
return encrypted_text

if existing_prefix and encrypted_text.startswith(existing_prefix):
ciphertext = encrypted_text[len(existing_prefix):]
else:
ciphertext = encrypted_text

new_ciphertext = b64_aes_encrypt(b64_aes_cbc_decrypt(ciphertext))
return f'${ALGO_AES}${new_ciphertext}'


def unpad(bytestring):
"""
Unpad will remove padding from the right of a string that has been
Expand Down

0 comments on commit 8a72efa

Please sign in to comment.