-
Notifications
You must be signed in to change notification settings - Fork 1
/
dca_template.py
70 lines (57 loc) · 1.96 KB
/
dca_template.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
"""
Cryptoflow Dollar Cost Averaging DAG
"""
import os
import sys
from datetime import timedelta
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from cryptoflow.buyatmarket import BuyAtMarket
from cryptoflow.config import get_dca_config
ASSET = os.path.basename(__file__).replace("dca_", "").replace(".py", "").upper()
AMOUNT_USD = get_dca_config(ASSET, 'amount_usd')
SCHEDULE = get_dca_config(ASSET, 'schedule')
START_DATE = days_ago(1)
default_args = {
'owner': 'cryptoflow',
}
dag = DAG(
dag_id=f'dca_{ASSET.lower()}',
default_args=default_args,
schedule_interval=SCHEDULE,
start_date=START_DATE,
catchup=False,
dagrun_timeout=timedelta(minutes=1),
tags=['crypto', 'dollar_cost_average'],
params={
"amount_usd": "5"
}
)
# [START dollar_cost_average]
def do_dollar_cost_average():
""" Do dollar cost average buys """
return_message = None
buymarket = BuyAtMarket(ASSET, AMOUNT_USD)
best_price = buymarket.get_best_price()
print("BUY, HODL, BUY, HODL, BUY, HODL!!!")
response = buymarket.buy_market(best_price, AMOUNT_USD)
if response['success']:
return_message = response['message']
else:
if best_price['exchange'] == "coinbasepro" and response['message'] == "Insufficient funds":
# Insufficient funds on coinbasepro, so let's try gemini.
print("Insufficient funds on coinbasepro, trying gemini...")
best_price['exchange'] = "gemini"
response = buymarket.buy_market(best_price, AMOUNT_USD)
if not response['success']:
print(f"{response['reason']} {response['message']}")
sys.exit(1)
return f"Order placed: {return_message}"
dollar_cost_average = PythonOperator(
task_id='dollar_cost_average',
python_callable=do_dollar_cost_average,
op_kwargs={ "dip_price": "0.0", "amount_usd": "5" },
dag=dag,
)
# [END dollar_cost_average]