Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Notification monitoring system #21

Merged
merged 14 commits into from
Jul 14, 2023
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,20 @@ There are 3 ways in which data can be sent and received from at server.

```


### CLI Tools
* **REPL** - you can use this to type atPlatform commands and see responses; but the best thing about the REPL currently is that it shows the data notifications as they are received. The REPL code has the essentials of what a 'receiving' client needs to do - i.e.
* create an AtClient (assigning a Queue object to its queue parameter)
* start two new threads
* one for the AtClient.start_monitor() task: receives data update/delete notification events (the event data contains the ciphertext)
* the other one calls handle_event() method, which will read the upcoming events in the queue and handle them:
* calling AtClient.handle_event() (to decrypt the notifications and introducing the result as a new event in the queue)
* reading the new event, which contains the decrypted result
* Instructions to run the REPL:
1) Run repl.py and choose an atSign using option `1`
2) Select option `2`. REPL will start and activate monitor mode automatically in a different thread. You can still send commands/verbs. You will start seeing your own notifications (from yourself to yourself) and heartbeat working (noop verb is sent from time to time as a keepalive)
3) Use `at_talk` or any other tool to send notifications to your atSign from a different atSign. You should be able to see the complete notification, and the encrypted and decrypted value of it.

## Open source usage and contributions

This is open source code, so feel free to use it as is, suggest changes or
Expand Down
96 changes: 94 additions & 2 deletions at_client/atclient.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
import base64
import json
from queue import Empty, Queue
import time
import traceback

from at_client.connections.notification.atevents import AtEvent, AtEventType


from .common.atsign import AtSign
from .util.verbbuilder import *
Expand All @@ -8,20 +15,25 @@
from .exception.atexception import *
from .connections.atrootconnection import AtRootConnection
from .connections.atsecondaryconnection import AtSecondaryConnection
from .connections.atmonitorconnection import AtMonitorConnection
from .util.atconstants import *
from .connections.address import Address
from .common.keys import Keys, SharedKey, PrivateHiddenKey, PublicKey, SelfKey
from .util.authutil import AuthUtil

class AtClient(ABC):
def __init__(self, atsign:AtSign, root_address:Address=Address("root.atsign.org", 64), secondary_address:Address=None, verbose:bool = False):
def __init__(self, atsign:AtSign, root_address:Address=Address("root.atsign.org", 64), secondary_address:Address=None, queue:Queue=None, verbose:bool = False):
self.atsign = atsign
self.queue = queue
self.monitor_connection = None
self.keys = KeysUtil.load_keys(atsign)
self.verbose = verbose
if secondary_address is None:
self.root_connection = AtRootConnection.get_instance(host=root_address.host,
port=root_address.port,
verbose=verbose)
secondary_address = self.root_connection.find_secondary(atsign)
self.secondary_address = secondary_address
self.secondary_connection = AtSecondaryConnection(secondary_address, verbose=verbose)
self.secondary_connection.connect()
AuthUtil.authenticate_with_pkam(self.secondary_connection, self.atsign, self.keys)
Expand Down Expand Up @@ -320,7 +332,87 @@ def delete(self, key):
raise AtSecondaryConnectException(f"Failed to execute {command} - {e}")
else:
raise NotImplementedError(f"No implementation found for key type: {type(key)}")

def __del__(self):
if self.secondary_connection:
self.secondary_connection.disconnect()

def start_monitor(self):
if self.queue != None:
global should_be_running_lock
what = ""
try:
if self.monitor_connection == None:
what = "construct an AtMonitorConnection"
self.monitor_connection = AtMonitorConnection(queue=self.queue, atsign=self.atsign, address=self.secondary_address, verbose=True)
self.monitor_connection.connect()
AuthUtil.authenticate_with_pkam(self.monitor_connection, self.atsign, self.keys)
should_be_running_lock.acquire(blocking=1)
if not self.monitor_connection.running:
should_be_running_lock.release()
what = "call monitor_connection.start_monitor()"
self.monitor_connection.start_monitor()
else:
should_be_running_lock.release()
except Exception as e:
print("SEVERE: failed to " + what + " : " + str(e))
traceback.print_exc()
else:
raise Exception("You must assign a Queue object to the queue paremeter of AtClient class")

def stop_monitor(self):
if self.queue != None:
global should_be_running_lock
what = ""
try:
if self.monitor_connection == None:
return
should_be_running_lock.acquire(blocking=1)
if not self.monitor_connection.running:
should_be_running_lock.release()
what = "call monitor_connection.stop_monitor()"
self.monitor_connection.stop_monitor()
else:
should_be_running_lock.release()
except Exception as e:
print("SEVERE: failed to " + what + " : " + str(e))
traceback.print_exc()
else:
raise Exception("You must assign a Queue object to the queue paremeter of AtClient class")

def handle_event(self, queue, at_event):
if self.queue != None:
try:
event_type = at_event.event_type
event_data = at_event.event_data

if event_type == AtEventType.SHARED_KEY_NOTIFICATION:
if event_data["value"] != None:
shared_shared_key_name = event_data["key"]
shared_shared_key_encrypted_value = event_data["value"]
try:
shared_key_decrypted_value = EncryptionUtil.rsa_decrypt_from_base64(shared_shared_key_encrypted_value, self.keys[KeysUtil.encryption_private_key_name])
self.keys[shared_shared_key_name] = shared_key_decrypted_value
except Exception as e:
print(str(time.time()) + ": caught exception " + str(e) + " while decrypting received shared key " + shared_shared_key_name)
elif event_type == AtEventType.UPDATE_NOTIFICATION:
if event_data["value"] != None:
key = event_data["key"]
encrypted_value = event_data["value"]
ivNonce = event_data["metadata"]["ivNonce"]
try:
encryption_key_shared_by_other = self.get_encryption_key_shared_by_other(SharedKey.from_string(key=key))
decrypted_value = EncryptionUtil.aes_decrypt_from_base64(encrypted_text=encrypted_value.encode(), self_encryption_key=encryption_key_shared_by_other, iv=base64.b64decode(ivNonce))
new_event_data = dict(event_data)
new_event_data["decryptedValue"] = decrypted_value
new_at_event = AtEvent(AtEventType.DECRYPTED_UPDATE_NOTIFICATION, new_event_data)
queue.put(new_at_event)
except Exception as e:
print(str(time.time()) + ": caught exception " + str(e) + " while decrypting received data with key name [" + key + "]")
except Empty:
pass
else:
raise Exception("You must assign a Queue object to the queue paremeter of AtClient class")



2 changes: 1 addition & 1 deletion at_client/common/keys.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ def from_string(key: str) -> 'SharedKey':
raise AtException("SharedKey.from_string('" + key + "'): key must have structure @bob:foo.bar@alice")
key_name = split_by_at_sign[0]
shared_by = split_by_at_sign[1]
shared_key = SharedKey(AtSign(shared_by), AtSign(shared_with))
shared_key = SharedKey(key_name, AtSign(shared_by), AtSign(shared_with))
shared_key.name = key_name
return shared_key

Expand Down
8 changes: 7 additions & 1 deletion at_client/common/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class Metadata:
shared_key_enc: str = None
pub_key_cs: str = None
encoding: str = None

def parse_datetime(datetime_str):
if datetime_str is not None:
return parse(datetime_str)
Expand Down Expand Up @@ -64,6 +64,7 @@ def from_json(json_str):
metadata.shared_key_enc = data.get('sharedKeyEnc')
metadata.pub_key_cs = data.get('pubKeyCS')
metadata.encoding = data.get('encoding')

return metadata

@staticmethod
Expand Down Expand Up @@ -93,6 +94,7 @@ def from_dict(data_dict):
metadata.shared_key_enc = data_dict.get('sharedKeyEnc')
metadata.pub_key_cs = data_dict.get('pubKeyCS')
metadata.encoding = data_dict.get('encoding')

return metadata


Expand Down Expand Up @@ -120,6 +122,9 @@ def __str__(self):
s += f":isEncrypted:{'true' if self.is_encrypted else 'false'}"
if self.encoding:
s += f":encoding:{self.encoding}"

# TO?DO: Add new parameters

return s

@staticmethod
Expand All @@ -146,4 +151,5 @@ def squash(first_metadata, second_metadata):
metadata.shared_key_enc = first_metadata.shared_key_enc if first_metadata.shared_key_enc is not None else second_metadata.shared_key_enc
metadata.pub_key_cs = first_metadata.pub_key_cs if first_metadata.pub_key_cs is not None else second_metadata.pub_key_cs
metadata.encoding = first_metadata.encoding if first_metadata.encoding is not None else second_metadata.encoding

return metadata
5 changes: 4 additions & 1 deletion at_client/connections/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from .address import Address
from .atconnection import AtConnection
from .atrootconnection import AtRootConnection
from .atsecondaryconnection import AtSecondaryConnection
from .atsecondaryconnection import AtSecondaryConnection
from .atmonitorconnection import AtMonitorConnection
from .notification.atevents import AtEventType
from .notification.atnotification import AtNotification
10 changes: 10 additions & 0 deletions at_client/connections/atconnection.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import asyncio
import socket
import ssl
from abc import ABC, abstractmethod
import traceback

from ..util.socketutil import SocketUtil

from ..exception.atexception import AtException
from .response import Response
Expand Down Expand Up @@ -29,6 +33,7 @@ def __init__(self, host:str, port:int, context:ssl.SSLContext, verbose:bool=Fals
self._secure_root_socket = None
self._verbose = verbose
self._connected = False
self.monitor_connection = None

def __str__(self):
"""
Expand Down Expand Up @@ -77,10 +82,15 @@ def connect(self):
Establish a connection to the server. Throws IOException
"""
if not self._connected:
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.settimeout(None)
self._socket.connect(self._addr_info)
self._secure_root_socket = self._context.wrap_socket(
self._socket, server_hostname=self._host, do_handshake_on_connect=True
)

self._stream_reader = SocketUtil(self._secure_root_socket)

self._connected = True
self.read()

Expand Down
Loading