From 0ee82b555e7b753279e3ccf112f5f101f462c23f Mon Sep 17 00:00:00 2001 From: 0xOmarA Date: Sat, 12 Feb 2022 17:46:33 +0300 Subject: [PATCH] Added the ability to decode messages --- src/radixlib/utils.py | 93 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 81 insertions(+), 12 deletions(-) diff --git a/src/radixlib/utils.py b/src/radixlib/utils.py index 6ef0f02..2be57fc 100644 --- a/src/radixlib/utils.py +++ b/src/radixlib/utils.py @@ -1,4 +1,4 @@ -from typing import Dict, Any, Union, List, Tuple, Set +from typing import Optional, Dict, Any, Union, List, Tuple, Set, BinaryIO, overload from radixlib.serializable import Serializable from ecdsa.keys import SigningKey, VerifyingKey @@ -10,6 +10,7 @@ import hashlib import ecdsa import os +import io def remove_none_values_recursively(dictionary: Dict[Any, Any]) -> Dict[Any, Any]: """ Recursively removes the key value pairs where the value is None. @@ -105,9 +106,9 @@ def encrypt_message( # Creating the shared secret of the Diffie-Hellman through the public, private, and the ephemeral key public_key_point: PointJacobi = VerifyingKey.from_string(bytearray.fromhex(receiver_public_key), curve=SECP256k1, hashfunc=hashlib.sha256).pubkey.point # type: ignore private_key_point: int = SigningKey.from_string(bytearray.fromhex(sender_private_key), curve=SECP256k1, hashfunc=hashlib.sha256).privkey.secret_multiplier # type: ignore - ephemeral_private_key_point: PointJacobi = ephemeral_public_key.pubkey.point # type: ignore + ephemeral_public_key_point: PointJacobi = ephemeral_public_key.pubkey.point # type: ignore - shared_secret: bytes = ((public_key_point * private_key_point) + ephemeral_private_key_point).x().to_bytes(32, 'big') # type: ignore + shared_secret: bytes = ((public_key_point * private_key_point) + ephemeral_public_key_point).x().to_bytes(32, 'big') # type: ignore # Creating the key through through the salt nonce: bytes = os.urandom(12) @@ -139,23 +140,91 @@ def encrypt_message( bytearray(ciphertext) ).hex() -def decode_message(message_bytes: str) -> str: +@overload +def decode_message(message_bytes: str) -> str: ... + +@overload +def decode_message(message_bytes: str, public_key: str, private_key: str) -> str: ... + +def decode_message( + message_bytes: str, + public_key: Optional[str] = None, + private_key: Optional[str] = None, +) -> str: """ Decodes the message bytes into a utf-8 encoded string. Args: message_bytes (str): A string of the message bytes in hex. + public_key (str, optional): An optional argument passed only when the message is + believed to be encrypted. If passed when the message is not encrypted then it wont be + used for any operations. + private_key (str, optional): An optional argument passed only when the message is + believed to be encrypted. If passed when the message is not encrypted then it wont be + used for any operations. Returns: str: A string of the decoded message """ - # The first byte determines if the message is encrypted or not. If the first byte is a zer then - # the message is not encrypted. However, if it is is a 1 then the message is encrypted. For the - # time being, the library does not have support for the legacy message formats (will be added - # soon). - is_encrypted: bool = message_bytes[:2] == "01" + # Converting the message into a binary IO to make the reading of the data easier + message_io: BinaryIO = io.BytesIO(bytearray.fromhex(message_bytes)) - if not is_encrypted: - return bytearray.fromhex(message_bytes[4:]).decode('utf-8') + # Loading the data packed in the message bytes + message_type: int = message_io.read(1)[0] + encryption_type: bytes = message_io.read(1) + + if message_type == 0x0: # In this case the message is not encrypted, just encoded in UTF-8 + return message_io.read().decode('utf-8') + + elif message_type == 0x1: # In this case the message is encrypted and encoded in UTF-8 + if encryption_type[0] != 0xFF: + raise NotImplementedError("This function only supports the decryption of 0xFF type encryption (AES-GCM)") + + # Ensure that in the case that the message is encrypted that both the public and private keys + # are provided + if public_key is None or private_key is None: + raise ValueError("This message appears to be encrypted and you need to provide the public and private keys for encrypted messages.") + + # Loading up the remaining information in the encrypted message + ephemeral_public_key: bytes = message_io.read(33) + nonce: bytes = message_io.read(12) + auth_tag: bytes = message_io.read(16) + cipher_text: bytes = message_io.read() + + # Creating the shared secret of the Diffie-Hellman through the public, private, and the ephemeral key + public_key_point: PointJacobi = VerifyingKey.from_string(bytearray.fromhex(public_key), curve=SECP256k1, hashfunc=hashlib.sha256).pubkey.point # type: ignore + private_key_point: int = SigningKey.from_string(bytearray.fromhex(private_key), curve=SECP256k1, hashfunc=hashlib.sha256).privkey.secret_multiplier # type: ignore + ephemeral_public_key_point: PointJacobi = VerifyingKey.from_string(string = ephemeral_public_key, curve = SECP256k1, hashfunc=hashlib.sha256).pubkey.point # type: ignore + + shared_secret: bytes = ((public_key_point * private_key_point) + ephemeral_public_key_point).x().to_bytes(32, 'big') # type: ignore + + # Creating the key through the information in the message + salt: bytes = hashlib.sha256(bytearray(nonce)).digest() + key: bytes = scrypt( # type: ignore + password = bytearray(shared_secret), # type: ignore + salt = salt, # type: ignore + key_len = 32, + N = 8192, + r = 8, + p = 1 + ) + + # Creating a new cipher and adding the ephemeral key to it + cipher: GcmMode = AES.new(key, AES.MODE_GCM, nonce=nonce) # type: ignore + cipher.update(ephemeral_public_key) # type: ignore + + # Decrypting the data and returning it + plain_text: bytes = cipher.decrypt_and_verify(cipher_text, auth_tag) + return plain_text.decode('utf-8') + + elif message_type == 0x30: # In this case the message follows the legacy double encoded format + # When the message has the double encoded format then all that we need to do is pass the + # decoded format back to the function again so that it can perform the decoding accordingly. + return decode_message( + message_bytes = bytearray.fromhex(message_bytes).decode('utf-8'), + public_key = public_key, # type: ignore + private_key = private_key # type: ignore + ) + else: - raise NotImplementedError("Support for the decryption of messages has not yet been implemented") \ No newline at end of file + raise NotImplementedError("No decoding programmed for the provided message type") \ No newline at end of file