Skip to content

Commit

Permalink
Price Alpha Update: Working step function
Browse files Browse the repository at this point in the history
- Granular lambda functions with prices alpha daily and prices alpha monthly separated
- Separation in batches with sublists designed to run in one hour slots to avoid API limit errors
- Processing of monthly returns of WRDS and Alpha into one table at the end of statemachine
  • Loading branch information
nico-corthorn committed Oct 16, 2024
1 parent 0d50471 commit ce52231
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 184 deletions.
60 changes: 34 additions & 26 deletions esgtools/get_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import boto3
from botocore.exceptions import ClientError
from ast import literal_eval
import numpy as np
import pandas as pd

from utils import sql_manager, aws, utils
from alpha import api, table
Expand All @@ -15,28 +17,32 @@ def lambda_handler(event, context):
----------
event: dict, required
API Gateway Lambda Proxy Input Format
Event doc: https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-proxy-integrations.html#api-gateway-simple-proxy-for-lambda-input-format
Keys:
ref_table: str
Example: prices_alpha
validate: str, will be converted to bool
Example: True
asset_types: str, will be converted to List[str] by splitting on ,
Example: Stock or Stock,ETF
max_assets_in_batch: str, will be converted to int
Example: 4500
context: object, required
Lambda Context runtime methods and attributes
Context doc: https://docs.aws.amazon.com/lambda/latest/dg/python-context-object.html
Returns
-------
API Gateway Lambda Proxy Output Format: dict
Return doc: https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-proxy-integrations.html
Dict using HTML protocol
"""
print("event")
print(event)
print()
print("event", event)

# Example
# http://127.0.0.1:3000/get-assets?ref_table=prices_alpha&group=100
# 'queryStringParameters': {'ref_table': 'prices_alpha', 'group': '100'}
# {"queryStringParameters": {"ref_table": "prices_alpha", "group": "100"}}
# 'queryStringParameters': {'ref_table': 'prices_alpha'}
# {"queryStringParameters": {"ref_table": "prices_alpha"}}

print("event['queryStringParameters']")
print(event["queryStringParameters"])
Expand All @@ -50,20 +56,22 @@ def lambda_handler(event, context):
if "validate" in inputs else False
asset_types = inputs["asset_types"].split(",") \
if "asset_types" in inputs else ["Stock"]
group = int(inputs["group"]) if "group" in inputs else 10
#group = int(inputs["group"]) if "group" in inputs else 10
max_assets_in_batch = int(inputs["max_assets_in_batch"]) if "max_assets_in_batch" in inputs else 75*60
n_lists_in_group = int(inputs["n_lists_in_group"]) if "n_lists_in_group" in inputs else 10
print(f"ref_table = {ref_table}")
print(f"validate = {validate}")
print(f"asset_types = {asset_types}")
print(f"group = {group}")
print(f"max_assets_in_batch = {max_assets_in_batch}")
print(f"n_lists_in_group = {n_lists_in_group}")

# Decrypts secret using the associated KMS key.
db_credentials = literal_eval(aws.get_secret("prod/awsportfolio/key"))
api_key = literal_eval(aws.get_secret("prod/AlphaApi/key"))["ALPHAVANTAGE_API_KEY"]

alpha_scraper = api.AlphaScraper(api_key=api_key)

assets_sublists = []

assets = pd.DataFrame()
if ref_table == "prices_alpha":
keys = ["symbol", "date"]
alpha_prices = table.AlphaTablePrices(
Expand All @@ -73,8 +81,6 @@ def lambda_handler(event, context):
sql_params=db_credentials
)
assets = alpha_prices.get_assets(validate, asset_types)
assets_sublists = [{"symbols": ','.join(list(assets.loc[i:i+group-1].symbol))} \
for i in range(0, len(assets), group)]
else:
accounting_keys = ["symbol", "report_type", "report_date", "currency", "account_name"]
balance_accounts = ['totalAssets', 'commonStock', 'commonStockSharesOutstanding']
Expand All @@ -97,26 +103,28 @@ def lambda_handler(event, context):
)
if ref_table == "balance_alpha":
assets = alpha_balance.get_assets(validate, asset_types)
assets_sublists = [{"symbols": ','.join(list(assets.loc[i:i+group].symbol))} \
for i in range(0, len(assets), group)]
elif ref_table == "income_alpha":
assets = alpha_income.get_assets(validate, asset_types)
assets_sublists = [{"symbols": ','.join(list(assets.loc[i:i+group].symbol))} \
for i in range(0, len(assets), group)]
elif ref_table == "accounting":
assets_balance = alpha_balance.get_assets(validate, asset_types)
assets_income = alpha_income.get_assets(validate, asset_types)
assets = list(set(assets_balance.symbol).union(set(assets_income.symbol)))
assets.sort()
assets_sublists = [{"symbols": ','.join(list(assets[i:i+group]))} \
for i in range(0, len(assets), group)]

print(assets_sublists)

assets_sublists = []
if assets.shape[0] > 0:
for i in range(0, len(assets), max_assets_in_group):
symbols_group: pd.Series = assets.loc[i:i+max_assets_in_group-1].symbol
partition_group = np.array_split(symbols_group, n_lists_in_group)
assets_sublists.append([{
"symbols": ",".join(list(sublist)),
"size": size
} for sublist in partition_group])

return {
"statusCode": 200,
"body": json.dumps({
"message": f"returning {len(assets_sublists)} sublists with maximum {group} symbols each",
"message": f"Returning {len(assets_sublists)} groups with {n_lists_in_group} sublists each.",
}),
"assets": assets_sublists,
}
}
40 changes: 8 additions & 32 deletions esgtools/update_prices.py → esgtools/update_prices_alpha.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,29 +9,8 @@


def lambda_handler(event, context):
"""Sample pure Lambda function
Parameters
----------
event: dict, required
API Gateway Lambda Proxy Input Format
Event doc: https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-proxy-integrations.html#api-gateway-simple-proxy-for-lambda-input-format
context: object, required
Lambda Context runtime methods and attributes
Context doc: https://docs.aws.amazon.com/lambda/latest/dg/python-context-object.html
Returns
-------
API Gateway Lambda Proxy Output Format: dict
Return doc: https://docs.aws.amazon.com/apigateway/latest/developerguide/set-up-lambda-proxy-integrations.html
"""
print("event")
print(event)
print()
""" Update prices_alpha for symbols given in event. """
print("event", event)

# Example
# http://127.0.0.1:3000/update-prices?size=compact&symbols=AMZN,AAPL,MSFT
Expand All @@ -44,9 +23,9 @@ def lambda_handler(event, context):
inputs = event

# Gather parameters
size = inputs["size"] if "size" in inputs else "compact"
size = inputs.get("size", "full")
symbols = inputs["symbols"].split(",") if "symbols" in inputs else []
parallel = utils.str2bool(inputs["parallel"]) if "parallel" in inputs else False
parallel = utils.str2bool(inputs.get("parallel", "false"))
print(f"size = {size}")
print(f"symbols = {symbols}")
print(f"parallel = {parallel}")
Expand All @@ -55,26 +34,23 @@ def lambda_handler(event, context):
db_credentials = literal_eval(aws.get_secret("prod/awsportfolio/key"))
api_key = literal_eval(aws.get_secret("prod/AlphaApi/key"))["ALPHAVANTAGE_API_KEY"]

alpha_scraper = api.AlphaScraper(api_key=api_key)
alpha_scraper = api.AlphaScraper(api_key=api_key, wait=False)
prices_keys = ["symbol", "date"]
alpha_prices = table.AlphaTablePrices(
"prices_alpha",
prices_keys,
alpha_scraper,
sql_params=db_credentials
)
alpha_prices_monthly = table.AlphaTablePricesMonthly(
"prices_alpha_monthly",
sql_params=db_credentials
)

if symbols:
alpha_prices.update_list(symbols, size=size, parallel=parallel)
alpha_prices_monthly.update_list(symbols, parallel=parallel)

return {
"statusCode": 200,
"body": json.dumps({
"message": f"Daily and monthly prices updated for symbols = {symbols}, size = {size}",
"message": f"Daily prices updated for symbols = {symbols}, size = {size}",
}),
"symbols": ",".join(symbols),
"size": size
}
46 changes: 46 additions & 0 deletions esgtools/update_prices_alpha_monthly.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import os
import json
import boto3
from botocore.exceptions import ClientError
from ast import literal_eval

from utils import sql_manager, aws, utils
from alpha import api, table


def lambda_handler(event, context):
print("event", event)

# Example
# http://127.0.0.1:3000/update-prices?size=compact&symbols=AMZN,AAPL,MSFT
# 'queryStringParameters': {'parallel': '1', 'size': 'compact', 'symbols': 'AMZN,AAPL,MSFT'}

# Inputs
if 'queryStringParameters' in event:
inputs = event["queryStringParameters"]
else:
inputs = event

# Gather parameters
symbols = inputs["symbols"].split(",") if "symbols" in inputs else []
parallel = utils.str2bool(inputs["parallel"]) if "parallel" in inputs else False
print(f"symbols = {symbols}")
print(f"parallel = {parallel}")

# Decrypts secret using the associated KMS key.
db_credentials = literal_eval(aws.get_secret("prod/awsportfolio/key"))

alpha_prices_monthly = table.AlphaTablePricesMonthly(
"prices_alpha_monthly",
sql_params=db_credentials
)

if symbols:
alpha_prices_monthly.update_list(symbols, parallel=parallel)

return {
"statusCode": 200,
"body": json.dumps({
"message": "Monthly prices updated for symbols provided",
}),
}
Loading

0 comments on commit ce52231

Please sign in to comment.