diff --git a/src/rivian/exceptions.py b/src/rivian/exceptions.py index 530644d..0a61bbd 100644 --- a/src/rivian/exceptions.py +++ b/src/rivian/exceptions.py @@ -1,2 +1,17 @@ class RivianExpiredTokenError(Exception): - """Access Token Expired Error""" \ No newline at end of file + """Access Token Expired Error""" + +class RivianUnauthenticated(Exception): + """User Token Invalid Error""" + +class RivianInvalidCredentials(Exception): + """Invalid User Credentials - Check Username and Password""" + +class RivianInvalidOTP(Exception): + """User's One Time Password Invalid - Try Again""" + +class RivianDataError(Exception): + """Rivian Server Data Error""" + +class RivianTemporarilyLockedError(Exception): + """Rivian User Temporarily Locked Error""" \ No newline at end of file diff --git a/src/rivian/rivian.py b/src/rivian/rivian.py index bdf47e5..922c9c6 100644 --- a/src/rivian/rivian.py +++ b/src/rivian/rivian.py @@ -7,6 +7,7 @@ import json import socket import random +import uuid import logging @@ -14,7 +15,7 @@ import async_timeout from yarl import URL -from rivian.exceptions import RivianExpiredTokenError +from rivian.exceptions import * _LOGGER = logging.getLogger(__name__) @@ -54,6 +55,7 @@ def __init__( self.request_timeout = request_timeout self._otp_needed = False + self._otp_token = "" async def authenticate( self, @@ -344,6 +346,7 @@ async def authenticate_graphql( "Csrf-Token": self._csrf_token, "A-Sess": self._app_session_token, "Apollographql-Client-Name": "com.rivian.ios.consumer-apollo-ios", + "Dc-Cid": f"m-ios-{uuid.uuid4()}", } ) @@ -356,9 +359,53 @@ async def authenticate_graphql( response = await self.__graphql_query(headers, url, graphql_json) response_json = await response.json() - self._access_token = response_json["data"]["login"]["accessToken"] - self._refresh_token = response_json["data"]["login"]["refreshToken"] - self._user_session_token = response_json["data"]["login"]["userSessionToken"] + + login_data = response_json["data"]["login"] + + if "otpToken" in login_data: + self._otp_needed = True + self._otp_token = login_data["otpToken"] + else: + self._access_token = login_data["accessToken"] + self._refresh_token = login_data["refreshToken"] + self._user_session_token = login_data["userSessionToken"] + + return response + + async def validate_otp_graphql(self, username: str, otpCode: str) -> dict[str, Any]: + """Validates OTP against the Rivian GraphQL API with Username, OTP Code, and OTP Token""" + + url = GRAPHQL_GATEWAY + + headers = dict() + headers.update(BASE_HEADERS) + headers.update( + { + "Csrf-Token": self._csrf_token, + "A-Sess": self._app_session_token, + "Apollographql-Client-Name": "com.rivian.ios.consumer-apollo-ios", + } + ) + + graphql_json = { + "operationName": "LoginWithOTP", + "query": "mutation LoginWithOTP($email: String!, $otpCode: String!, $otpToken: String!) {\n loginWithOTP(email: $email, otpCode: $otpCode, otpToken: $otpToken) {\n __typename\n ... on MobileLoginResponse {\n __typename\n accessToken\n refreshToken\n userSessionToken\n }\n }\n}", + "variables": { + "email": username, + "otpCode": otpCode, + "otpToken": self._otp_token, + }, + } + + response = await self.__graphql_query(headers, url, graphql_json) + + response_json = await response.json() + + login_data = response_json["data"]["loginWithOTP"] + + self._access_token = login_data["accessToken"] + self._refresh_token = login_data["refreshToken"] + self._user_session_token = login_data["userSessionToken"] return response @@ -499,23 +546,58 @@ async def __graphql_query(self, headers: dict(str, str), url: str, body: str): "Error occurred while communicating with Rivian." ) from exception - if response.status != 200: - body = await response.text() - raise Exception( - "Error occurred while reading the graphql response from Rivian.", - response, - response.status, - body, - ) - - response_json = await response.json() - if "errors" in response_json: - raise Exception( - "Error occurred while reading the graphql response from Rivian.", - response, - response.status, - response_json, - ) + try: + response_json = await response.json() + if "errors" in response_json: + for e in response_json["errors"]: + extensions = e["extensions"] + if extensions["code"] == "UNAUTHENTICATED": + raise RivianUnauthenticated( + response.status, + response_json, + headers, + body, + ) + elif extensions["code"] == "DATA_ERROR": + raise RivianDataError( + response.status, + response_json, + headers, + body, + ) + elif extensions["code"] == "BAD_CURRENT_PASSWORD": + raise RivianInvalidCredentials( + response.status, + response_json, + headers, + body, + ) + elif ( + extensions["code"] == "BAD_USER_INPUT" + and extensions["reason"] == "INVALID_OTP" + ): + raise RivianInvalidOTP( + response.status, + response_json, + headers, + body, + ) + elif extensions["code"] == "SESSION_MANAGER_ERROR": + raise RivianTemporarilyLockedError( + response.status, + response_json, + headers, + body, + ) + raise Exception( + "Error occurred while reading the graphql response from Rivian.", + response.status, + response_json, + headers, + body, + ) + except Exception as exception: + raise exception return response