Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ADD custom encrypted fields #312

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ python:
- 3.8

install:
- python -m pip install -U pip
- pip install tox tox-travis

script:
Expand Down
Binary file modified docs/index.html
Binary file not shown.
28 changes: 27 additions & 1 deletion docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -487,6 +487,30 @@ Request example:
name: "Serpuhov"
}

#### Crypto Fields
CryptoBinaryField and CryptoCharField are a django-rest-framework fields for handling encryption through serialisation. Inputs are String object and internal
python representation is Binary object for CryptoBinaryField and String object for CryptoCharField

It takes the optional parameter `salt` (Django SECRET_KEY imported from setting as default). If set it use custom cryptographic salt

It takes the optional parameter `password` ("Non_nobis1solum?nati!sumus" as default). If set it use a custom password in encryption. **It is highly recommended to use custom one!!**

It takes the optional parameter `ttl` (None as default). If set it manage the number of seconds old a message may be for it to be valid. If the message is older than ttl seconds (from the time it was originally created) field will return None and encrypted message will not be enabled for decryption.

from rest_framework_extensions.fields import CryptoBinaryField

class CryptoSerializer(serializers.Serializer):
crypto_char = CryptoCharField()

Example with parameters

from rest_framework import serializers
from drf_extra_fields.crypto_fields import CryptoCharField


class CryptoSerializer(serializers.Serializer):
crypto_char = CryptoCharField(salt="custom salt", password="custom password", ttl=1000)


### Permissions

Expand Down Expand Up @@ -784,7 +808,7 @@ Of course you can use custom [key constructor](#key-constructor):
object_cache_key_func = CustomObjectKeyConstructor()
list_cache_key_func = CustomListKeyConstructor()

*New in DRF-extensions development*
*New in DRF-extensions development version*

You can change cache timeout by providing `object_cache_timeout` or
`list_cache_timeout` properties in view class:
Expand Down Expand Up @@ -2212,6 +2236,8 @@ If you need to access the values of DRF-extensions API settings in your project,
You can read about versioning, deprecation policy and upgrading from
[Django REST framework documentation](https://www.django-rest-framework.org/community/release-notes/).

#### Development version
* Added fields [CryptoBinaryField and CryptoCharField](#crypto-fields)

#### 0.7.0

Expand Down
2 changes: 1 addition & 1 deletion rest_framework_extensions/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
__version__ = '0.7.0' # from 0.7.0
__version__ = '0.7.1' # from 0.7.0

VERSION = __version__
126 changes: 125 additions & 1 deletion rest_framework_extensions/fields.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
from rest_framework.relations import HyperlinkedRelatedField
import base64

from cryptography.fernet import Fernet, InvalidToken
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from django.conf import settings
from django.utils.translation import gettext_lazy as _
from rest_framework import serializers

EMPTY_VALUES = (None, "", [], (), {})
DEFAULT_PASSWORD = b"Non_nobis1solum?nati!sumus"
DEFAULT_SALT = settings.SECRET_KEY


class ResourceUriField(HyperlinkedRelatedField):
Expand All @@ -20,9 +32,121 @@ class Meta:
"resource_uri": "http://localhost/v1/surveys/1/",
}
"""

# todo: test me
read_only = True

def __init__(self, *args, **kwargs):
kwargs.setdefault('source', '*')
kwargs.setdefault("source", "*")
super().__init__(*args, **kwargs)


def _generate_password_key(salt=DEFAULT_SALT, password=DEFAULT_PASSWORD):
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=_to_bytes(salt),
iterations=100000,
)

key = base64.urlsafe_b64encode(kdf.derive(_to_bytes(password)))
return key


def _to_bytes(v):
if isinstance(v, str):
return v.encode("utf-8")

if isinstance(v, bytes):
return v

raise TypeError(
_(
"SALT & PASSWORD must be specified as strings that convert nicely to "
"bytes."
)
)


def _encrypt(token, value_in_str):
b_message = value_in_str.encode("utf-8")
encrypted_message = token.encrypt(b_message)
return encrypted_message


def _decrypt(token, value, ttl=None):
ttl = int(ttl) if ttl else None
decrypted_message = token.decrypt(_to_bytes(value), ttl)
return decrypted_message.decode("utf-8")


class CryptoBinaryField(serializers.Field):
"""
A django-rest-framework field for handling encryption through serialisation, where input are string
and internal python representation is Binary object.

"""

type_name = "CryptoBinaryField"
type_label = "crypto"

default_error_messages = {
"invalid": _("Input a valid data"),
}

def __init__(self, *args, **kwargs):
self.salt = kwargs.pop("salt", DEFAULT_SALT)
self.password = kwargs.pop("password", DEFAULT_PASSWORD)
self.ttl = kwargs.pop("ttl", None)
super(CryptoBinaryField, self).__init__(*args, **kwargs)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

python 2 compatibilities is not needed anymore so just super().init(*, **)


def to_internal_value(self, value):
"""
Parse json data and return a point object
"""
if value in EMPTY_VALUES and not self.required:
return None

if isinstance(value, str):
key = _generate_password_key(self.salt, self.password)
token = Fernet(key)
encrypted_message = _encrypt(token, value)
return encrypted_message

self.fail("invalid")

def to_representation(self, value):
"""
Transform POINT object to json.
"""
if value is None:
return value
if isinstance(value, str):
value = value.encode("utf-8")
elif isinstance(value, (bytearray, memoryview)):
value = bytes(value)
if isinstance(value, bytes):
key = _generate_password_key(self.salt, self.password)
token = Fernet(key)
try:
decrypted_message = _decrypt(token, value, self.ttl)
return decrypted_message
except InvalidToken:
return None

self.fail("invalid")


class CryptoCharField(CryptoBinaryField):
"""
A django-rest-framework field for handling encryption through serialisation, where input are string
and internal python representation is String object.
"""

type_name = "CryptoCharField"

def to_internal_value(self, value):
value = super(CryptoCharField, self).to_internal_value(value)
if value:
return value.decode("utf-8")
self.fail("invalid")
3 changes: 2 additions & 1 deletion tests_app/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ nose
django-nose
django-filter>=2.1.0
mock
ipdb
ipdb
cryptography==3.2.1
Empty file.
166 changes: 166 additions & 0 deletions tests_app/tests/unit/fields/tests.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import time

from rest_framework import serializers
from rest_framework_extensions.fields import (
CryptoBinaryField,
CryptoCharField,
)
import datetime
from django.test import TestCase
from django.conf import settings

import base64

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

DEFAULT_PASSWORD = b"Non_nobis1solum?nati!sumus"
DEFAULT_SALT = settings.SECRET_KEY


def _generate_password_key(salt=DEFAULT_SALT, password=DEFAULT_PASSWORD):
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=_to_bytes(salt),
iterations=100000,
)

key = base64.urlsafe_b64encode(kdf.derive(_to_bytes(password)))
return key


def _to_bytes(v):
if isinstance(v, str):
return v.encode("utf-8")

if isinstance(v, bytes):
return v



def _encrypt(token, value_in_str):
b_message = value_in_str.encode("utf-8")
encrypted_message = token.encrypt(b_message)
return encrypted_message


def _decrypt(token, value, ttl=None):
ttl = int(ttl) if ttl else None
decrypted_message = token.decrypt(_to_bytes(value), ttl)
return decrypted_message.decode("utf-8")


class SaveCrypto(object):
def __init__(self, message=None, created=None):
self.message = message
self.created = created or datetime.datetime.now()


class CryptoSerializer(serializers.Serializer):
message = CryptoBinaryField(required=False)
created = serializers.DateTimeField()

def update(self, instance, validated_data):
instance.message = validated_data["message"]
return instance

def create(self, validated_data):
return SaveCrypto(**validated_data)


class CryptoCharSerializer(serializers.Serializer):
message = CryptoCharField(required=False)
created = serializers.DateTimeField()


class SaltCryptoSerializerSerializer(CryptoSerializer):
message = CryptoBinaryField(salt="Salt")
created = serializers.DateTimeField()


class PasswordCryptoSerializerSerializer(CryptoSerializer):
message = CryptoBinaryField(password="Password")
created = serializers.DateTimeField()


class TtlCryptoSerializerSerializer(CryptoSerializer):
message = CryptoBinaryField(ttl=1)
created = serializers.DateTimeField()


class TestPointSerializer(TestCase):
def test_create(self):
"""
Test for creating CryptoBinaryField
"""
now = datetime.datetime.now()
message = "test message"
serializer = CryptoSerializer(data={"created": now, "message": message})
model_data = SaveCrypto(message=message, created=now)

self.assertTrue(serializer.is_valid())
self.assertEqual(serializer.validated_data["created"], model_data.created)
self.assertFalse(serializer.validated_data is model_data)
self.assertIs(type(serializer.validated_data["message"]), bytes)

def test_create_char(self):
"""
Test for creating CryptoCharField
"""
now = datetime.datetime.now()
message = "test message"
serializer = CryptoCharSerializer(data={"created": now, "message": message})
model_data = SaveCrypto(message=message, created=now)

self.assertTrue(serializer.is_valid())
self.assertEqual(serializer.validated_data["created"], model_data.created)
self.assertFalse(serializer.validated_data is model_data)
self.assertIs(type(serializer.validated_data["message"]), str)

def test_serialization(self):
"""
Regular JSON serialization should output float values
"""
now = datetime.datetime.now()
message = "test message"
key = _generate_password_key(DEFAULT_SALT, DEFAULT_PASSWORD)
token = Fernet(key)
encrypted_message = _encrypt(token, message)
model_data = SaveCrypto(message=encrypted_message, created=now)
serializer = CryptoSerializer(model_data)
self.assertEqual(serializer.data["message"], message)

def test_serialization_salt(self):
now = datetime.datetime.now()
message = "test message"
key = _generate_password_key("Salt", DEFAULT_PASSWORD)
token = Fernet(key)
encrypted_message = _encrypt(token, message)
model_data = SaveCrypto(message=encrypted_message, created=now)
serializer = SaltCryptoSerializerSerializer(model_data)
time.sleep(3)
self.assertEqual(serializer.data["message"], message)

def test_serialization_password(self):
now = datetime.datetime.now()
message = "test message"
key = _generate_password_key(DEFAULT_SALT, "Password")
token = Fernet(key)
encrypted_message = _encrypt(token, message)
model_data = SaveCrypto(message=encrypted_message, created=now)
serializer = PasswordCryptoSerializerSerializer(model_data)
time.sleep(3)
self.assertEqual(serializer.data["message"], message)

def test_serialization_ttl(self):
now = datetime.datetime.now()
message = "test message"
key = _generate_password_key(DEFAULT_SALT, DEFAULT_PASSWORD)
token = Fernet(key)
encrypted_message = _encrypt(token, message)
model_data = SaveCrypto(message=encrypted_message, created=now)
serializer = TtlCryptoSerializerSerializer(model_data)
time.sleep(3)
self.assertEqual(serializer.data["message"], None)