
Commit

update example for new api
jamiedemaria committed Jan 29, 2024
1 parent f4a4873 commit 3021914
Showing 1 changed file with 5 additions and 5 deletions.
examples/partition_example/partition_example.py (10 changes: 5 additions & 5 deletions)

@@ -33,7 +33,7 @@ def relativedelta(*args, **kwargs):
     metadata={"partition_expr": "LastModifiedDate"},
 )
 def salesforce_customers(context: AssetExecutionContext) -> pd.DataFrame:
-    start_date_str = context.asset_partition_key_for_output()
+    start_date_str = context.partition_info.key
 
     timezone = pytz.timezone("GMT")  # Replace 'Your_Timezone' with the desired timezone
     start_obj = datetime.datetime.strptime(start_date_str, "%Y-%m-%d").replace(tzinfo=timezone)
@@ -65,7 +65,7 @@ def realized_vol(context: AssetExecutionContext, orats_daily_prices: pd.DataFram
     The volatility is calculated using various methods such as close-to-close, Parkinson, Hodges-Tompkins, and Yang-Zhang.
     The function returns a DataFrame with the calculated volatilities.
     """
-    trade_date = context.asset_partition_key_for_output()
+    trade_date = context.partition_info.key
     ticker_id = 1
 
     df = all_realvols(orats_daily_prices, ticker_id, trade_date)
@@ -80,7 +80,7 @@ def realized_vol(context: AssetExecutionContext, orats_daily_prices: pd.DataFram
 
 @asset(io_manager_def="parquet_io_manager", partitions_def=hourly_partitions)
 def my_custom_df(context: AssetExecutionContext) -> pd.DataFrame:
-    start, end = context.asset_partitions_time_window_for_output()
+    start, end = context.partition_info.time_window
 
     df = pd.DataFrame({"timestamp": pd.date_range(start, end, freq="5T")})
     df["count"] = df["timestamp"].map(lambda a: random.randint(1, 1000))
@@ -93,7 +93,7 @@ def fetch_blog_posts_from_external_api(*args, **kwargs):
 
 @asset(partitions_def=HourlyPartitionsDefinition(start_date="2022-01-01-00:00"))
 def blog_posts(context: AssetExecutionContext) -> List[Dict]:
-    partition_datetime_str = context.asset_partition_key_for_output()
+    partition_datetime_str = context.partition_info.key
     hour = datetime.datetime.fromisoformat(partition_datetime_str)
     posts = fetch_blog_posts_from_external_api(hour_when_posted=hour)
     return posts
@@ -106,7 +106,7 @@ def blog_posts(context: AssetExecutionContext) -> List[Dict]:
     key_prefix=["snowflake", "eldermark_proxy"],
 )
 def resident(context: AssetExecutionContext) -> Output[pd.DataFrame]:
-    start, end = context.asset_partitions_time_window_for_output()
+    start, end = context.partition_info.time_window
     filter_str = f"LastMod_Stamp >= {start.timestamp()} AND LastMod_Stamp < {end.timestamp()}"
 
     records = context.resources.eldermark.fetch_obj(obj="Resident", filter=filter_str)
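
The whole commit is one mechanical substitution in each partitioned asset: the older context.asset_partition_key_for_output() and context.asset_partitions_time_window_for_output() calls become the context.partition_info.key and context.partition_info.time_window accessors added above. A minimal sketch of the pattern, assuming the partition_info accessor exactly as it appears in this diff (the daily_partitions definition and asset name below are hypothetical stand-ins, not part of the example file):

import pandas as pd
from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset

# Hypothetical partitions definition, standing in for the ones used in partition_example.py.
daily_partitions = DailyPartitionsDefinition(start_date="2023-01-01")

@asset(partitions_def=daily_partitions)
def my_partitioned_asset(context: AssetExecutionContext) -> pd.DataFrame:
    # Before this commit: context.asset_partition_key_for_output()
    partition_key = context.partition_info.key
    # Before this commit: context.asset_partitions_time_window_for_output()
    start, end = context.partition_info.time_window
    # Build a frame describing just this partition's time window.
    return pd.DataFrame({"partition": [partition_key], "start": [start], "end": [end]})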
