diff --git a/.travis.yml b/.travis.yml index e10a9c0..065f0be 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,6 +11,7 @@ python: - 3.8 install: + - python -m pip install -U pip - pip install tox tox-travis script: diff --git a/docs/index.html b/docs/index.html index 5f045d3..3eb6fd4 100644 Binary files a/docs/index.html and b/docs/index.html differ diff --git a/docs/index.md b/docs/index.md index 908594e..a1ed476 100644 --- a/docs/index.md +++ b/docs/index.md @@ -487,6 +487,30 @@ Request example: name: "Serpuhov" } +#### Crypto Fields +CryptoBinaryField and CryptoCharField are a django-rest-framework fields for handling encryption through serialisation. Inputs are String object and internal +python representation is Binary object for CryptoBinaryField and String object for CryptoCharField + +It takes the optional parameter `salt` (Django SECRET_KEY imported from setting as default). If set it use custom cryptographic salt + +It takes the optional parameter `password` ("Non_nobis1solum?nati!sumus" as default). If set it use a custom password in encryption. **It is highly recommended to use custom one!!** + +It takes the optional parameter `ttl` (None as default). If set it manage the number of seconds old a message may be for it to be valid. If the message is older than ttl seconds (from the time it was originally created) field will return None and encrypted message will not be enabled for decryption. + + from rest_framework_extensions.fields import CryptoBinaryField + + class CryptoSerializer(serializers.Serializer): + crypto_char = CryptoCharField() + +Example with parameters + + from rest_framework import serializers + from drf_extra_fields.crypto_fields import CryptoCharField + + + class CryptoSerializer(serializers.Serializer): + crypto_char = CryptoCharField(salt="custom salt", password="custom password", ttl=1000) + ### Permissions @@ -784,7 +808,7 @@ Of course you can use custom [key constructor](#key-constructor): object_cache_key_func = CustomObjectKeyConstructor() list_cache_key_func = CustomListKeyConstructor() -*New in DRF-extensions development* +*New in DRF-extensions development version* You can change cache timeout by providing `object_cache_timeout` or `list_cache_timeout` properties in view class: @@ -2212,6 +2236,8 @@ If you need to access the values of DRF-extensions API settings in your project, You can read about versioning, deprecation policy and upgrading from [Django REST framework documentation](https://www.django-rest-framework.org/community/release-notes/). +#### Development version +* Added fields [CryptoBinaryField and CryptoCharField](#crypto-fields) #### 0.7.0 diff --git a/rest_framework_extensions/__init__.py b/rest_framework_extensions/__init__.py index d7b7697..5909626 100644 --- a/rest_framework_extensions/__init__.py +++ b/rest_framework_extensions/__init__.py @@ -1,3 +1,3 @@ -__version__ = '0.7.0' # from 0.7.0 +__version__ = '0.7.1' # from 0.7.0 VERSION = __version__ diff --git a/rest_framework_extensions/fields.py b/rest_framework_extensions/fields.py index dd9f069..c94c60f 100644 --- a/rest_framework_extensions/fields.py +++ b/rest_framework_extensions/fields.py @@ -1,4 +1,16 @@ from rest_framework.relations import HyperlinkedRelatedField +import base64 + +from cryptography.fernet import Fernet, InvalidToken +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC +from django.conf import settings +from django.utils.translation import gettext_lazy as _ +from rest_framework import serializers + +EMPTY_VALUES = (None, "", [], (), {}) +DEFAULT_PASSWORD = b"Non_nobis1solum?nati!sumus" +DEFAULT_SALT = settings.SECRET_KEY class ResourceUriField(HyperlinkedRelatedField): @@ -20,9 +32,121 @@ class Meta: "resource_uri": "http://localhost/v1/surveys/1/", } """ + # todo: test me read_only = True def __init__(self, *args, **kwargs): - kwargs.setdefault('source', '*') + kwargs.setdefault("source", "*") super().__init__(*args, **kwargs) + + +def _generate_password_key(salt=DEFAULT_SALT, password=DEFAULT_PASSWORD): + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=32, + salt=_to_bytes(salt), + iterations=100000, + ) + + key = base64.urlsafe_b64encode(kdf.derive(_to_bytes(password))) + return key + + +def _to_bytes(v): + if isinstance(v, str): + return v.encode("utf-8") + + if isinstance(v, bytes): + return v + + raise TypeError( + _( + "SALT & PASSWORD must be specified as strings that convert nicely to " + "bytes." + ) + ) + + +def _encrypt(token, value_in_str): + b_message = value_in_str.encode("utf-8") + encrypted_message = token.encrypt(b_message) + return encrypted_message + + +def _decrypt(token, value, ttl=None): + ttl = int(ttl) if ttl else None + decrypted_message = token.decrypt(_to_bytes(value), ttl) + return decrypted_message.decode("utf-8") + + +class CryptoBinaryField(serializers.Field): + """ + A django-rest-framework field for handling encryption through serialisation, where input are string + and internal python representation is Binary object. + + """ + + type_name = "CryptoBinaryField" + type_label = "crypto" + + default_error_messages = { + "invalid": _("Input a valid data"), + } + + def __init__(self, *args, **kwargs): + self.salt = kwargs.pop("salt", DEFAULT_SALT) + self.password = kwargs.pop("password", DEFAULT_PASSWORD) + self.ttl = kwargs.pop("ttl", None) + super(CryptoBinaryField, self).__init__(*args, **kwargs) + + def to_internal_value(self, value): + """ + Parse json data and return a point object + """ + if value in EMPTY_VALUES and not self.required: + return None + + if isinstance(value, str): + key = _generate_password_key(self.salt, self.password) + token = Fernet(key) + encrypted_message = _encrypt(token, value) + return encrypted_message + + self.fail("invalid") + + def to_representation(self, value): + """ + Transform POINT object to json. + """ + if value is None: + return value + if isinstance(value, str): + value = value.encode("utf-8") + elif isinstance(value, (bytearray, memoryview)): + value = bytes(value) + if isinstance(value, bytes): + key = _generate_password_key(self.salt, self.password) + token = Fernet(key) + try: + decrypted_message = _decrypt(token, value, self.ttl) + return decrypted_message + except InvalidToken: + return None + + self.fail("invalid") + + +class CryptoCharField(CryptoBinaryField): + """ + A django-rest-framework field for handling encryption through serialisation, where input are string + and internal python representation is String object. + """ + + type_name = "CryptoCharField" + + def to_internal_value(self, value): + value = super(CryptoCharField, self).to_internal_value(value) + if value: + return value.decode("utf-8") + self.fail("invalid") diff --git a/tests_app/requirements.txt b/tests_app/requirements.txt index 14ccb1d..77720e0 100644 --- a/tests_app/requirements.txt +++ b/tests_app/requirements.txt @@ -2,4 +2,5 @@ nose django-nose django-filter>=2.1.0 mock -ipdb \ No newline at end of file +ipdb +cryptography==3.2.1 \ No newline at end of file diff --git a/tests_app/tests/unit/fields/__init__.py b/tests_app/tests/unit/fields/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests_app/tests/unit/fields/tests.py b/tests_app/tests/unit/fields/tests.py new file mode 100644 index 0000000..27b6ce6 --- /dev/null +++ b/tests_app/tests/unit/fields/tests.py @@ -0,0 +1,166 @@ +import time + +from rest_framework import serializers +from rest_framework_extensions.fields import ( + CryptoBinaryField, + CryptoCharField, +) +import datetime +from django.test import TestCase +from django.conf import settings + +import base64 + +from cryptography.fernet import Fernet +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC + +DEFAULT_PASSWORD = b"Non_nobis1solum?nati!sumus" +DEFAULT_SALT = settings.SECRET_KEY + + +def _generate_password_key(salt=DEFAULT_SALT, password=DEFAULT_PASSWORD): + kdf = PBKDF2HMAC( + algorithm=hashes.SHA256(), + length=32, + salt=_to_bytes(salt), + iterations=100000, + ) + + key = base64.urlsafe_b64encode(kdf.derive(_to_bytes(password))) + return key + + +def _to_bytes(v): + if isinstance(v, str): + return v.encode("utf-8") + + if isinstance(v, bytes): + return v + + + +def _encrypt(token, value_in_str): + b_message = value_in_str.encode("utf-8") + encrypted_message = token.encrypt(b_message) + return encrypted_message + + +def _decrypt(token, value, ttl=None): + ttl = int(ttl) if ttl else None + decrypted_message = token.decrypt(_to_bytes(value), ttl) + return decrypted_message.decode("utf-8") + + +class SaveCrypto(object): + def __init__(self, message=None, created=None): + self.message = message + self.created = created or datetime.datetime.now() + + +class CryptoSerializer(serializers.Serializer): + message = CryptoBinaryField(required=False) + created = serializers.DateTimeField() + + def update(self, instance, validated_data): + instance.message = validated_data["message"] + return instance + + def create(self, validated_data): + return SaveCrypto(**validated_data) + + +class CryptoCharSerializer(serializers.Serializer): + message = CryptoCharField(required=False) + created = serializers.DateTimeField() + + +class SaltCryptoSerializerSerializer(CryptoSerializer): + message = CryptoBinaryField(salt="Salt") + created = serializers.DateTimeField() + + +class PasswordCryptoSerializerSerializer(CryptoSerializer): + message = CryptoBinaryField(password="Password") + created = serializers.DateTimeField() + + +class TtlCryptoSerializerSerializer(CryptoSerializer): + message = CryptoBinaryField(ttl=1) + created = serializers.DateTimeField() + + +class TestPointSerializer(TestCase): + def test_create(self): + """ + Test for creating CryptoBinaryField + """ + now = datetime.datetime.now() + message = "test message" + serializer = CryptoSerializer(data={"created": now, "message": message}) + model_data = SaveCrypto(message=message, created=now) + + self.assertTrue(serializer.is_valid()) + self.assertEqual(serializer.validated_data["created"], model_data.created) + self.assertFalse(serializer.validated_data is model_data) + self.assertIs(type(serializer.validated_data["message"]), bytes) + + def test_create_char(self): + """ + Test for creating CryptoCharField + """ + now = datetime.datetime.now() + message = "test message" + serializer = CryptoCharSerializer(data={"created": now, "message": message}) + model_data = SaveCrypto(message=message, created=now) + + self.assertTrue(serializer.is_valid()) + self.assertEqual(serializer.validated_data["created"], model_data.created) + self.assertFalse(serializer.validated_data is model_data) + self.assertIs(type(serializer.validated_data["message"]), str) + + def test_serialization(self): + """ + Regular JSON serialization should output float values + """ + now = datetime.datetime.now() + message = "test message" + key = _generate_password_key(DEFAULT_SALT, DEFAULT_PASSWORD) + token = Fernet(key) + encrypted_message = _encrypt(token, message) + model_data = SaveCrypto(message=encrypted_message, created=now) + serializer = CryptoSerializer(model_data) + self.assertEqual(serializer.data["message"], message) + + def test_serialization_salt(self): + now = datetime.datetime.now() + message = "test message" + key = _generate_password_key("Salt", DEFAULT_PASSWORD) + token = Fernet(key) + encrypted_message = _encrypt(token, message) + model_data = SaveCrypto(message=encrypted_message, created=now) + serializer = SaltCryptoSerializerSerializer(model_data) + time.sleep(3) + self.assertEqual(serializer.data["message"], message) + + def test_serialization_password(self): + now = datetime.datetime.now() + message = "test message" + key = _generate_password_key(DEFAULT_SALT, "Password") + token = Fernet(key) + encrypted_message = _encrypt(token, message) + model_data = SaveCrypto(message=encrypted_message, created=now) + serializer = PasswordCryptoSerializerSerializer(model_data) + time.sleep(3) + self.assertEqual(serializer.data["message"], message) + + def test_serialization_ttl(self): + now = datetime.datetime.now() + message = "test message" + key = _generate_password_key(DEFAULT_SALT, DEFAULT_PASSWORD) + token = Fernet(key) + encrypted_message = _encrypt(token, message) + model_data = SaveCrypto(message=encrypted_message, created=now) + serializer = TtlCryptoSerializerSerializer(model_data) + time.sleep(3) + self.assertEqual(serializer.data["message"], None)