From 2338f76fdda8d1538c5bcf4ab4adf41c345e51b0 Mon Sep 17 00:00:00 2001 From: Adeeb Shihadeh Date: Thu, 19 Oct 2023 14:54:11 -0700 Subject: [PATCH] eSIM profile management (#30262) * eSIM profile management * start download * loaded a profile * more stuff * fixups * fix linter --------- Co-authored-by: Comma Device --- system/hardware/tici/esim.py | 115 +++++++++++++++++++++++++++++++++++ 1 file changed, 115 insertions(+) create mode 100755 system/hardware/tici/esim.py diff --git a/system/hardware/tici/esim.py b/system/hardware/tici/esim.py new file mode 100755 index 00000000000000..df76c1a5fdccfb --- /dev/null +++ b/system/hardware/tici/esim.py @@ -0,0 +1,115 @@ +#!/usr/bin/env python3 +import os +import math +import time +import binascii +import requests +import serial +import subprocess + + +def post(url, payload): + print() + print("POST to", url) + r = requests.post( + url, + data=payload, + verify=False, + headers={ + "Content-Type": "application/json", + "X-Admin-Protocol": "gsma/rsp/v2.2.0", + "charset": "utf-8", + "User-Agent": "gsma-rsp-lpad", + }, + ) + print("resp", r) + print("resp text", repr(r.text)) + print() + r.raise_for_status() + + ret = f"HTTP/1.1 {r.status_code}" + ret += ''.join(f"{k}: {v}" for k, v in r.headers.items() if k != 'Connection') + return ret.encode() + r.content + + +class LPA: + def __init__(self): + self.dev = serial.Serial('/dev/ttyUSB2', baudrate=57600, timeout=1, bytesize=8) + self.dev.reset_input_buffer() + self.dev.reset_output_buffer() + assert "OK" in self.at("AT") + + def at(self, cmd): + print(f"==> {cmd}") + self.dev.write(cmd.encode() + b'\r\n') + + r = b"" + cnt = 0 + while b"OK" not in r and b"ERROR" not in r and cnt < 20: + r += self.dev.read(8192).strip() + cnt += 1 + r = r.decode() + print(f"<== {repr(r)}") + return r + + def download_ota(self, qr): + return self.at(f'AT+QESIM="ota","{qr}"') + + def download(self, qr): + smdp = qr.split('$')[1] + out = self.at(f'AT+QESIM="download","{qr}"') + for _ in range(5): + line = out.split("+QESIM: ")[1].split("\r\n\r\nOK")[0] + + parts = [x.strip().strip('"') for x in line.split(',', maxsplit=4)] + print(repr(parts)) + trans, ret, url, payloadlen, payload = parts + assert trans == "trans" and ret == "0" + assert len(payload) == int(payloadlen) + + r = post(f"https://{smdp}/{url}", payload) + to_send = binascii.hexlify(r).decode() + + chunk_len = 1400 + for i in range(math.ceil(len(to_send) / chunk_len)): + state = 1 if (i+1)*chunk_len < len(to_send) else 0 + data = to_send[i * chunk_len : (i+1)*chunk_len] + out = self.at(f'AT+QESIM="trans",{len(to_send)},{state},{i},{len(data)},"{data}"') + assert "OK" in out + + if '+QESIM:"download",1' in out: + raise Exception("profile install failed") + elif '+QESIM:"download",0' in out: + print("done, successfully loaded") + break + + def enable(self, iccid): + self.at(f'AT+QESIM="enable","{iccid}"') + + def disable(self, iccid): + self.at(f'AT+QESIM="disable","{iccid}"') + + def delete(self, iccid): + self.at(f'AT+QESIM="delete","{iccid}"') + + def list_profiles(self): + out = self.at('AT+QESIM="list"') + return out.strip().splitlines()[1:] + + +if __name__ == "__main__": + import sys + + if "RESTART" in os.environ: + subprocess.check_call("sudo systemctl stop ModemManager", shell=True) + subprocess.check_call("/usr/comma/lte/lte.sh stop_blocking", shell=True) + subprocess.check_call("/usr/comma/lte/lte.sh start", shell=True) + while not os.path.exists('/dev/ttyUSB2'): + time.sleep(1) + time.sleep(3) + + lpa = LPA() + print(lpa.list_profiles()) + if len(sys.argv) > 1: + lpa.download(sys.argv[1]) + print(lpa.list_profiles())