Skip to content

Commit

Permalink
Merge pull request #21 from atsign-foundation/realvarx-dev
Browse files Browse the repository at this point in the history
feat: Notification monitoring system
  • Loading branch information
cpswan authored Jul 14, 2023
2 parents 549667a + c3fa483 commit c93c9f7
Show file tree
Hide file tree
Showing 16 changed files with 572 additions and 11 deletions.
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,20 @@ There are 3 ways in which data can be sent and received from at server.

```


### CLI Tools
* **REPL** - you can use this to type atPlatform commands and see responses; but the best thing about the REPL currently is that it shows the data notifications as they are received. The REPL code has the essentials of what a 'receiving' client needs to do - i.e.
* create an AtClient (assigning a Queue object to its queue parameter)
* start two new threads
* one for the AtClient.start_monitor() task: receives data update/delete notification events (the event data contains the ciphertext)
* the other one calls handle_event() method, which will read the upcoming events in the queue and handle them:
* calling AtClient.handle_event() (to decrypt the notifications and introducing the result as a new event in the queue)
* reading the new event, which contains the decrypted result
* Instructions to run the REPL:
1) Run repl.py and choose an atSign using option `1`
2) Select option `2`. REPL will start and activate monitor mode automatically in a different thread. You can still send commands/verbs. You will start seeing your own notifications (from yourself to yourself) and heartbeat working (noop verb is sent from time to time as a keepalive)
3) Use `at_talk` or any other tool to send notifications to your atSign from a different atSign. You should be able to see the complete notification, and the encrypted and decrypted value of it.

## Open source usage and contributions

This is open source code, so feel free to use it as is, suggest changes or
Expand Down
96 changes: 94 additions & 2 deletions at_client/atclient.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
import base64
import json
from queue import Empty, Queue
import time
import traceback

from at_client.connections.notification.atevents import AtEvent, AtEventType


from .common.atsign import AtSign
from .util.verbbuilder import *
Expand All @@ -8,20 +15,25 @@
from .exception.atexception import *
from .connections.atrootconnection import AtRootConnection
from .connections.atsecondaryconnection import AtSecondaryConnection
from .connections.atmonitorconnection import AtMonitorConnection
from .util.atconstants import *
from .connections.address import Address
from .common.keys import Keys, SharedKey, PrivateHiddenKey, PublicKey, SelfKey
from .util.authutil import AuthUtil

class AtClient(ABC):
def __init__(self, atsign:AtSign, root_address:Address=Address("root.atsign.org", 64), secondary_address:Address=None, verbose:bool = False):
def __init__(self, atsign:AtSign, root_address:Address=Address("root.atsign.org", 64), secondary_address:Address=None, queue:Queue=None, verbose:bool = False):
self.atsign = atsign
self.queue = queue
self.monitor_connection = None
self.keys = KeysUtil.load_keys(atsign)
self.verbose = verbose
if secondary_address is None:
self.root_connection = AtRootConnection.get_instance(host=root_address.host,
port=root_address.port,
verbose=verbose)
secondary_address = self.root_connection.find_secondary(atsign)
self.secondary_address = secondary_address
self.secondary_connection = AtSecondaryConnection(secondary_address, verbose=verbose)
self.secondary_connection.connect()
AuthUtil.authenticate_with_pkam(self.secondary_connection, self.atsign, self.keys)
Expand Down Expand Up @@ -320,7 +332,87 @@ def delete(self, key):
raise AtSecondaryConnectException(f"Failed to execute {command} - {e}")
else:
raise NotImplementedError(f"No implementation found for key type: {type(key)}")

def __del__(self):
if self.secondary_connection:
self.secondary_connection.disconnect()

def start_monitor(self):
if self.queue != None:
global should_be_running_lock
what = ""
try:
if self.monitor_connection == None:
what = "construct an AtMonitorConnection"
self.monitor_connection = AtMonitorConnection(queue=self.queue, atsign=self.atsign, address=self.secondary_address, verbose=True)
self.monitor_connection.connect()
AuthUtil.authenticate_with_pkam(self.monitor_connection, self.atsign, self.keys)
should_be_running_lock.acquire(blocking=1)
if not self.monitor_connection.running:
should_be_running_lock.release()
what = "call monitor_connection.start_monitor()"
self.monitor_connection.start_monitor()
else:
should_be_running_lock.release()
except Exception as e:
print("SEVERE: failed to " + what + " : " + str(e))
traceback.print_exc()
else:
raise Exception("You must assign a Queue object to the queue paremeter of AtClient class")

def stop_monitor(self):
if self.queue != None:
global should_be_running_lock
what = ""
try:
if self.monitor_connection == None:
return
should_be_running_lock.acquire(blocking=1)
if not self.monitor_connection.running:
should_be_running_lock.release()
what = "call monitor_connection.stop_monitor()"
self.monitor_connection.stop_monitor()
else:
should_be_running_lock.release()
except Exception as e:
print("SEVERE: failed to " + what + " : " + str(e))
traceback.print_exc()
else:
raise Exception("You must assign a Queue object to the queue paremeter of AtClient class")

def handle_event(self, queue, at_event):
if self.queue != None:
try:
event_type = at_event.event_type
event_data = at_event.event_data

if event_type == AtEventType.SHARED_KEY_NOTIFICATION:
if event_data["value"] != None:
shared_shared_key_name = event_data["key"]
shared_shared_key_encrypted_value = event_data["value"]
try:
shared_key_decrypted_value = EncryptionUtil.rsa_decrypt_from_base64(shared_shared_key_encrypted_value, self.keys[KeysUtil.encryption_private_key_name])
self.keys[shared_shared_key_name] = shared_key_decrypted_value
except Exception as e:
print(str(time.time()) + ": caught exception " + str(e) + " while decrypting received shared key " + shared_shared_key_name)
elif event_type == AtEventType.UPDATE_NOTIFICATION:
if event_data["value"] != None:
key = event_data["key"]
encrypted_value = event_data["value"]
ivNonce = event_data["metadata"]["ivNonce"]
try:
encryption_key_shared_by_other = self.get_encryption_key_shared_by_other(SharedKey.from_string(key=key))
decrypted_value = EncryptionUtil.aes_decrypt_from_base64(encrypted_text=encrypted_value.encode(), self_encryption_key=encryption_key_shared_by_other, iv=base64.b64decode(ivNonce))
new_event_data = dict(event_data)
new_event_data["decryptedValue"] = decrypted_value
new_at_event = AtEvent(AtEventType.DECRYPTED_UPDATE_NOTIFICATION, new_event_data)
queue.put(new_at_event)
except Exception as e:
print(str(time.time()) + ": caught exception " + str(e) + " while decrypting received data with key name [" + key + "]")
except Empty:
pass
else:
raise Exception("You must assign a Queue object to the queue paremeter of AtClient class")



2 changes: 1 addition & 1 deletion at_client/common/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def from_string(key: str) -> 'SharedKey':
raise AtException("SharedKey.from_string('" + key + "'): key must have structure @bob:foo.bar@alice")
key_name = split_by_at_sign[0]
shared_by = split_by_at_sign[1]
shared_key = SharedKey(AtSign(shared_by), AtSign(shared_with))
shared_key = SharedKey(key_name, AtSign(shared_by), AtSign(shared_with))
shared_key.name = key_name
return shared_key

Expand Down
8 changes: 7 additions & 1 deletion at_client/common/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Metadata:
shared_key_enc: str = None
pub_key_cs: str = None
encoding: str = None

def parse_datetime(datetime_str):
if datetime_str is not None:
return parse(datetime_str)
Expand Down Expand Up @@ -64,6 +64,7 @@ def from_json(json_str):
metadata.shared_key_enc = data.get('sharedKeyEnc')
metadata.pub_key_cs = data.get('pubKeyCS')
metadata.encoding = data.get('encoding')

return metadata

@staticmethod
Expand Down Expand Up @@ -93,6 +94,7 @@ def from_dict(data_dict):
metadata.shared_key_enc = data_dict.get('sharedKeyEnc')
metadata.pub_key_cs = data_dict.get('pubKeyCS')
metadata.encoding = data_dict.get('encoding')

return metadata


Expand Down Expand Up @@ -120,6 +122,9 @@ def __str__(self):
s += f":isEncrypted:{'true' if self.is_encrypted else 'false'}"
if self.encoding:
s += f":encoding:{self.encoding}"

# TO?DO: Add new parameters

return s

@staticmethod
Expand All @@ -146,4 +151,5 @@ def squash(first_metadata, second_metadata):
metadata.shared_key_enc = first_metadata.shared_key_enc if first_metadata.shared_key_enc is not None else second_metadata.shared_key_enc
metadata.pub_key_cs = first_metadata.pub_key_cs if first_metadata.pub_key_cs is not None else second_metadata.pub_key_cs
metadata.encoding = first_metadata.encoding if first_metadata.encoding is not None else second_metadata.encoding

return metadata
5 changes: 4 additions & 1 deletion at_client/connections/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from .address import Address
from .atconnection import AtConnection
from .atrootconnection import AtRootConnection
from .atsecondaryconnection import AtSecondaryConnection
from .atsecondaryconnection import AtSecondaryConnection
from .atmonitorconnection import AtMonitorConnection
from .notification.atevents import AtEventType
from .notification.atnotification import AtNotification
10 changes: 10 additions & 0 deletions at_client/connections/atconnection.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import asyncio
import socket
import ssl
from abc import ABC, abstractmethod
import traceback

from ..util.socketutil import SocketUtil

from ..exception.atexception import AtException
from .response import Response
Expand Down Expand Up @@ -29,6 +33,7 @@ def __init__(self, host:str, port:int, context:ssl.SSLContext, verbose:bool=Fals
self._secure_root_socket = None
self._verbose = verbose
self._connected = False
self.monitor_connection = None

def __str__(self):
"""
Expand Down Expand Up @@ -77,10 +82,15 @@ def connect(self):
Establish a connection to the server. Throws IOException
"""
if not self._connected:
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.settimeout(None)
self._socket.connect(self._addr_info)
self._secure_root_socket = self._context.wrap_socket(
self._socket, server_hostname=self._host, do_handshake_on_connect=True
)

self._stream_reader = SocketUtil(self._secure_root_socket)

self._connected = True
self.read()

Expand Down
Loading

0 comments on commit c93c9f7

Please sign in to comment.