Skip to content
This repository has been archived by the owner on Sep 21, 2023. It is now read-only.

Commit

Permalink
Added the ability to decode messages
Browse files Browse the repository at this point in the history
  • Loading branch information
0xOmarA committed Feb 12, 2022
1 parent f6f1647 commit 0ee82b5
Showing 1 changed file with 81 additions and 12 deletions.
93 changes: 81 additions & 12 deletions src/radixlib/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Dict, Any, Union, List, Tuple, Set
from typing import Optional, Dict, Any, Union, List, Tuple, Set, BinaryIO, overload
from radixlib.serializable import Serializable

from ecdsa.keys import SigningKey, VerifyingKey
Expand All @@ -10,6 +10,7 @@
import hashlib
import ecdsa
import os
import io

def remove_none_values_recursively(dictionary: Dict[Any, Any]) -> Dict[Any, Any]:
""" Recursively removes the key value pairs where the value is None.
Expand Down Expand Up @@ -105,9 +106,9 @@ def encrypt_message(
# Creating the shared secret of the Diffie-Hellman through the public, private, and the ephemeral key
public_key_point: PointJacobi = VerifyingKey.from_string(bytearray.fromhex(receiver_public_key), curve=SECP256k1, hashfunc=hashlib.sha256).pubkey.point # type: ignore
private_key_point: int = SigningKey.from_string(bytearray.fromhex(sender_private_key), curve=SECP256k1, hashfunc=hashlib.sha256).privkey.secret_multiplier # type: ignore
ephemeral_private_key_point: PointJacobi = ephemeral_public_key.pubkey.point # type: ignore
ephemeral_public_key_point: PointJacobi = ephemeral_public_key.pubkey.point # type: ignore

shared_secret: bytes = ((public_key_point * private_key_point) + ephemeral_private_key_point).x().to_bytes(32, 'big') # type: ignore
shared_secret: bytes = ((public_key_point * private_key_point) + ephemeral_public_key_point).x().to_bytes(32, 'big') # type: ignore

# Creating the key through through the salt
nonce: bytes = os.urandom(12)
Expand Down Expand Up @@ -139,23 +140,91 @@ def encrypt_message(
bytearray(ciphertext)
).hex()

def decode_message(message_bytes: str) -> str:
@overload
def decode_message(message_bytes: str) -> str: ...

@overload
def decode_message(message_bytes: str, public_key: str, private_key: str) -> str: ...

def decode_message(
message_bytes: str,
public_key: Optional[str] = None,
private_key: Optional[str] = None,
) -> str:
""" Decodes the message bytes into a utf-8 encoded string.
Args:
message_bytes (str): A string of the message bytes in hex.
public_key (str, optional): An optional argument passed only when the message is
believed to be encrypted. If passed when the message is not encrypted then it wont be
used for any operations.
private_key (str, optional): An optional argument passed only when the message is
believed to be encrypted. If passed when the message is not encrypted then it wont be
used for any operations.
Returns:
str: A string of the decoded message
"""

# The first byte determines if the message is encrypted or not. If the first byte is a zer then
# the message is not encrypted. However, if it is is a 1 then the message is encrypted. For the
# time being, the library does not have support for the legacy message formats (will be added
# soon).
is_encrypted: bool = message_bytes[:2] == "01"
# Converting the message into a binary IO to make the reading of the data easier
message_io: BinaryIO = io.BytesIO(bytearray.fromhex(message_bytes))

if not is_encrypted:
return bytearray.fromhex(message_bytes[4:]).decode('utf-8')
# Loading the data packed in the message bytes
message_type: int = message_io.read(1)[0]
encryption_type: bytes = message_io.read(1)

if message_type == 0x0: # In this case the message is not encrypted, just encoded in UTF-8
return message_io.read().decode('utf-8')

elif message_type == 0x1: # In this case the message is encrypted and encoded in UTF-8
if encryption_type[0] != 0xFF:
raise NotImplementedError("This function only supports the decryption of 0xFF type encryption (AES-GCM)")

# Ensure that in the case that the message is encrypted that both the public and private keys
# are provided
if public_key is None or private_key is None:
raise ValueError("This message appears to be encrypted and you need to provide the public and private keys for encrypted messages.")

# Loading up the remaining information in the encrypted message
ephemeral_public_key: bytes = message_io.read(33)
nonce: bytes = message_io.read(12)
auth_tag: bytes = message_io.read(16)
cipher_text: bytes = message_io.read()

# Creating the shared secret of the Diffie-Hellman through the public, private, and the ephemeral key
public_key_point: PointJacobi = VerifyingKey.from_string(bytearray.fromhex(public_key), curve=SECP256k1, hashfunc=hashlib.sha256).pubkey.point # type: ignore
private_key_point: int = SigningKey.from_string(bytearray.fromhex(private_key), curve=SECP256k1, hashfunc=hashlib.sha256).privkey.secret_multiplier # type: ignore
ephemeral_public_key_point: PointJacobi = VerifyingKey.from_string(string = ephemeral_public_key, curve = SECP256k1, hashfunc=hashlib.sha256).pubkey.point # type: ignore

shared_secret: bytes = ((public_key_point * private_key_point) + ephemeral_public_key_point).x().to_bytes(32, 'big') # type: ignore

# Creating the key through the information in the message
salt: bytes = hashlib.sha256(bytearray(nonce)).digest()
key: bytes = scrypt( # type: ignore
password = bytearray(shared_secret), # type: ignore
salt = salt, # type: ignore
key_len = 32,
N = 8192,
r = 8,
p = 1
)

# Creating a new cipher and adding the ephemeral key to it
cipher: GcmMode = AES.new(key, AES.MODE_GCM, nonce=nonce) # type: ignore
cipher.update(ephemeral_public_key) # type: ignore

# Decrypting the data and returning it
plain_text: bytes = cipher.decrypt_and_verify(cipher_text, auth_tag)
return plain_text.decode('utf-8')

elif message_type == 0x30: # In this case the message follows the legacy double encoded format
# When the message has the double encoded format then all that we need to do is pass the
# decoded format back to the function again so that it can perform the decoding accordingly.
return decode_message(
message_bytes = bytearray.fromhex(message_bytes).decode('utf-8'),
public_key = public_key, # type: ignore
private_key = private_key # type: ignore
)

else:
raise NotImplementedError("Support for the decryption of messages has not yet been implemented")
raise NotImplementedError("No decoding programmed for the provided message type")

0 comments on commit 0ee82b5

Please sign in to comment.