From f8c01a0d496df66e85d2a68900cd7c23ec5d8f29 Mon Sep 17 00:00:00 2001 From: linuxdaemon Date: Sun, 15 Mar 2020 23:28:52 -0400 Subject: [PATCH] Update cryptocurrency plugin to new CMC API --- CHANGELOG.md | 1 + config.default.json | 3 +- plugins/cryptocurrency.py | 660 +++++++++++++++++----- tests/conftest.py | 13 + tests/plugin_tests/test_cryptocurrency.py | 416 ++++++++++++++ tests/util/__init__.py | 3 +- 6 files changed, 962 insertions(+), 134 deletions(-) create mode 100644 tests/plugin_tests/test_cryptocurrency.py diff --git a/CHANGELOG.md b/CHANGELOG.md index acff0d550..d6a579753 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Add the factoid character in the listfacts output ### Changed - Cleaned up the timeformat API and implementation (#32) +- Updated cryptocurrency to new CoinMarketCap API ### Fixed - Disconnect active vs configured channel lists (#11) - Fix reminder tests time based errors diff --git a/config.default.json b/config.default.json index d7aa3e3c4..6553b0269 100644 --- a/config.default.json +++ b/config.default.json @@ -109,7 +109,8 @@ "lyricsnmusic": "", "cleverbot": "", "brewerydb": "", - "alphavantage": "" + "alphavantage": "", + "coinmarketcap": "" }, "database": "sqlite:///cloudbot.db", "plugin_loading": { diff --git a/plugins/cryptocurrency.py b/plugins/cryptocurrency.py index 50e9f486a..9cc979d7e 100644 --- a/plugins/cryptocurrency.py +++ b/plugins/cryptocurrency.py @@ -9,149 +9,548 @@ License: GPL v3 """ -from collections import defaultdict -from datetime import datetime, timedelta +import inspect +import time +import warnings +from numbers import Number from operator import itemgetter from threading import RLock +from typing import Any, Dict, List, Optional, Type, TypeVar import requests -from requests import Session +from requests import Response from yarl import URL from cloudbot import hook from cloudbot.util import colors, web from cloudbot.util.func_utils import call_with_args -CURRENCY_SYMBOLS = { - 'USD': '$', - 'GBP': '£', - 'EUR': '€', -} - class APIError(Exception): + def __init__(self, msg: str): + super().__init__(msg) + self.msg = msg + + +class UnknownSymbolError(APIError): + def __init__(self, name: str): + super().__init__(name) + self.name = name + + +class UnknownFiatCurrencyError(APIError): + def __init__(self, name: str): + super().__init__(name) + self.name = name + + +class APIResponse: + def __init__(self, api, data: 'UntypedResponse', response: Response) -> None: + self.api = api + self.data = data + self.response = response + + @classmethod + def from_response(cls, api: 'CoinMarketCapAPI', response: Response): + return cls(api, read_data(response.json(), UntypedResponse), response) + + +class SchemaField: + empty = object() + + def __init__(self, name: str, field_type: Type, default=empty): + self.name = name + self.field_type = field_type + self.default = default + + +def _get_fields(init_func): + signature = inspect.signature(init_func) + for parameter in signature.parameters.values(): + if parameter.annotation is parameter.empty: + continue + + if parameter.default is parameter.empty: + default = SchemaField.empty + else: + default = parameter.default + + yield SchemaField(parameter.name, parameter.annotation, default) + + +class SchemaMeta(type): + def __new__(cls, name, bases, members): + if members.setdefault('_abstract', False): + super_fields = tuple() + for base in bases: + if not getattr(base, '_abstract', False) and isinstance(base, cls): + super_fields = getattr(base, '_fields') + break + + members['_fields'] = super_fields + else: + members['_fields'] = tuple(_get_fields(members['__init__'])) + + return type.__new__(cls, name, bases, members) + + +T = TypeVar('T') + + +class Schema(metaclass=SchemaMeta): + # noinspection PyUnusedName + _abstract = True + + def __init__(self): + self.unknown_fields = {} + + def cast_to(self, new_type: Type[T]) -> T: + return read_data(serialize(self), new_type) + + +class ResponseStatus(Schema): + def __init__( + self, + timestamp: str, + error_code: int, + elapsed: int, + credit_count: int, + error_message: str = None, + notice: str = None, + ): + super().__init__() + self.timestamp = timestamp + self.error_code = error_code + self.error_message = error_message + self.elapsed = elapsed + self.credit_count = credit_count + self.notice = notice + + +class APIRequestResponse(Schema): + def __init__(self, status: ResponseStatus): + super().__init__() + self.status = status + + +class UntypedResponse(APIRequestResponse): + def __init__(self, data: Any, status: ResponseStatus): + super().__init__(status) + self.data = data + + +class Platform(Schema): + # noinspection PyShadowingBuiltins + def __init__(self, id: int, name: str, symbol: str, slug: str, token_address: str): + super().__init__() + self.id = id + self.name = name + self.symbol = symbol + self.slug = slug + self.token_address = token_address + + +class Quote(Schema): + def __init__( + self, + price: Number, + volume_24h: Number, + market_cap: Number, + percent_change_1h: Number, + percent_change_24h: Number, + percent_change_7d: Number, + last_updated: str, + volume_24h_reported: Number = None, + volume_7d: Number = None, + volume_7d_reported: Number = None, + volume_30d: Number = None, + volume_30d_reported: Number = None, + ): + super().__init__() + self.price = price + self.volume_24h = volume_24h + self.volume_24h_reported = volume_24h_reported + self.volume_7d = volume_7d + self.volume_7d_reported = volume_7d_reported + self.volume_30d = volume_30d + self.volume_30d_reported = volume_30d_reported + self.market_cap = market_cap + self.percent_change_1h = percent_change_1h + self.percent_change_24h = percent_change_24h + self.percent_change_7d = percent_change_7d + self.last_updated = last_updated + + +class CryptoCurrency(Schema): + # noinspection PyShadowingBuiltins + def __init__( + self, + id: int, + name: str, + symbol: str, + slug: str, + circulating_supply: Number, + total_supply: Number, + date_added: str, + num_market_pairs: int, + cmc_rank: int, + last_updated: str, + tags: List[str], + quote: Dict[str, Quote], + max_supply: Number = None, + market_cap_by_total_supply: Number = None, + platform: Platform = None, + ): + super().__init__() + self.id = id + self.name = name + self.symbol = symbol + self.slug = slug + self.circulating_supply = circulating_supply + self.total_supply = total_supply + self.max_supply = max_supply + self.market_cap_by_total_supply = market_cap_by_total_supply + self.date_added = date_added + self.num_market_pairs = num_market_pairs + self.cmc_rank = cmc_rank + self.last_updated = last_updated + self.tags = tags + self.platform = platform + self.quote = quote + + +class QuoteRequestResponse(APIRequestResponse): + def __init__(self, data: Dict[str, CryptoCurrency], status: ResponseStatus): + super().__init__(status) + self.data = data + + +class FiatCurrency(Schema): + # noinspection PyShadowingBuiltins + def __init__(self, id: int, name: str, sign: str, symbol: str): + super().__init__() + self.id = id + self.name = name + self.sign = sign + self.symbol = symbol + + +class FiatCurrencyMap(APIRequestResponse): + def __init__(self, data: List[FiatCurrency], status: ResponseStatus): + super().__init__(status) + self.data = data + + self.symbols = {currency.symbol: currency.sign for currency in self.data} + + +class CryptoCurrencyEntry(Schema): + # noinspection PyShadowingBuiltins + def __init__( + self, + id: int, + name: str, + symbol: str, + slug: str, + is_active: int, + first_historical_data: str = None, + last_historical_data: str = None, + platform: Platform = None, + status: str = None, + ) -> None: + super().__init__() + self.id = id + self.name = name + self.symbol = symbol + self.slug = slug + self.is_active = is_active + self.status = status + self.first_historical_data = first_historical_data + self.last_historical_data = last_historical_data + self.platform = platform + + +class CryptoCurrencyMap(APIRequestResponse): + def __init__(self, data: List[CryptoCurrencyEntry], status: ResponseStatus): + super().__init__(status) + self.data = data + + self.names = set(currency.symbol for currency in self.data) + + +BAD_FIELD_TYPE_MSG = "field {field!r} expected type {exp_type!r}, got type {act_type!r}" + + +def sentinel(name: str): + try: + storage = getattr(sentinel, '_sentinels') + except AttributeError: + storage = {} + setattr(sentinel, '_sentinels', storage) + + try: + return storage[name] + except KeyError: + storage[name] = obj = object() + return obj + + +_unset = sentinel("unset") + + +class TypeAssertError(TypeError): + def __init__(self, obj, cls): + super().__init__() + self.cls = cls + self.obj = obj + + +class MissingSchemaField(KeyError): pass -class APIRateLimitError(APIError): +class ParseError(ValueError): pass -class TickerNotFound(APIError): - def __init__(self, name): - super().__init__(name) - self.currency = name +def _assert_type(obj, cls, display_cls=_unset): + if display_cls is _unset: + display_cls = cls + if not isinstance(obj, cls): + raise TypeAssertError(obj, display_cls) -class CurrencyConversionError(APIError): - def __init__(self, in_name, out_name): - super().__init__(in_name, out_name) - self.in_name = in_name - self.out_name = out_name +def _hydrate_object(_value, _cls): + if _cls is Any: + return _value -class CMCApi: - def __init__(self, user_agent=None, url="https://api.coinmarketcap.com/v1"): - self.url = URL(url) - self._request_times = [] + if isinstance(_cls, type) and issubclass(_cls, Schema): + _assert_type(_value, dict) + return read_data(_value, _cls) - self._cache = defaultdict(dict) - self._lock = RLock() - self._now = datetime.now() + try: + typing_cls = _cls.__origin__ + except AttributeError: + pass + else: + type_args = _cls.__args__ + if issubclass(typing_cls, list): + _assert_type(_value, list, _cls) - self._session = Session() + return [_hydrate_object(v, type_args[0]) for v in _value] - self.set_user_agent(user_agent) + if issubclass(typing_cls, dict): + _assert_type(_value, dict, _cls) - def set_user_agent(self, user_agent=None): - if user_agent is None: - user_agent = requests.utils.default_user_agent() + return { + _hydrate_object(k, type_args[0]): _hydrate_object(v, type_args[1]) + for k, v in _value.items() + } - with self._lock: - self._session.headers['User-Agent'] = user_agent + raise TypeError("Can't match typing alias {!r}".format(typing_cls)) # pragma: no cover - def close(self): - self._session.close() + _assert_type(_value, _cls) - def _request(self, endpoint, params=None): - self._request_times[:] = [t for t in self._request_times if (self._now - t) < timedelta(minutes=1)] - if len(self._request_times) > 10: - raise APIRateLimitError + return _value - with self._session.get(self.url / endpoint, params=params) as response: - self._request_times.append(self._now) - response.raise_for_status() - return response.json() - def _update(self, key, obj): - old_obj = self._cache[key.lower()] - if old_obj.get("last_updated") != obj["last_updated"]: - old_obj.clear() +def read_data(data: Dict, schema_cls: Type[T]) -> T: + fields = schema_cls._fields - old_obj.update(obj) + out = {} + field_names = [] - def _handle_obj(self, *objs): - with self._lock: - for obj in objs: - self._update(obj["id"], obj) - self._update(obj["symbol"], obj) - - def _get_currency_data(self, id_or_symbol, out_currency="USD"): - self._now = datetime.now() - old_data = self._cache[id_or_symbol.lower()] - _id = old_data.get("id", id_or_symbol) - last_updated = datetime.fromtimestamp(float(old_data.get('last_updated', "0"))) - diff = self._now - last_updated - price_key = "price_" + out_currency.lower() - if diff > timedelta(minutes=5) or price_key not in old_data: - responses = self._request("ticker/" + _id.lower(), params={'limit': 0, 'convert': out_currency}) - self._handle_obj(*responses) - data = self._cache[id_or_symbol.lower()] - last_updated = datetime.fromtimestamp(float(data.get('last_updated', "0"))) - diff = self._now - last_updated - if diff > timedelta(days=2): - raise TickerNotFound(id_or_symbol) - elif price_key not in data: - raise CurrencyConversionError(data["symbol"], out_currency) - - return self._cache[id_or_symbol.lower()] - - return old_data - - def update_cache(self): + for schema_field in fields: # type: SchemaField + try: + param_type = schema_field.field_type + name = schema_field.name + field_names.append(name) + try: + value = data[name] + except KeyError: + if schema_field.default is schema_field.empty: + raise MissingSchemaField(name) + + value = schema_field.default + + if value is None and schema_field.default is None: + out[name] = value + continue + + try: + out[name] = _hydrate_object(value, param_type) + except TypeAssertError as e: + raise TypeError( + BAD_FIELD_TYPE_MSG.format( + field=name, exp_type=e.cls, act_type=type(e.obj) + ) + ) from e + except (MissingSchemaField, TypeAssertError, ParseError) as e: + raise ParseError( + "Unable to parse schema {!r}".format(schema_cls.__name__) + ) from e + + obj = schema_cls(**out) + + obj.unknown_fields.update( + {key: data[key] for key in data if key not in field_names} + ) + + if obj.unknown_fields: + warnings.warn( + "Unknown fields: {} while parsing schema {!r}".format( + list(obj.unknown_fields.keys()), schema_cls.__name__ + ) + ) + + return obj + + +def serialize(obj): + if isinstance(obj, Schema): + out = {} + for field in obj._fields: # type: SchemaField + val = getattr(obj, field.name) + out[field.name] = serialize(val) + + if obj.unknown_fields: + out.update(obj.unknown_fields) + + return out + + if isinstance(obj, list): + return [serialize(o) for o in obj] + + if isinstance(obj, dict): + return {k: serialize(v) for k, v in obj.items()} + + return obj + + +class CacheEntry: + def __init__(self, value, expire): + self.value = value + self.expire = expire + + +class Cache: + def __init__(self, lock_cls=RLock): + self._data = {} + self._lock = lock_cls() + + def clear(self): + self._data.clear() + + def put(self, key, value, ttl) -> CacheEntry: with self._lock: - self._now = datetime.now() - data = self._request("ticker", params={'limit': 0}) - self._handle_obj(*data) + self._data[key] = out = CacheEntry(value, time.time() + ttl) + return out - def get_currency_data(self, id_or_symbol, out_currency="USD"): + def get(self, key: str) -> Optional[CacheEntry]: with self._lock: - data = self._get_currency_data(id_or_symbol, out_currency) - return data + try: + entry = self._data[key] + except KeyError: + return None + + if time.time() >= entry.expire: + del self._data[key] + return None + + return entry + + +class CoinMarketCapAPI: + def __init__( + self, + api_key: str = None, + api_url: str = 'https://pro-api.coinmarketcap.com/v1/', + ) -> None: + self.api_key = api_key + self.api_url = URL(api_url) + self.cache = Cache() @property - def currencies(self): - return self._cache.values() + def request_headers(self): + return { + 'Accepts': 'application/json', + 'X-CMC_PRO_API_KEY': self.api_key, + } + + def get_currency_sign(self, currency: str) -> str: + return self.get_fiat_currency_map().symbols[currency] + + def get_quote(self, symbol: str, currency: str = 'USD') -> CryptoCurrency: + symbol = symbol.upper() + if symbol not in self.get_crypto_currency_map().names: + raise UnknownSymbolError(symbol) + + if currency not in self.get_fiat_currency_map().symbols: + raise UnknownFiatCurrencyError(currency) + + data = self.request( + 'cryptocurrency/quotes/latest', + symbol=symbol.upper(), + convert="{},BTC".format(currency), + ).data.cast_to(QuoteRequestResponse) + _, out = data.data.popitem() + return out + + def get_fiat_currency_map(self) -> FiatCurrencyMap: + return self._request_cache( + "fiat_currency_map", 'fiat/map', FiatCurrencyMap, 86400 + ) + + def get_crypto_currency_map(self) -> CryptoCurrencyMap: + return self._request_cache( + "crypto_currency_map", 'cryptocurrency/map', CryptoCurrencyMap, 86400 + ) + + def _request_cache(self, name: str, endpoint: str, fmt: Type[T], ttl: int) -> T: + out = self.cache.get(name) + if out is None: + currencies = self.request(endpoint).data.cast_to(fmt) + out = self.cache.put(name, currencies, ttl) + + return out.value + + def request(self, endpoint: str, **params) -> APIResponse: + url = str(self.api_url / endpoint) + with requests.get(url, headers=self.request_headers, params=params) as response: + api_response = APIResponse.from_response(self, response) + self.check(api_response) + + return api_response + + def check(self, response: APIResponse) -> None: + msg = response.data.status.error_message + if msg: + raise APIError(msg) -api = CMCApi() +api = CoinMarketCapAPI() + + +@hook.onload +def init_api(bot): + api.api_key = bot.config.get_api_key('coinmarketcap') class Alias: __slots__ = ("name", "cmds") - def __init__(self, name, *cmds): - self.name = name - if name not in cmds: - cmds = (name,) + cmds + def __init__(self, symbol, *cmds): + self.name = symbol + if symbol not in cmds: + cmds += (symbol,) self.cmds = cmds ALIASES = ( - Alias('bitcoin', 'btc'), - Alias('litecoin', 'ltc'), - Alias('dogecoin', 'doge'), + Alias('btc', 'bitcoin'), + Alias('ltc', 'litecoin'), + Alias('doge', 'dogecoin'), ) @@ -172,64 +571,61 @@ def init_aliases(): globals()[_hook.__name__] = hook.command(*alias.cmds, autohelp=False)(_hook) -@hook.onload -@hook.periodic(3600) -def update_cache(bot): - api.set_user_agent(bot.user_agent) - api.update_cache() - - -@hook.on_unload -def close_api(): - api.close() - - # main command @hook.command("crypto", "cryptocurrency") -def crypto_command(text): - """ [currency] - Returns current value of a cryptocurrency""" +def crypto_command(text, event): + """ [currency] - Returns current value of a cryptocurrency""" args = text.split() ticker = args.pop(0) - if not args: - currency = 'USD' - else: + if args: currency = args.pop(0).upper() + else: + currency = 'USD' try: - data = api.get_currency_data(ticker, currency.upper()) - except TickerNotFound as e: - return "Unable to find ticker for '{}'".format(e.currency) - except CurrencyConversionError as e: - return "Unable to convert '{}' to '{}'".format(e.in_name, e.out_name) - except APIRateLimitError: - return "API rate limit reached, please try again later" - - change = float(data['percent_change_24h']) + data = api.get_quote(ticker, currency) + except UnknownFiatCurrencyError as e: + return "Unknown fiat currency {!r}".format(e.name) + except UnknownSymbolError as e: + return "Unknown cryptocurrency {!r}".format(e.name) + except APIError: + event.reply("Unknown API error") + raise + + quote = data.quote[currency] + btc_quote = data.quote['BTC'] + change = quote.percent_change_24h if change > 0: - change_str = "$(dark_green) {}%$(clear)".format(change) + change_str = "$(dark_green)+{}%$(clear)".format(change) elif change < 0: - change_str = "$(dark_red) {}%$(clear)".format(change) + change_str = "$(dark_red){}%$(clear)".format(change) else: change_str = "{}%".format(change) - currency_sign = CURRENCY_SYMBOLS.get(currency, '') - - converted_value = data['price_' + currency.lower()] + currency_sign = api.get_currency_sign(currency) - return colors.parse("{} ({}) // $(orange){}{:,.2f}$(clear) {} - {:,.7f} BTC // {} change".format( - data['symbol'], data['id'], currency_sign, float(converted_value), currency.upper(), - float(data['price_btc']), change_str - )) + return colors.parse( + "{} ({}) // $(orange){}{:,.2f}$(clear) {} - {:,.7f} BTC // {} change".format( + data.symbol, + data.slug, + currency_sign, + quote.price, + currency, + btc_quote.price, + change_str, + ) + ) @hook.command("currencies", "currencylist", autohelp=False) def currency_list(): """- List all available currencies from the API""" - currencies = sorted(set((obj["symbol"], obj["id"]) for obj in api.currencies), key=itemgetter(0)) - lst = [ - '{: <10} {}'.format(symbol, name) for symbol, name in currencies - ] + currency_map = api.get_crypto_currency_map() + currencies = sorted( + set((obj.symbol, obj.name) for obj in currency_map.data), key=itemgetter(0) + ) + lst = ['{: <10} {}'.format(symbol, name) for symbol, name in currencies] lst.insert(0, 'Symbol Name') return "Available currencies: " + web.paste('\n'.join(lst)) diff --git a/tests/conftest.py b/tests/conftest.py index 73c707f43..1332301ff 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -1,8 +1,11 @@ import datetime +from unittest.mock import MagicMock import freezegun import pytest +from cloudbot.bot import bot + @pytest.fixture() def freeze_time(): @@ -14,3 +17,13 @@ def freeze_time(): with freezegun.freeze_time(dt, tz) as ft: yield ft + + +@pytest.fixture() +def mock_api_keys(): + try: + bot.set(MagicMock()) + bot.config.get_api_key.return_value = "APIKEY" + yield + finally: + bot.set(None) diff --git a/tests/plugin_tests/test_cryptocurrency.py b/tests/plugin_tests/test_cryptocurrency.py new file mode 100644 index 000000000..84700b807 --- /dev/null +++ b/tests/plugin_tests/test_cryptocurrency.py @@ -0,0 +1,416 @@ +from datetime import datetime, timedelta +from typing import Dict, List +from unittest.mock import MagicMock + +import pytest +from responses import Response + +from cloudbot.bot import bot +from cloudbot.event import CommandEvent +from plugins import cryptocurrency +from tests.util import HookResult, wrap_hook_response + + +def test_parse(): + assert cryptocurrency.ResponseStatus._fields != cryptocurrency.Quote._fields + cryptocurrency.Platform( + id=1, name='name', symbol='symbol', slug='slug', token_address='token', + ) + assert len(cryptocurrency.Platform._fields) == 5 + data = { + 'status': { + 'timestamp': 'ts', + 'error_code': 200, + 'error_message': None, + 'elapsed': 1, + 'credit_count': 1, + 'notice': None, + } + } + + obj = cryptocurrency.read_data(data, cryptocurrency.APIRequestResponse) + assert obj.status.credit_count == 1 + assert cryptocurrency.serialize(obj) == data + + +class MatchAPIKey(Response): + def __init__(self, method, url, api_key=None, **kwargs): + super().__init__(method, url, **kwargs) + self.api_key = api_key + + def matches(self, request): + if self.api_key: + assert request.headers['X-CMC_PRO_API_KEY'] == self.api_key + + return super().matches(request) + + +def init_response( + mock_requests, + crypto_map=True, + fiat_map=True, + quote=True, + error_msg=None, + check_api_key=False, + pct_change=18.9, +): + if check_api_key: + cryptocurrency.init_api(bot.get()) + + cryptocurrency.api.cache.clear() + now = datetime.now() + + iso_fmt = '%Y-%m-%dT%H:%M:%S.%f%z' + + if crypto_map: + mock_requests.add( + MatchAPIKey( + 'GET', + 'https://pro-api.coinmarketcap.com/v1/cryptocurrency/map', + api_key='APIKEY' if check_api_key else None, + json={ + 'status': { + 'timestamp': now.strftime(iso_fmt), + 'error_code': 200, + 'elapsed': 1, + 'credit_count': 1, + }, + 'data': [ + { + 'id': 1, + 'name': 'bitcoin', + 'symbol': 'BTC', + 'slug': 'bitcoin', + 'is_active': 1, + }, + ], + }, + ) + ) + + if fiat_map: + mock_requests.add( + MatchAPIKey( + 'GET', + 'https://pro-api.coinmarketcap.com/v1/fiat/map', + api_key='APIKEY' if check_api_key else None, + json={ + 'status': { + 'timestamp': now.strftime(iso_fmt), + 'error_code': 200, + 'elapsed': 1, + 'credit_count': 1, + }, + 'data': [{'id': 1, 'name': 'Dollar', 'sign': '$', 'symbol': 'USD'}], + }, + ) + ) + + if quote: + mock_requests.add( + MatchAPIKey( + 'GET', + 'https://pro-api.coinmarketcap.com/v1/cryptocurrency/quotes/latest?symbol=BTC&convert=USD%2CBTC', + api_key='APIKEY' if check_api_key else None, + json={ + 'status': { + 'timestamp': now.strftime(iso_fmt), + 'error_code': 400 if error_msg else 200, + 'error_message': error_msg, + 'elapsed': 1, + 'credit_count': 1, + }, + 'data': { + '1': { + 'id': 1, + 'name': 'Bitcoin', + 'slug': 'bitcoin', + 'symbol': 'BTC', + 'circulating_supply': 100, + 'total_supply': 1000, + 'date_added': (now - timedelta(days=5)).strftime(iso_fmt), + 'num_market_pairs': 1, + 'cmc_rank': 1, + 'last_updated': (now - timedelta(hours=1)).strftime( + iso_fmt + ), + 'tags': [], + 'quote': { + 'BTC': { + 'price': 2, + 'volume_24h': 5, + 'market_cap': 97, + 'percent_change_1h': 12.5, + 'percent_change_24h': 17.4, + 'percent_change_7d': 54.1, + 'last_updated': ( + now - timedelta(minutes=6) + ).strftime(iso_fmt), + }, + 'USD': { + 'price': 50000000000, + 'volume_24h': 20, + 'market_cap': 92, + 'percent_change_1h': 14.5, + 'percent_change_24h': pct_change, + 'percent_change_7d': 24.5, + 'last_updated': ( + now - timedelta(minutes=3) + ).strftime(iso_fmt), + }, + }, + }, + }, + }, + ) + ) + + +def test_api(mock_requests, mock_api_keys): + init_response(mock_requests, check_api_key=True) + + result = cryptocurrency.api.get_quote('BTC', 'USD') + + assert result.name == 'Bitcoin' + assert not result.unknown_fields + assert result.total_supply == 1000 + assert result.circulating_supply == 100 + + +class SomeSchema(cryptocurrency.Schema): + def __init__(self, a: List[List[Dict[str, List[str]]]]): + super().__init__() + self.a = a + + +def test_schema(): + cryptocurrency.read_data({'a': [[{'a': ['1']}]]}, SomeSchema) + + +class ConcreteSchema(cryptocurrency.Schema): + def __init__(self, a: str) -> None: + super().__init__() + self.a = a + + +class AbstractSchema(ConcreteSchema): + _abstract = True + + +class OtherConcreteSchema(AbstractSchema): + def __init__(self, a: str, b: str): + super().__init__(a) + self.b = b + + +def test_complex_schema(): + cryptocurrency.read_data({'a': 'hello', 'b': 'world'}, OtherConcreteSchema) + + +def test_invalid_schema_type(): + with pytest.raises( + TypeError, match="field 'a' expected type , got type " + ): + cryptocurrency.read_data({'a': 1, 'b': 'world'}, OtherConcreteSchema) + + +def test_schema_missing_field(): + with pytest.raises(cryptocurrency.ParseError) as exc: + cryptocurrency.read_data({'b': 'hello'}, OtherConcreteSchema) + + assert isinstance(exc.value.__cause__, cryptocurrency.MissingSchemaField) + + +class NestedSchema(cryptocurrency.Schema): + def __init__(self, a: OtherConcreteSchema) -> None: + super().__init__() + self.a = a + + +def test_schema_nested_exceptions(): + with pytest.raises(cryptocurrency.ParseError) as exc: + cryptocurrency.read_data({'a': {'b': 'hello'}}, NestedSchema) + + assert isinstance(exc.value.__cause__, cryptocurrency.ParseError) + assert isinstance(exc.value.__cause__.__cause__, cryptocurrency.MissingSchemaField) + + +def test_schema_unknown_fields(): + input_data = {'a': {'a': 'hello', 'b': 'world'}, 'c': 1} + with pytest.warns( + UserWarning, + match=r"Unknown fields: \['c'\] while parsing schema 'NestedSchema'", + ): + cryptocurrency.read_data(input_data, NestedSchema) + + obj = cryptocurrency.read_data(input_data, NestedSchema) + assert cryptocurrency.serialize(obj) == input_data + + +def test_cache(freeze_time): + c = cryptocurrency.Cache() + c.put('foo', 'bar', 30) + + # Object with a lifespan of 30 seconds should die at 30 seconds + freeze_time.tick(timedelta(seconds=29)) + assert c.get('foo') is not None + assert c.get('foo').value == 'bar' + freeze_time.tick() + assert c.get('foo') is None + + +def test_crypto_cmd(mock_requests): + init_response(mock_requests) + + conn = MagicMock() + conn.config = {} + conn.bot = None + + event = CommandEvent( + text='BTC USD', + cmd_prefix='.', + triggered_command='crypto', + hook=MagicMock(), + bot=conn.bot, + conn=conn, + channel='#foo', + nick='foobaruser', + ) + res = wrap_hook_response(cryptocurrency.crypto_command, event) + + assert res == [ + HookResult( + return_type='return', + value='BTC (bitcoin) // \x0307$50,000,000,000.00\x0f USD - 2.0000000 BTC // \x0303+18.9%\x0f change', + ) + ] + + +def _run_alias(): + conn = MagicMock() + conn.config = {} + conn.bot = None + + event = CommandEvent( + text='', + cmd_prefix='.', + triggered_command='btc', + hook=MagicMock(), + bot=conn.bot, + conn=conn, + channel='#foo', + nick='foobaruser', + ) + return wrap_hook_response(cryptocurrency.btc_alias, event) + + +def test_btc_alias(mock_requests): + init_response(mock_requests) + + res = _run_alias() + + assert res == [ + HookResult( + return_type='return', + value='BTC (bitcoin) // \x0307$50,000,000,000.00\x0f USD - 2.0000000 BTC // \x0303+18.9%\x0f change', + ) + ] + + +def test_btc_alias_neg_change(mock_requests): + init_response(mock_requests, pct_change=-14.5) + + res = _run_alias() + + assert res == [ + HookResult( + return_type='return', + value='BTC (bitcoin) // \x0307$50,000,000,000.00\x0f USD - 2.0000000 BTC // \x0305-14.5%\x0f change', + ) + ] + + +def test_btc_alias_no_change(mock_requests): + init_response(mock_requests, pct_change=0) + + res = _run_alias() + + assert res == [ + HookResult( + return_type='return', + value='BTC (bitcoin) // \x0307$50,000,000,000.00\x0f USD - 2.0000000 BTC // 0% change', + ) + ] + + +def test_crypto_cmd_bad_symbol(mock_requests): + init_response(mock_requests, fiat_map=False, quote=False) + + conn = MagicMock() + conn.config = {} + conn.bot = None + + event = CommandEvent( + text='ABC USD', + cmd_prefix='.', + triggered_command='crypto', + hook=MagicMock(), + bot=conn.bot, + conn=conn, + channel='#foo', + nick='foobaruser', + ) + res = wrap_hook_response(cryptocurrency.crypto_command, event) + + assert res == [HookResult('return', "Unknown cryptocurrency 'ABC'")] + + +def test_crypto_cmd_bad_fiat(mock_requests): + init_response(mock_requests, quote=False) + + conn = MagicMock() + conn.config = {} + conn.bot = None + + event = CommandEvent( + text='BTC ABC', + cmd_prefix='.', + triggered_command='crypto', + hook=MagicMock(), + bot=conn.bot, + conn=conn, + channel='#foo', + nick='foobaruser', + ) + res = wrap_hook_response(cryptocurrency.crypto_command, event) + + assert res == [HookResult('return', "Unknown fiat currency 'ABC'")] + + +def test_cmd_api_error(mock_requests): + init_response(mock_requests, error_msg='FooBar') + conn = MagicMock() + conn.config = {} + conn.bot = None + + event = CommandEvent( + text='BTC USD', + cmd_prefix='.', + triggered_command='crypto', + hook=MagicMock(), + bot=conn.bot, + conn=conn, + channel='#foo', + nick='foobaruser', + ) + res = [] + with pytest.raises(cryptocurrency.APIError, match='FooBar'): + wrap_hook_response(cryptocurrency.crypto_command, event, res) + + assert res == [HookResult('message', ('#foo', '(foobaruser) Unknown API error'))] + + +def test_list_currencies(patch_paste, mock_requests): + init_response(mock_requests, quote=False, fiat_map=False) + cryptocurrency.currency_list() + patch_paste.assert_called_with('Symbol Name\nBTC bitcoin') diff --git a/tests/util/__init__.py b/tests/util/__init__.py index 625755ab4..35e7a9461 100644 --- a/tests/util/__init__.py +++ b/tests/util/__init__.py @@ -3,6 +3,7 @@ from cloudbot.util.func_utils import call_with_args __all__ = ( + 'HookResult', 'wrap_hook_response', ) @@ -38,7 +39,7 @@ def __repr__(self): ', '.join( '{}={!r}'.format(k, getattr(self, k)) for k in ('return_type', 'value', 'data') - ) + ), )