diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py index c82a936076746..0da3f1f061c18 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte/resources.py @@ -28,6 +28,10 @@ DEFAULT_POLL_INTERVAL_SECONDS = 10 +# The access token expire every 3 minutes in Airbyte Cloud. +# Refresh is needed after 2.5 minutes to avoid the "token expired" error message. +AIRBYTE_CLOUD_REFRESH_TIMEDELTA_SECONDS = 150 + class AirbyteState: RUNNING = "running" @@ -303,6 +307,23 @@ def all_additional_request_params(self) -> Mapping[str, Any]: } } + def make_request( + self, + endpoint: str, + data: Optional[Mapping[str, object]] = None, + method: str = "POST", + include_additional_request_params: bool = True, + ) -> Optional[Mapping[str, object]]: + # Make sure the access token is refreshed before using it when calling the API. + if include_additional_request_params and self._needs_refreshed_access_token(): + self._refresh_access_token() + return super().make_request( + endpoint=endpoint, + data=data, + method=method, + include_additional_request_params=include_additional_request_params, + ) + def start_sync(self, connection_id: str) -> Mapping[str, object]: job_sync = check.not_none( self.make_request( @@ -346,13 +367,13 @@ def _refresh_access_token(self) -> None: self._access_token_timestamp = datetime.now().timestamp() def _needs_refreshed_access_token(self) -> bool: - # The access token expire every 3 minutes in Airbyte Cloud. - # Refresh is needed after 2.5 minutes to avoid the "token expired" error message. return ( not self._access_token_value or not self._access_token_timestamp or self._access_token_timestamp - <= datetime.timestamp(datetime.now() - timedelta(seconds=150)) + <= datetime.timestamp( + datetime.now() - timedelta(seconds=AIRBYTE_CLOUD_REFRESH_TIMEDELTA_SECONDS) + ) )