Skip to content

Commit

Permalink
Added 2FA support (#22)
Browse files Browse the repository at this point in the history
* Added 2FA support

* added requirement to setup.py

* Fixed POST params

* Actually made POST params optional by default to None

* Added 'Wyze' to 2FA input string

Co-authored-by: Shaun Tarves <[email protected]>
  • Loading branch information
jslay88 and Shaun Tarves authored May 22, 2021
1 parent 39b507d commit e10b5fd
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 8 deletions.
1 change: 1 addition & 0 deletions Pipfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ name = "pypi"
wyze-sdk = {path = "."}
requests = "*"
blackboxprotobuf = "*"
mintotp = "*"

[dev-packages]
flake8 = "*"
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
"tests.*",
]
),
install_requires=["requests", "blackboxprotobuf"],
install_requires=["requests", "blackboxprotobuf", "mintotp"],
setup_requires=pytest_runner,
test_suite="tests",
tests_require=validate_dependencies,
Expand Down
5 changes: 4 additions & 1 deletion wyze_sdk/api/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,16 @@ def __init__(
self,
email: Optional[str] = None,
password: Optional[str] = None,
totp_key: Optional[str] = None,
base_url: Optional[str] = None,
timeout: int = 30,
):
#: A string specifying the account email address.
self._email = email
#: An unencrypted string specifying the account password.
self._password = password
#: An unencrypted string specifying the TOTP Key for automatic TOTP 2FA verification code generation.
self._totp_key = totp_key
#: An optional string representing the API base URL. **This should not be used except for when running tests.**
self._base_url = base_url
#: The maximum number of seconds the client will wait to connect and receive a response from Wyze. Defaults to 30
Expand Down Expand Up @@ -133,7 +136,7 @@ def login(self) -> WyzeResponse:
if self._email is None or self._password is None:
raise WyzeClientConfigurationError("must provide email and password")
self._logger.debug(f"access token not provided, attempting to login as {self._email}")
response = self._auth_client().user_login(email=self._email, password=self._password)
response = self._auth_client().user_login(email=self._email, password=self._password, totp_key=self._totp_key)
self._update_session(access_token=response["access_token"], refresh_token=response["refresh_token"], user_id=response["user_id"])
return response

Expand Down
41 changes: 38 additions & 3 deletions wyze_sdk/service/auth_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from typing import Dict, Optional

from mintotp import totp
from wyze_sdk.signature import RequestVerifier

from .base import ExServiceClient, WyzeResponse
Expand Down Expand Up @@ -61,11 +62,45 @@ def api_call(
nonce=nonce,
)

def user_login(self, *, email: str, password: str, **kwargs) -> WyzeResponse:
def user_login(self, *, email: str, password: str, totp_key: Optional[str] = None, **kwargs) -> WyzeResponse:
nonce = self.request_verifier.clock.nonce()
password = self.request_verifier.md5_string(
self.request_verifier.md5_string(self.request_verifier.md5_string(password))
)
kwargs.update({
'nonce': str(nonce),
'email': email,
'password': self.request_verifier.md5_string(self.request_verifier.md5_string(self.request_verifier.md5_string(password)))
'password': password
})
return self.api_call('/user/login', json=kwargs, nonce=nonce)
response = self.api_call('/user/login', json=kwargs, nonce=nonce)
if response['access_token']:
return response

if 'TotpVerificationCode' in response.get('mfa_options'):
# TOTP 2FA
mfa_type = 'TotpVerificationCode'
if totp_key:
verification_code = totp(totp_key)
else:
verification_code = input('Enter Wyze 2FA Verification Code: ')
verification_id = response['mfa_details']['totp_apps'][0]['app_id']
else:
# SMS 2FA
mfa_type = 'PrimaryPhone'
params = {
'mfaPhoneType': 'Primary',
'sessionId': response['sms_session_id'],
'userId': response['user_id']
}
payload = {}
response = self.api_call('/user/login/sendSmsCode', params=params, json=payload)
verification_id = response['session_id']
verification_code = input('Enter Wyze SMS 2FA Verification Code: ')
payload = {
'email': email,
'password': password,
'mfa_type': mfa_type,
'verification_id': verification_id,
'verification_code': verification_code
}
return self.api_call('/user/login', json=payload, nonce=self.request_verifier.clock.nonce())
6 changes: 3 additions & 3 deletions wyze_sdk/service/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ def _do_request(
status_code=response.status_code,
).validate()

def do_post(self, url: str, headers: dict, payload: dict) -> WyzeResponse:
def do_post(self, url: str, headers: dict, payload: dict, params: Optional[dict] = None) -> WyzeResponse:
with requests.Session() as client:
if headers is not None:
# add the request-specific headers
Expand All @@ -110,7 +110,7 @@ def do_post(self, url: str, headers: dict, payload: dict) -> WyzeResponse:
# we have to use a prepared request because the requests module
# doesn't allow us to specify the separators in our json dumping
# and the server expects no extra whitespace
req = client.prepare_request(requests.Request('POST', url, json=payload))
req = client.prepare_request(requests.Request('POST', url, json=payload, params=params))

self._logger.debug('unmodified prepared request')
self._logger.debug(req)
Expand Down Expand Up @@ -193,7 +193,7 @@ def api_call(
headers.update(self.headers)

if http_verb == "POST":
return self.do_post(url=api_url, headers=headers, payload=json)
return self.do_post(url=api_url, headers=headers, payload=json, params=params)
elif http_verb == "GET":
return self.do_get(url=api_url, headers=headers, payload=params)

Expand Down

0 comments on commit e10b5fd

Please sign in to comment.