diff --git a/Pipfile b/Pipfile index 743d69e..118d257 100644 --- a/Pipfile +++ b/Pipfile @@ -7,6 +7,7 @@ name = "pypi" wyze-sdk = {path = "."} requests = "*" blackboxprotobuf = "*" +mintotp = "*" [dev-packages] flake8 = "*" diff --git a/setup.py b/setup.py index fa12bcb..1611889 100644 --- a/setup.py +++ b/setup.py @@ -53,7 +53,7 @@ "tests.*", ] ), - install_requires=["requests", "blackboxprotobuf"], + install_requires=["requests", "blackboxprotobuf", "mintotp"], setup_requires=pytest_runner, test_suite="tests", tests_require=validate_dependencies, diff --git a/wyze_sdk/api/client.py b/wyze_sdk/api/client.py index c1faf50..1bd145a 100644 --- a/wyze_sdk/api/client.py +++ b/wyze_sdk/api/client.py @@ -41,6 +41,7 @@ def __init__( self, email: Optional[str] = None, password: Optional[str] = None, + totp_key: Optional[str] = None, base_url: Optional[str] = None, timeout: int = 30, ): @@ -48,6 +49,8 @@ def __init__( self._email = email #: An unencrypted string specifying the account password. self._password = password + #: An unencrypted string specifying the TOTP Key for automatic TOTP 2FA verification code generation. + self._totp_key = totp_key #: An optional string representing the API base URL. **This should not be used except for when running tests.** self._base_url = base_url #: The maximum number of seconds the client will wait to connect and receive a response from Wyze. Defaults to 30 @@ -133,7 +136,7 @@ def login(self) -> WyzeResponse: if self._email is None or self._password is None: raise WyzeClientConfigurationError("must provide email and password") self._logger.debug(f"access token not provided, attempting to login as {self._email}") - response = self._auth_client().user_login(email=self._email, password=self._password) + response = self._auth_client().user_login(email=self._email, password=self._password, totp_key=self._totp_key) self._update_session(access_token=response["access_token"], refresh_token=response["refresh_token"], user_id=response["user_id"]) return response diff --git a/wyze_sdk/service/auth_service.py b/wyze_sdk/service/auth_service.py index 461b8db..315d226 100644 --- a/wyze_sdk/service/auth_service.py +++ b/wyze_sdk/service/auth_service.py @@ -2,6 +2,7 @@ from typing import Dict, Optional +from mintotp import totp from wyze_sdk.signature import RequestVerifier from .base import ExServiceClient, WyzeResponse @@ -61,11 +62,45 @@ def api_call( nonce=nonce, ) - def user_login(self, *, email: str, password: str, **kwargs) -> WyzeResponse: + def user_login(self, *, email: str, password: str, totp_key: Optional[str] = None, **kwargs) -> WyzeResponse: nonce = self.request_verifier.clock.nonce() + password = self.request_verifier.md5_string( + self.request_verifier.md5_string(self.request_verifier.md5_string(password)) + ) kwargs.update({ 'nonce': str(nonce), 'email': email, - 'password': self.request_verifier.md5_string(self.request_verifier.md5_string(self.request_verifier.md5_string(password))) + 'password': password }) - return self.api_call('/user/login', json=kwargs, nonce=nonce) + response = self.api_call('/user/login', json=kwargs, nonce=nonce) + if response['access_token']: + return response + + if 'TotpVerificationCode' in response.get('mfa_options'): + # TOTP 2FA + mfa_type = 'TotpVerificationCode' + if totp_key: + verification_code = totp(totp_key) + else: + verification_code = input('Enter Wyze 2FA Verification Code: ') + verification_id = response['mfa_details']['totp_apps'][0]['app_id'] + else: + # SMS 2FA + mfa_type = 'PrimaryPhone' + params = { + 'mfaPhoneType': 'Primary', + 'sessionId': response['sms_session_id'], + 'userId': response['user_id'] + } + payload = {} + response = self.api_call('/user/login/sendSmsCode', params=params, json=payload) + verification_id = response['session_id'] + verification_code = input('Enter Wyze SMS 2FA Verification Code: ') + payload = { + 'email': email, + 'password': password, + 'mfa_type': mfa_type, + 'verification_id': verification_id, + 'verification_code': verification_code + } + return self.api_call('/user/login', json=payload, nonce=self.request_verifier.clock.nonce()) diff --git a/wyze_sdk/service/base.py b/wyze_sdk/service/base.py index 7271a06..92c4162 100644 --- a/wyze_sdk/service/base.py +++ b/wyze_sdk/service/base.py @@ -100,7 +100,7 @@ def _do_request( status_code=response.status_code, ).validate() - def do_post(self, url: str, headers: dict, payload: dict) -> WyzeResponse: + def do_post(self, url: str, headers: dict, payload: dict, params: Optional[dict] = None) -> WyzeResponse: with requests.Session() as client: if headers is not None: # add the request-specific headers @@ -110,7 +110,7 @@ def do_post(self, url: str, headers: dict, payload: dict) -> WyzeResponse: # we have to use a prepared request because the requests module # doesn't allow us to specify the separators in our json dumping # and the server expects no extra whitespace - req = client.prepare_request(requests.Request('POST', url, json=payload)) + req = client.prepare_request(requests.Request('POST', url, json=payload, params=params)) self._logger.debug('unmodified prepared request') self._logger.debug(req) @@ -193,7 +193,7 @@ def api_call( headers.update(self.headers) if http_verb == "POST": - return self.do_post(url=api_url, headers=headers, payload=json) + return self.do_post(url=api_url, headers=headers, payload=json, params=params) elif http_verb == "GET": return self.do_get(url=api_url, headers=headers, payload=params)