diff --git a/.github/workflows/ruff_lint.yml b/.github/workflows/ruff_lint.yml new file mode 100644 index 0000000..8df3ce0 --- /dev/null +++ b/.github/workflows/ruff_lint.yml @@ -0,0 +1,34 @@ +name: RuffLint +# Runs the workflow on the below events: +# 1. on pull request raised to trunk branch. +# 2. on push event to trunk branch. +on: + push: + branches: + - trunk + pull_request: + branches: + - trunk + +permissions: # added using https://github.com/step-security/secure-workflows + contents: read + +jobs: + lint-markdown: + runs-on: ubuntu-latest + name: Lint MicroPython + steps: + - name: checkout repo content + uses: actions/checkout@b4ffde65f46336ab88eb53be808477a3936bae11 # v4.1.1 + - name: Install and run Ruff linter + uses: chartboost/ruff-action@e18ae971ccee1b2d7bbef113930f00c670b78da4 # v1.0.0 + with: + src: | + 'src/main.py' + 'src/lib/aes.py' + 'src/lib/atclient.py' + 'src/lib/iv_nonce.py' + 'src/lib/logging.py' + 'src/lib/ntp_client.py' + 'src/lib/pem_service.py' + 'src/lib/uasn1.py' diff --git a/src/lib/aes.py b/src/lib/aes.py index b924e75..71e66ce 100644 --- a/src/lib/aes.py +++ b/src/lib/aes.py @@ -1,6 +1,5 @@ -import ucryptolib -import os import binascii +import ucryptolib def aes_decrypt(encryptedText, selfEncryptionKey): ciphertext = binascii.a2b_base64(encryptedText) @@ -14,5 +13,3 @@ def hex_str_to_bytes(hex_str): parts = hex_str.split(' ') together = "".join(parts) return bytes.fromhex(together) - - \ No newline at end of file diff --git a/src/lib/atclient.py b/src/lib/atclient.py index 4b861bf..17a60ba 100644 --- a/src/lib/atclient.py +++ b/src/lib/atclient.py @@ -1,4 +1,6 @@ +import _thread import gc +import logging import time import ubinascii import ucryptolib @@ -10,10 +12,8 @@ from third_party import string from iv_nonce import IVNonce -import logging log=logging.getLogger(__name__) -import _thread lock = _thread.allocate_lock() monitoring = True notifications = [] @@ -36,7 +36,6 @@ def unpad(ciphertext: bytes): ciphertext (byte string): A piece of data with padding that needs to be stripped. """ - pdata_len = len(ciphertext) padding_len = ciphertext[-1] return ciphertext[:-padding_len] @@ -166,7 +165,7 @@ def authenticate(self, pkamKey): log.info("Doing PKAM authentication to {}", self.atsign) response, command = send_verb(self.sock, 'from:' + self.atsign) if response is None or len(response)<4: - raise ATException("Short response") + raise atException("Short response") challenge = response.replace('@data:', '') log.info("atsign pkam challenge : {}", challenge) pkamrsa=rsa.PrivateKey(pkamKey[0], pkamKey[1], pkamKey[2], pkamKey[3], pkamKey[4]) @@ -195,7 +194,7 @@ def getsharedkey(self, privKey): response, command = send_verb(self.sock, 'llookup:shared_key.' + self.recipient +'@' + self.atsign) log.info("Got this response for llookup: {}", response) if response is None or len(response)<4: - raise ATException("Short response") + raise atException("Short response") if response.startswith('@' + self.atsign + '@'): response=response.replace('@' + self.atsign + '@','') log.info("Truncated response is: {}", response) @@ -261,7 +260,7 @@ def getrecipientsharedkey(self, privKey): response, command = send_verb(self.sock, 'llookup:cached:@' + self.atsign +':shared_key@' + self.recipient) log.info("Got this response for llookup: {}", response) if response is None or len(response)<4: - raise ATException("Short response") + raise atException("Short response") if response.startswith('@' + self.atsign + '@'): response=response.replace('@' + self.atsign + '@','') log.info("Truncated response is: {}", response) diff --git a/src/lib/iv_nonce.py b/src/lib/iv_nonce.py index f669903..b4e1fcc 100644 --- a/src/lib/iv_nonce.py +++ b/src/lib/iv_nonce.py @@ -24,8 +24,8 @@ def token_bytes(self): def token_hex(self): # check upython compat for decode - return binascii.hexlify(token_bytes()).decode('ascii') + return ubinascii.hexlify(self.randbytes).decode('ascii') def __init__(self, n:int=16): self.n = n - self.randbytes = self._randbytes(n) \ No newline at end of file + self.randbytes = self._randbytes(n) diff --git a/src/lib/logging.py b/src/lib/logging.py index 27baf52..6be1c2c 100644 --- a/src/lib/logging.py +++ b/src/lib/logging.py @@ -43,9 +43,9 @@ def __init__(self, name): self.name = name def _level_str(self, level): - l = _level_dict.get(level) - if l is not None: - return l + levelfromdict = _level_dict.get(level) + if levelfromdict is not None: + return levelfromdict return "LVL%s" % level def setLevel(self, level): @@ -65,7 +65,7 @@ def log(self, level, msg, *args): msg = msg % args else: msg = msg.format(*args) - except: + except Exception: msg = msg + '--BAD LOG FORMAT--' if self.handlers: d = self.record.__dict__ @@ -115,9 +115,9 @@ def addHandler(self, hndlr): def getLogger(name="root"): if name in _loggers: return _loggers[name] - l = Logger(name) - _loggers[name] = l - return l + loggername = Logger(name) + _loggers[name] = loggername + return loggername def exc(e, msg, *args): getLogger().exc(e, msg, *args) @@ -142,4 +142,4 @@ def basicConfig(level=INFO, filename=None, stream=None, format=None): if filename is not None: print("logging.basicConfig: filename arg is not supported") if format is not None: - print("logging.basicConfig: format arg is not supported") \ No newline at end of file + print("logging.basicConfig: format arg is not supported") diff --git a/src/lib/ntp_client.py b/src/lib/ntp_client.py index ac499b0..2d75785 100644 --- a/src/lib/ntp_client.py +++ b/src/lib/ntp_client.py @@ -1,8 +1,7 @@ -import network # type: ignore +import machine +import struct import time import usocket as socket # type: ignore -import struct -import machine # Substract 1 hour, so we get + 1 hour TIMESTAMP_DELTA = 2208988800 - 3600*1 # epoch time - 1 hour @@ -42,11 +41,3 @@ def format_time_id(): def get_week_day(day): d = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'] return d[day] - - - - - - - - diff --git a/src/lib/pem_service.py b/src/lib/pem_service.py index 2d73098..be3ab64 100644 --- a/src/lib/pem_service.py +++ b/src/lib/pem_service.py @@ -74,7 +74,7 @@ def prettyprint(input_data, output, indent=0): tag, value = input_data.read() output.write(' ' * indent) output.write('[%s] %s (value %s)' % - (strclass(tag[2]), strid(tag[0]), repr(value))) + (strclass(tag[2]), strid(tag[0]), repr(value))) output.write('\n') elif tag[1] == uasn1.TypeConstructed: output.write(' ' * indent) diff --git a/src/lib/uasn1.py b/src/lib/uasn1.py index 2a5536c..ab92801 100644 --- a/src/lib/uasn1.py +++ b/src/lib/uasn1.py @@ -6,6 +6,9 @@ # uASN1 is copyright (c) 2007-2021 by the uASN1 authors. See the # file "AUTHORS" for a complete overview. +import ubinascii as binascii +import re + Boolean = 0x01 Integer = 0x02 BitString = 0x03 @@ -24,9 +27,6 @@ ClassContext = 0x80 ClassPrivate = 0xc0 -import re -import binascii - class Error(Exception): """ASN1 error""" @@ -442,4 +442,4 @@ def _decode_object_identifier(self, bytes_data): raise Error('ASN1 syntax error') result = [result[0] // 40, result[0] % 40] + result[1:] result = list(map(str, result)) - return '.'.join(result) \ No newline at end of file + return '.'.join(result) diff --git a/src/main.py b/src/main.py index 3883227..4a5bc38 100644 --- a/src/main.py +++ b/src/main.py @@ -1,15 +1,16 @@ +#import _thread import sys # Needed when running on Linux to find imports in lib directory if sys.platform == 'linux': sys.path.append('./lib') -import ujson as json +import atclient +import logging import os +import sys import time -from atclient import * +import ujson as json -import logging log=logging.getLogger(__name__) -import _thread loading = False @@ -104,7 +105,7 @@ def main(): elif int(opt) == 2: if atRecipient != '' and atRecipient != 'NOT SET': print('Connecting to ' + atSign + "...") - atc = atClient(atsign=atSign, recipient=atRecipient) + atc = atclient.atClient(atsign=atSign, recipient=atRecipient) atServer,atPort=atc.discover() atc.connect(atServer,atPort) atc.authenticate(pkamKey) @@ -119,7 +120,7 @@ def main(): # init second thread to read from socket (monitor) global monitoring monitoring = True - read_thread = _thread.start_new_thread(atc.attalk_recv, ()) + #read_thread = _thread.start_new_thread(atc.attalk_recv, ()) print('To return to menu type: /exit\n') while True: # print(atSign+":",end='\r')