diff --git a/README.md b/README.md index fea2a07..3a74093 100644 --- a/README.md +++ b/README.md @@ -95,6 +95,20 @@ There are 3 ways in which data can be sent and received from at server. ``` + +### CLI Tools +* **REPL** - you can use this to type atPlatform commands and see responses; but the best thing about the REPL currently is that it shows the data notifications as they are received. The REPL code has the essentials of what a 'receiving' client needs to do - i.e. + * create an AtClient (assigning a Queue object to its queue parameter) + * start two new threads + * one for the AtClient.start_monitor() task: receives data update/delete notification events (the event data contains the ciphertext) + * the other one calls handle_event() method, which will read the upcoming events in the queue and handle them: + * calling AtClient.handle_event() (to decrypt the notifications and introducing the result as a new event in the queue) + * reading the new event, which contains the decrypted result + * Instructions to run the REPL: + 1) Run repl.py and choose an atSign using option `1` + 2) Select option `2`. REPL will start and activate monitor mode automatically in a different thread. You can still send commands/verbs. You will start seeing your own notifications (from yourself to yourself) and heartbeat working (noop verb is sent from time to time as a keepalive) + 3) Use `at_talk` or any other tool to send notifications to your atSign from a different atSign. You should be able to see the complete notification, and the encrypted and decrypted value of it. + ## Open source usage and contributions This is open source code, so feel free to use it as is, suggest changes or diff --git a/at_client/atclient.py b/at_client/atclient.py index 2ea3a95..2fdc42a 100644 --- a/at_client/atclient.py +++ b/at_client/atclient.py @@ -1,4 +1,11 @@ +import base64 import json +from queue import Empty, Queue +import time +import traceback + +from at_client.connections.notification.atevents import AtEvent, AtEventType + from .common.atsign import AtSign from .util.verbbuilder import * @@ -8,13 +15,17 @@ from .exception.atexception import * from .connections.atrootconnection import AtRootConnection from .connections.atsecondaryconnection import AtSecondaryConnection +from .connections.atmonitorconnection import AtMonitorConnection +from .util.atconstants import * from .connections.address import Address from .common.keys import Keys, SharedKey, PrivateHiddenKey, PublicKey, SelfKey from .util.authutil import AuthUtil class AtClient(ABC): - def __init__(self, atsign:AtSign, root_address:Address=Address("root.atsign.org", 64), secondary_address:Address=None, verbose:bool = False): + def __init__(self, atsign:AtSign, root_address:Address=Address("root.atsign.org", 64), secondary_address:Address=None, queue:Queue=None, verbose:bool = False): self.atsign = atsign + self.queue = queue + self.monitor_connection = None self.keys = KeysUtil.load_keys(atsign) self.verbose = verbose if secondary_address is None: @@ -22,6 +33,7 @@ def __init__(self, atsign:AtSign, root_address:Address=Address("root.atsign.org" port=root_address.port, verbose=verbose) secondary_address = self.root_connection.find_secondary(atsign) + self.secondary_address = secondary_address self.secondary_connection = AtSecondaryConnection(secondary_address, verbose=verbose) self.secondary_connection.connect() AuthUtil.authenticate_with_pkam(self.secondary_connection, self.atsign, self.keys) @@ -320,7 +332,87 @@ def delete(self, key): raise AtSecondaryConnectException(f"Failed to execute {command} - {e}") else: raise NotImplementedError(f"No implementation found for key type: {type(key)}") - + def __del__(self): if self.secondary_connection: self.secondary_connection.disconnect() + + def start_monitor(self): + if self.queue != None: + global should_be_running_lock + what = "" + try: + if self.monitor_connection == None: + what = "construct an AtMonitorConnection" + self.monitor_connection = AtMonitorConnection(queue=self.queue, atsign=self.atsign, address=self.secondary_address, verbose=True) + self.monitor_connection.connect() + AuthUtil.authenticate_with_pkam(self.monitor_connection, self.atsign, self.keys) + should_be_running_lock.acquire(blocking=1) + if not self.monitor_connection.running: + should_be_running_lock.release() + what = "call monitor_connection.start_monitor()" + self.monitor_connection.start_monitor() + else: + should_be_running_lock.release() + except Exception as e: + print("SEVERE: failed to " + what + " : " + str(e)) + traceback.print_exc() + else: + raise Exception("You must assign a Queue object to the queue paremeter of AtClient class") + + def stop_monitor(self): + if self.queue != None: + global should_be_running_lock + what = "" + try: + if self.monitor_connection == None: + return + should_be_running_lock.acquire(blocking=1) + if not self.monitor_connection.running: + should_be_running_lock.release() + what = "call monitor_connection.stop_monitor()" + self.monitor_connection.stop_monitor() + else: + should_be_running_lock.release() + except Exception as e: + print("SEVERE: failed to " + what + " : " + str(e)) + traceback.print_exc() + else: + raise Exception("You must assign a Queue object to the queue paremeter of AtClient class") + + def handle_event(self, queue, at_event): + if self.queue != None: + try: + event_type = at_event.event_type + event_data = at_event.event_data + + if event_type == AtEventType.SHARED_KEY_NOTIFICATION: + if event_data["value"] != None: + shared_shared_key_name = event_data["key"] + shared_shared_key_encrypted_value = event_data["value"] + try: + shared_key_decrypted_value = EncryptionUtil.rsa_decrypt_from_base64(shared_shared_key_encrypted_value, self.keys[KeysUtil.encryption_private_key_name]) + self.keys[shared_shared_key_name] = shared_key_decrypted_value + except Exception as e: + print(str(time.time()) + ": caught exception " + str(e) + " while decrypting received shared key " + shared_shared_key_name) + elif event_type == AtEventType.UPDATE_NOTIFICATION: + if event_data["value"] != None: + key = event_data["key"] + encrypted_value = event_data["value"] + ivNonce = event_data["metadata"]["ivNonce"] + try: + encryption_key_shared_by_other = self.get_encryption_key_shared_by_other(SharedKey.from_string(key=key)) + decrypted_value = EncryptionUtil.aes_decrypt_from_base64(encrypted_text=encrypted_value.encode(), self_encryption_key=encryption_key_shared_by_other, iv=base64.b64decode(ivNonce)) + new_event_data = dict(event_data) + new_event_data["decryptedValue"] = decrypted_value + new_at_event = AtEvent(AtEventType.DECRYPTED_UPDATE_NOTIFICATION, new_event_data) + queue.put(new_at_event) + except Exception as e: + print(str(time.time()) + ": caught exception " + str(e) + " while decrypting received data with key name [" + key + "]") + except Empty: + pass + else: + raise Exception("You must assign a Queue object to the queue paremeter of AtClient class") + + + diff --git a/at_client/common/keys.py b/at_client/common/keys.py index 65a4533..37548e5 100644 --- a/at_client/common/keys.py +++ b/at_client/common/keys.py @@ -134,7 +134,7 @@ def from_string(key: str) -> 'SharedKey': raise AtException("SharedKey.from_string('" + key + "'): key must have structure @bob:foo.bar@alice") key_name = split_by_at_sign[0] shared_by = split_by_at_sign[1] - shared_key = SharedKey(AtSign(shared_by), AtSign(shared_with)) + shared_key = SharedKey(key_name, AtSign(shared_by), AtSign(shared_with)) shared_key.name = key_name return shared_key diff --git a/at_client/common/metadata.py b/at_client/common/metadata.py index a1b598a..ae3a0df 100644 --- a/at_client/common/metadata.py +++ b/at_client/common/metadata.py @@ -30,7 +30,7 @@ class Metadata: shared_key_enc: str = None pub_key_cs: str = None encoding: str = None - + def parse_datetime(datetime_str): if datetime_str is not None: return parse(datetime_str) @@ -64,6 +64,7 @@ def from_json(json_str): metadata.shared_key_enc = data.get('sharedKeyEnc') metadata.pub_key_cs = data.get('pubKeyCS') metadata.encoding = data.get('encoding') + return metadata @staticmethod @@ -93,6 +94,7 @@ def from_dict(data_dict): metadata.shared_key_enc = data_dict.get('sharedKeyEnc') metadata.pub_key_cs = data_dict.get('pubKeyCS') metadata.encoding = data_dict.get('encoding') + return metadata @@ -120,6 +122,9 @@ def __str__(self): s += f":isEncrypted:{'true' if self.is_encrypted else 'false'}" if self.encoding: s += f":encoding:{self.encoding}" + + # TO?DO: Add new parameters + return s @staticmethod @@ -146,4 +151,5 @@ def squash(first_metadata, second_metadata): metadata.shared_key_enc = first_metadata.shared_key_enc if first_metadata.shared_key_enc is not None else second_metadata.shared_key_enc metadata.pub_key_cs = first_metadata.pub_key_cs if first_metadata.pub_key_cs is not None else second_metadata.pub_key_cs metadata.encoding = first_metadata.encoding if first_metadata.encoding is not None else second_metadata.encoding + return metadata diff --git a/at_client/connections/__init__.py b/at_client/connections/__init__.py index f13903b..971df9c 100644 --- a/at_client/connections/__init__.py +++ b/at_client/connections/__init__.py @@ -1,4 +1,7 @@ from .address import Address from .atconnection import AtConnection from .atrootconnection import AtRootConnection -from .atsecondaryconnection import AtSecondaryConnection \ No newline at end of file +from .atsecondaryconnection import AtSecondaryConnection +from .atmonitorconnection import AtMonitorConnection +from .notification.atevents import AtEventType +from .notification.atnotification import AtNotification \ No newline at end of file diff --git a/at_client/connections/atconnection.py b/at_client/connections/atconnection.py index 164985f..0d2ba44 100644 --- a/at_client/connections/atconnection.py +++ b/at_client/connections/atconnection.py @@ -1,6 +1,10 @@ +import asyncio import socket import ssl from abc import ABC, abstractmethod +import traceback + +from ..util.socketutil import SocketUtil from ..exception.atexception import AtException from .response import Response @@ -29,6 +33,7 @@ def __init__(self, host:str, port:int, context:ssl.SSLContext, verbose:bool=Fals self._secure_root_socket = None self._verbose = verbose self._connected = False + self.monitor_connection = None def __str__(self): """ @@ -77,10 +82,15 @@ def connect(self): Establish a connection to the server. Throws IOException """ if not self._connected: + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._socket.settimeout(None) self._socket.connect(self._addr_info) self._secure_root_socket = self._context.wrap_socket( self._socket, server_hostname=self._host, do_handshake_on_connect=True ) + + self._stream_reader = SocketUtil(self._secure_root_socket) + self._connected = True self.read() diff --git a/at_client/connections/atmonitorconnection.py b/at_client/connections/atmonitorconnection.py new file mode 100644 index 0000000..d285452 --- /dev/null +++ b/at_client/connections/atmonitorconnection.py @@ -0,0 +1,222 @@ +import ssl +from threading import Thread +import threading +import time +import json +import traceback + +from ..common.atsign import AtSign +from .notification.atevents import AtEvent, AtEventType +from ..util.syncdecorator import synchronized +from ..util.timeutil import TimeUtil +from ..util.atconstants import * +from .address import Address +from .atsecondaryconnection import AtSecondaryConnection +import queue + +class AtMonitorConnection(AtSecondaryConnection): + last_received_time: int = 0 + running: bool = False + should_be_running: bool = False + + def __init__(self, queue:queue.Queue, atsign:AtSign, address: Address, context:ssl.SSLContext=ssl.create_default_context(), verbose:bool=True): + self.atsign = atsign + self.queue = queue + self._verbose = True + super().__init__(address, context, verbose) + self._last_heartbeat_sent_time = TimeUtil.current_time_millis() + self._last_heartbeat_ack_time = TimeUtil.current_time_millis() + self._heartbeat_interval_millis = 30000 + self.start_heart_beat() + + def start_heart_beat(self): + threading.Thread(target=self._start_heart_beat).start() + + def _start_heart_beat(self): + global should_be_running_lock + while True: + should_be_running_lock.acquire() + if self.should_be_running: + should_be_running_lock.release() + if (not self.running) or (self._last_heartbeat_sent_time - self._last_heartbeat_ack_time >= self._heartbeat_interval_millis): + try: + print("Monitor heartbeats not being received") + self.stop_monitor() + wait_start_time = TimeUtil.current_time_millis() + running_lock.acquire(blocking=1) + entered = False + print((TimeUtil.current_time_millis() - wait_start_time) < 5000) + while self.running and ((TimeUtil.current_time_millis() - wait_start_time) < 5000): + entered = True + running_lock.release() + print("Wait 5 seconds for monitor to stop") + try: + time.sleep(1) + except Exception as ignore: + pass + if not entered: + running_lock.release() + entered = False + running_lock.acquire(blocking=1) + if self.running: + print("Monitor thread has not stopped, but going to start another one anyway") + running_lock.release() + self.start_monitor() + except Exception as e: + print("Monitor restart failed "+ str(e)) + else: + if TimeUtil.current_time_millis() - self._last_heartbeat_sent_time > self._heartbeat_interval_millis: + try: + self.execute_command(command="noop:0", retry_on_exception=False, read_the_response=False) + self._last_heartbeat_sent_time = TimeUtil.current_time_millis() + except Exception as ignore: + # Can't do anything, the heartbeat loop will take care of restarting the monitor connection + pass + else: + should_be_running_lock.release() + try: + time.sleep(self._heartbeat_interval_millis / 6000) # 6 * 1000 (from ms to s) + except Exception as ignore: + pass + + def start_monitor(self): + self._last_heartbeat_sent_time = self._last_heartbeat_ack_time = TimeUtil.current_time_millis() + + should_be_running_lock.acquire(blocking=1) + self.should_be_running = True + should_be_running_lock.release() + + running_lock.acquire(blocking=1) + if not self.running: + self.running = True + running_lock.release() + if not self._connected: + try: + self._connect() + except Exception as e: + print("startMonitor failed to connect to secondary : " + str(e)) + traceback.print_exc() + running_lock.acquire(blocking=1) + self.running = False + running_lock.release() + return False + self._run() + else: + running_lock.release() + return True + + def stop_monitor(self): + should_be_running_lock.acquire(blocking=1) + self.should_be_running = False + should_be_running_lock.release() + + self._last_heartbeat_sent_time = self._last_heartbeat_ack_time = TimeUtil.current_time_millis() + self.disconnect() + + def _run(self): + what = "" + first = True + try: + monitor_cmd = "monitor:" + str(self.last_received_time) + what = "send monitor command " + monitor_cmd + self.execute_command(command=monitor_cmd, retry_on_exception=True, read_the_response=False) + + entered = False + should_be_running_lock.acquire(blocking=1) + while self.should_be_running: + should_be_running_lock.release() + entered = True + first = False + what = "read from connection" + response = self._stream_reader.readline() + if self._verbose and response != b"": + print("\tRCVD (MONITOR): " + str(response.decode())) + + event_type = AtEventType.NONE + event_data = {} + what = "parse monitor message" + try: + if response.startswith(b"data:ok"): + event_type = AtEventType.MONITOR_HEARTBEAT_ACK + event_data["key"] = "__heartbeat__" + event_data["value"] = response.decode()[len("data:"):] + self._last_heartbeat_ack_time = TimeUtil.current_time_millis() + elif response.startswith(b"data:"): + event_type = AtEventType.MONITOR_EXCEPTION + event_data["key"] = "__monitorException__" + event_data["value"] = response.decode() + event_data["exception"] = "Unexpected 'data:' message from server" + elif response.startswith(b"error:"): + event_type = AtEventType.MONITOR_EXCEPTION + event_data["key"] = "__monitorException__" + event_data["value"] = response.decode() + event_data["exception"] = "Unexpected 'error:' message from server" + elif response.startswith(b"notification:"): + event_data = json.loads(response.decode()[len("notification:"):]) + uuid = str(event_data.get("id")) + operation = str(event_data.get("operation")) + key = str(event_data.get("key")) + if "epochMillis" in event_data: + self.last_received_time = int(event_data.get("epochMillis")) + else: + self.last_received_time = TimeUtil.current_time_millis() + if uuid == "-1": + event_type = AtEventType.STATS_NOTIFICATION + elif operation == "update": + if key.startswith(str(self.atsign.to_string) + ":shared_key@"): + event_type = AtEventType.SHARED_KEY_NOTIFICATION + else: + event_type = AtEventType.UPDATE_NOTIFICATION + elif operation == "delete": + event_type = AtEventType.DELETE_NOTIFICATION + else: + event_type = AtEventType.MONITOR_EXCEPTION + event_data["key"] = "__monitorException__" + event_data["value"] = response.decode() + event_data["exception"] = "Unknown notification operation " + str(operation) + else: + event_type = AtEventType.MONITOR_EXCEPTION + event_data["key"] = "__monitorException__" + event_data["value"] = response.decode() + event_data["exception"] = "Malformed response from server" + except Exception as e: + print(e) + event_type = AtEventType.MONITOR_EXCEPTION + event_data["key"] = "__monitorException__" + event_data["value"] = response.decode() + event_data["exception"] = str(e) + traceback.print_exc() + + at_event = AtEvent(event_type, event_data) + self.queue.put(at_event) + + should_be_running_lock.acquire(blocking=1) + entered = False + if not entered: + should_be_running_lock.release() + entered = False + except Exception as e: + traceback.print_exc() + should_be_running_lock.acquire(blocking=1) + if not self.should_be_running: + should_be_running_lock.release() + else: + should_be_running_lock.release() + print("Monitor failed to " + what + " : " + str(e)) + traceback.print_exc() + print("Monitor ending. Monitor heartbeat thread should restart the monitor shortly") + self.disconnect() + finally: + running_lock.acquire(blocking=1) + self.running = False + running_lock.release() + + self.disconnect() + + def _connect(self): + """ + Establish a connection to the secondary server. + """ + super().connect() + if self._verbose: + print("Secondary Connection Successful") \ No newline at end of file diff --git a/at_client/connections/atsecondaryconnection.py b/at_client/connections/atsecondaryconnection.py index 4b0079e..f7a8e42 100644 --- a/at_client/connections/atsecondaryconnection.py +++ b/at_client/connections/atsecondaryconnection.py @@ -52,7 +52,6 @@ def parse_raw_response(self, raw_response:str): raw_response = raw_response[:-1] raw_response = raw_response.strip() - # return raw_response data_index = raw_response.find("data:") error_index = raw_response.find("error:") diff --git a/at_client/connections/notification/atevents.py b/at_client/connections/notification/atevents.py new file mode 100644 index 0000000..54da1b1 --- /dev/null +++ b/at_client/connections/notification/atevents.py @@ -0,0 +1,47 @@ +from enum import Enum +from typing import Dict, Set + +class AtEventType(Enum): + NONE = 0 + SHARED_KEY_NOTIFICATION = 1 + UPDATE_NOTIFICATION = 2 + DELETE_NOTIFICATION = 3 + UPDATE_NOTIFICATION_TEXT = 4 + STATS_NOTIFICATION = 5 + MONITOR_HEARTBEAT_ACK = 6 + MONITOR_EXCEPTION = 7 + DECRYPTED_UPDATE_NOTIFICATION = 8 + USER_DEFINED = 9 + + def __str__(self): + return self.name + + def to_string(event): + if event == 0: + return "NONE" + elif event == 1: + return "SHARED_KEY_NOTIFICATION" + elif event == 2: + return "UPDATE_NOTIFICATION" + elif event == 3: + return "DELETE_NOTIFICATION" + elif event == 4: + return "UPDATE_NOTIFICATION_TEXT" + elif event == 5: + return "STATS_NOTIFICATION" + elif event == 6: + return "MONITOR_HEARTBEAT_ACK" + elif event == 7: + return "MONITOR_EXCEPTION" + elif event == 8: + return "DECRYPTED_UPDATE_NOTIFICATION" + elif event == 9: + return "USER_DEFINED" + else: + return "UNKNOWN" + + +class AtEvent: + def __init__(self, event_type, event_data): + self.event_type = event_type + self.event_data = event_data diff --git a/at_client/connections/notification/atnotification.py b/at_client/connections/notification/atnotification.py new file mode 100644 index 0000000..dd93fbd --- /dev/null +++ b/at_client/connections/notification/atnotification.py @@ -0,0 +1,52 @@ +from dataclasses import dataclass + +from ...common.metadata import Metadata +from ...util.atconstants import * + +@dataclass +class AtNotification: + uuid: str = None + key: str = '' + from_atsign: str = '' + to_atsign: str = '' + epoch_millis: int = 0 + status: str = '' + value: str = None + operation: str = None + message_type: str = None + is_encrypted: str = None + expires_at_in_epoch_millis: int = None + metadata: Metadata = None + + def from_json(cls, json): + metadata = None + if json['metadata'] is not None: + metadata = Metadata() + metadata.enc_key_name = json['metadata'][ENCRYPTING_KEY_NAME] + metadata.enc_algo = json['metadata'][ENCRYPTING_ALGO] + metadata.iv_nonce = json['metadata'][IV_OR_NONCE] + metadata.ske_enc_key_name = json['metadata'][SHARED_KEY_ENCRYPTED_ENCRYPTING_KEY_NAME] + metadata.ske_enc_algo = json['metadata'][SHARED_KEY_ENCRYPTED_ENCRYPTING_ALGO] + + return cls(json['id'], json['key'], json['from'], json['to'], json['epochMillis'], + json['messageType'], json[IS_ENCRYPTED], + value=json['value'], operation=json['operation'], + expiresAtInEpochMillis=json['expiresAt'], metadata=metadata) + + def to_json(self): + return { + 'id': self.uuid, + 'key': self.key, + 'from': self.from_atsign, + 'to': self.to_atsign, + 'epochMillis': self.epoch_millis, + 'value': self.value, + 'operation': self.operation, + 'messageType': self.message_type, + IS_ENCRYPTED: self.is_encrypted, + 'notificationStatus': self.status, + 'expiresAt': self.expires_at_in_epoch_millis, + 'metadata': self.metadata # to json? + } + + # TO?DO: from_json_list, __str__(self) \ No newline at end of file diff --git a/at_client/util/__init__.py b/at_client/util/__init__.py index 533fe54..43a2109 100644 --- a/at_client/util/__init__.py +++ b/at_client/util/__init__.py @@ -3,4 +3,7 @@ from .verbbuilder import * from .authutil import AuthUtil from .onboardingutil import OnboardingUtil -from .registerutil import * \ No newline at end of file +from .registerutil import * +from .atconstants import * +from .timeutil import TimeUtil +from .syncdecorator import synchronized \ No newline at end of file diff --git a/at_client/util/atconstants.py b/at_client/util/atconstants.py new file mode 100644 index 0000000..6079f36 --- /dev/null +++ b/at_client/util/atconstants.py @@ -0,0 +1,12 @@ +import threading + + +should_be_running_lock = threading.Lock() +running_lock = threading.Lock() + +ENCRYPTING_KEY_NAME = 'encKeyName' +ENCRYPTING_ALGO = 'encAlgo' +IV_OR_NONCE = 'ivNonce' +SHARED_KEY_ENCRYPTED_ENCRYPTING_KEY_NAME = 'skeEncKeyName' +SHARED_KEY_ENCRYPTED_ENCRYPTING_ALGO = 'skeEncAlgo' +IS_ENCRYPTED = 'isEncrypted' \ No newline at end of file diff --git a/at_client/util/socketutil.py b/at_client/util/socketutil.py new file mode 100644 index 0000000..0dfdf8d --- /dev/null +++ b/at_client/util/socketutil.py @@ -0,0 +1,28 @@ +from ssl import SSLSocket + +LF = b"\x0a" +CRLF = b"\x0d\x0a" + + +class SocketUtil: + + def __init__(self, sock: SSLSocket): + self._socket = sock + + def readline(self) -> bytes: + """ + Read a line from the SSLSocket until a newline character is encountered. + + :param socket: SSLSocket object + :return: bytes read until newline character + """ + + line = b"" + while True: + data = self._socket.recv(1) + if data == b"": + break # No more data to read, connection closed + line += data + if data == LF: + break # Newline character encountered, stop reading + return line diff --git a/at_client/util/syncdecorator.py b/at_client/util/syncdecorator.py new file mode 100644 index 0000000..9479e27 --- /dev/null +++ b/at_client/util/syncdecorator.py @@ -0,0 +1,27 @@ +from functools import wraps +from multiprocessing import Lock + +def synchronized(member): + """ + @synchronized decorator. + + Lock a method for synchronized access only. The lock is stored to + the function or class instance, depending on what is available. + """ + + @wraps(member) + def wrapper(*args, **kwargs): + lock = vars(member).get("_synchronized_lock", None) + result = "" + try: + if lock is None: + lock = vars(member).setdefault("_synchronized_lock", Lock()) + lock.acquire() + result = member(*args, **kwargs) + lock.release() + except Exception as e: + lock.release() + raise e + return result + + return wrapper \ No newline at end of file diff --git a/at_client/util/timeutil.py b/at_client/util/timeutil.py new file mode 100644 index 0000000..39f7c65 --- /dev/null +++ b/at_client/util/timeutil.py @@ -0,0 +1,6 @@ +import time + +class TimeUtil: + @staticmethod + def current_time_millis(): + return int(time.time() * 1000) \ No newline at end of file diff --git a/examples/repl.py b/examples/repl.py index b6e51f5..135a327 100644 --- a/examples/repl.py +++ b/examples/repl.py @@ -1,4 +1,6 @@ import os, sys +from queue import Queue, Empty +import threading if os.path.basename(os.getcwd()) == "examples": base_dir = ".." @@ -6,14 +8,18 @@ base_dir = "." sys.path.append(base_dir) -sys.path.append(f"{base_dir}/src/common") -sys.path.append(f"{base_dir}/src/connections") -sys.path.append(f"{base_dir}/src/util") +sys.path.append(f"{base_dir}/at_client/common") +sys.path.append(f"{base_dir}/at_client/connections") +sys.path.append(f"{base_dir}/at_client/util") from at_client import AtClient from at_client.common import AtSign from at_client.common.keys import Keys, SharedKey from at_client.util.keystringutil import KeyStringUtil, KeyType +from at_client.connections.notification.atevents import AtEvent, AtEventType +from at_client.util.atconstants import * + +shared_queue = Queue() def print_help_instructions(): print() @@ -38,7 +44,35 @@ def print_help_instructions(): print(" NOTE: put, get, and delete will append the current atSign to the atKeyName if not supplied") print() + +def handle_event(queue, client): + while True: + try: + at_event = queue.get(block=False) + client.handle_event(queue, at_event) + event_type = at_event.event_type + event_data = at_event.event_data + print("\t => " + " REPL received event: " + str(event_type) + "\n\t\t\t" + str(event_data) + "\n") + sk = None + if event_type == AtEventType.DECRYPTED_UPDATE_NOTIFICATION: + key = event_data["key"] + sk = SharedKey.from_string(key=key) + value = event_data["value"] + decrypted_value = str(event_data["decryptedValue"]) + print(" => Notification ==> Key: [" + str(sk) + "] ==> EncryptedValue [" + str(value) + "] ==> DecryptedValue [" + decrypted_value + "]") + elif event_type == AtEventType.UPDATE_NOTIFICATION_TEXT: + print(str(event_data)) + elif event_type == AtEventType.UPDATE_NOTIFICATION: + try: + sk = SharedKey.from_string(str(event_data["key"])) + + except Exception as e: + print("Failed to retrieve " + str(sk) + " : " + str(e)) + except Empty: + pass + def main(): + atSignStr = 'NOT SET' while True: @@ -55,7 +89,13 @@ def main(): if atSignStr != '' and atSignStr != 'NOT SET': print('Connecting to ' + atSignStr + "...") atSign = AtSign(atSignStr) - client = AtClient(atsign=atSign, verbose=True) + + global shared_queue + client = AtClient(atsign=atSign, verbose=True, queue=shared_queue) + + threading.Thread(target=handle_event, args=(shared_queue,client,)).start() + threading.Thread(target=client.start_monitor, args=()).start() + command = '' while command!= '/exit': if client.authenticated: