-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcrypto.py
78 lines (62 loc) · 2.07 KB
/
crypto.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
# Created and Modified by Malik B. Parker 2021
from requests import Request, Session, post
from requests.exceptions import ConnectionError, Timeout, TooManyRedirects
import json
import time
##### Globals
# Coin Market Cap API KEY
# Read inside a context manager so the file is closed, and strip the trailing
# newline: a key ending in "\n" is rejected when used as an HTTP header value.
with open("coinmarketcap.key") as _key_file:
    coinmarketcap_key = _key_file.readline().strip()
# Gathers data from all the coins on the website
coin_url = 'https://pro-api.coinmarketcap.com/v1/cryptocurrency/listings/latest'
# IFTTT API Key
# Stripped for the same reason: the key is interpolated into the webhook URL.
with open("ifttt.key") as _key_file:
    ifttt_key = _key_file.readline().strip()
# Sends a webhook to IFTTT with Price information
ifttt_webhook_url = f"https://maker.ifttt.com/trigger/new_DOGE_data/with/key/{ifttt_key}"
# Dictionary containing information on all coins from Coin Market Cap,
# keyed by coin symbol (filled in by get_latest_doge)
coins = {}
##### Functions
# Gets the latest DOGE Price as well as populate the coins dictionary
def get_latest_doge():
    """Load the latest coin listings and return the DOGE price in USD.

    When ``DEBUG`` is true the listings are read from ``sample_data.json``
    instead of being requested from ``coin_url``. Every coin in the payload
    is stored in the module-level ``coins`` dictionary, keyed by its symbol.

    Returns:
        The value at ``coins["DOGE"]["quote"]["USD"]["price"]``, or ``None``
        when the request fails with a connection error, timeout or redirect
        loop (the error is printed).

    Raises:
        KeyError: If the payload has no ``"data"`` entry or DOGE is not
            among the listed coins.
    """
    try:
        if DEBUG:  # Uses the sample data so that you don't waste an API Call
            # The context manager closes the sample file once it is parsed.
            with open('sample_data.json') as sample_file:
                payload = json.load(sample_file)
        else:
            # Bound the wait so a stalled connection raises Timeout (handled
            # below) instead of blocking forever.
            response = session.get(coin_url, params=parameters, timeout=30)
            payload = json.loads(response.text)
    except (ConnectionError, Timeout, TooManyRedirects) as e:
        print(e)
        return None

    # `coins` is only mutated, never rebound, so no `global` statement is needed.
    for coin in payload["data"]:
        coins[coin["symbol"]] = coin
    # Pretty print the results for DOGE to the console
    # print(json.dumps(coins["DOGE"], indent=2))
    return coins["DOGE"]["quote"]["USD"]["price"]
# Send the price value to IFTTT via the Webhook
def post_ifttt_webhook(price):
    """POST *price* to the IFTTT webhook as the ``value1`` field.

    Delivery is best-effort: any failure is printed with an ``ERROR`` prefix
    and the function returns normally, so a caller is never interrupted by a
    failed notification.

    Args:
        price: The value to send. When ``None`` nothing is sent, because
            there is no price to report.

    Returns:
        None.
    """
    if price is None:
        # Avoid firing the trigger with an empty (JSON null) value.
        print("ERROR", "no price available, webhook not sent")
        return
    try:
        # Bound the wait so a stalled connection cannot hang the script.
        response = post(ifttt_webhook_url, json={'value1': price}, timeout=30)
        # Surface HTTP-level failures (e.g. a rejected key) instead of ignoring them.
        response.raise_for_status()
    except Exception as err:
        # `Exception` rather than a bare `except:` so Ctrl-C and SystemExit
        # still propagate; the error detail is included in the message.
        print("ERROR", err)
##### Session Start-Up Parameters
# Script-wide switches
DEBUG = False  # Set to true if you want to use the sample data
WAIT_TO_SEND_TIME = 300  # Seconds

# Query string sent with the listings request
parameters = dict(start='1', limit='5000', convert='USD')

# Headers attached to every request made through the shared session.
# NOTE(review): 'Accepts' is not the standard 'Accept' header name; kept
# exactly as written -- confirm against the API documentation before changing.
headers = {}
headers['Accepts'] = 'application/json'
headers['X-CMC_PRO_API_KEY'] = coinmarketcap_key

# One shared session so the headers above are set once
session = Session()
session.headers.update(headers)
if __name__ == "__main__":
    # One-shot run: look up the price once and forward it to IFTTT.
    # To poll continuously instead, wrap the two calls below in
    # `while True:` and follow them with `time.sleep(WAIT_TO_SEND_TIME)`.
    latest_price = get_latest_doge()
    post_ifttt_webhook(latest_price)