diff --git a/smartcitizen_connector/_config/config.py b/smartcitizen_connector/_config/config.py index 69adcae..9d42fc7 100644 --- a/smartcitizen_connector/_config/config.py +++ b/smartcitizen_connector/_config/config.py @@ -31,7 +31,7 @@ class Config(): } _max_retries = 3 - _rety_interval = 10 + _retry_interval = 10 _retry_codes = [ HTTPStatus.TOO_MANY_REQUESTS, HTTPStatus.INTERNAL_SERVER_ERROR, diff --git a/smartcitizen_connector/device/device.py b/smartcitizen_connector/device/device.py index 92ddd6c..53f039d 100644 --- a/smartcitizen_connector/device/device.py +++ b/smartcitizen_connector/device/device.py @@ -6,6 +6,7 @@ from typing import Optional, List, Dict from requests import get, post, patch from aiohttp_retry import RetryClient, ExponentialRetry +from aiohttp import ClientResponseError, ClientResponse, ClientSession, ContentTypeError from pandas import DataFrame, to_datetime from datetime import datetime from os import environ @@ -15,7 +16,6 @@ from math import isnan from tqdm import trange from json import dumps, JSONEncoder, loads -import aiohttp import asyncio import time @@ -61,7 +61,7 @@ def check_postprocessing(postprocessing): if len(urls)>1: logger.warning('URLs for postprocessing recipe are more than one, trying first') tentative_url = urls[0] - logger.info(f'Device has postprocessing information:\n{_postprocessing}') + logger.info(f'Device has postprocessing information') _ok = True # Make hardware postprocessing @@ -128,7 +128,6 @@ def __load__(self): self.json = TypeAdapter(Device).validate_python(r.json()) if r.json()['hardware']['last_status_message'] != '[FILTERED]': logger.info('Device has status message') - print (r.json()['hardware']) if r.json()['hardware']['last_status_message'] is not None: self._last_status_message = TypeAdapter(HardwareStatus).validate_python(r.json()['hardware']['last_status_message']) else: @@ -193,15 +192,32 @@ def __make_properties__(self): async def get_datum(self, semaphore, session, url, headers, sensor_id, resample, frequency, rename)->Dict: async with semaphore: - retry_client = RetryClient(client_session=session) - async with retry_client.get(url, headers = headers) as resp: - datum = await resp.json() + async def evaluate_response(response: ClientResponse) -> bool: + try: + await response.json() + except: + return False + return True + + retry_options = ExponentialRetry(attempts=config._max_retries, + statuses= [code.numerator for code in config._retry_codes], + evaluate_response_callback=evaluate_response, + start_timeout=config._retry_interval, + exceptions={ClientResponseError}) + + retry_client = RetryClient(client_session=session, retry_options=retry_options) + + async with retry_client.get(url, headers = headers) as response: + + rdatum = await response.read() + datum = json.loads(rdatum) + sensor_name = find_by_field(self.json.data.sensors, sensor_id, 'id').name if 'readings' not in datum: logger.warning(f"Device: {self.json.id}- No readings in request for sensor: {sensor_id}: {sensor_name}") - logger.warning(f"Response code: {resp}") + logger.warning(f"Response code: {response}") return None if datum['readings'] == []: @@ -294,7 +310,7 @@ async def get_data(self, logger.info(f'Requesting device {self.id} from {min_date} to {max_date}') semaphore = asyncio.Semaphore(config._max_concurrent_requests) - async with aiohttp.ClientSession() as session: + async with ClientSession() as session: tasks = [] for sensor in self.json.data.sensors: @@ -394,7 +410,7 @@ async def post_data(self, columns = 'sensors', rename = None, clean_na = 'drop', else: _rename = rename - async with aiohttp.ClientSession() as session: + async with ClientSession() as session: tasks = [] for column in _columns: diff --git a/smartcitizen_connector/tools/tools.py b/smartcitizen_connector/tools/tools.py index a321c61..b456b45 100644 --- a/smartcitizen_connector/tools/tools.py +++ b/smartcitizen_connector/tools/tools.py @@ -190,7 +190,6 @@ def safe_get(url, headers = None): if code in config._retry_codes: time.sleep(config._retry_interval) continue - raise else: break