diff --git a/hooli_data_eng/assets/forecasting/__init__.py b/hooli_data_eng/assets/forecasting/__init__.py index 62a4e163..7ec804ae 100644 --- a/hooli_data_eng/assets/forecasting/__init__.py +++ b/hooli_data_eng/assets/forecasting/__init__.py @@ -99,7 +99,7 @@ def model_stats_by_month( ) -> Output[pd.DataFrame]: """Model errors by month""" a, b = order_forecast_model - target_date = pd.to_datetime(context.asset_partition_key_for_output()) + target_date = pd.to_datetime(context.partition_key) target_month = target_date.month weekly_order_summary["order_date"] = pd.to_datetime( weekly_order_summary["order_date"] diff --git a/hooli_data_eng/assets/raw_data/__init__.py b/hooli_data_eng/assets/raw_data/__init__.py index 1e05b80b..fd75f01f 100644 --- a/hooli_data_eng/assets/raw_data/__init__.py +++ b/hooli_data_eng/assets/raw_data/__init__.py @@ -27,7 +27,7 @@ def _daily_partition_seq(start, end): start = pd.to_datetime(start) end = pd.to_datetime(end) daily_diffs = int((end - start) / timedelta(hours=24)) - + return [str(start + timedelta(hours=i)) for i in range(daily_diffs)] @@ -41,7 +41,7 @@ def users(context, api: RawDataAPI) -> pd.DataFrame: """A table containing all users data""" # during a backfill the partition range will span multiple hours # during a single run the partition range will be for a single hour - first_partition, last_partition = context.asset_partitions_time_window_for_output() + first_partition, last_partition = context.partition_time_window partition_seq = _daily_partition_seq(first_partition, last_partition) all_users = [] for partition in partition_seq: @@ -69,8 +69,8 @@ def check_users(context, users: pd.DataFrame): partitions_def=daily_partitions, metadata={"partition_expr": "DT"}, retry_policy=RetryPolicy( - max_retries=3, - delay=1, + max_retries=3, + delay=1, backoff=Backoff.LINEAR, jitter=Jitter.FULL ), @@ -78,14 +78,14 @@ def check_users(context, users: pd.DataFrame): ) def orders(context, api: RawDataAPI) -> pd.DataFrame: """A table containing all orders that have been placed""" - first_partition, last_partition = context.asset_partitions_time_window_for_output() + first_partition, last_partition = context.partition_time_window partition_seq = _daily_partition_seq(first_partition, last_partition) all_orders = [] for partition in partition_seq: resp = api.get_orders(partition) users = pd.read_json(resp.json()) all_orders.append(users) - + all_orders_df = pd.concat(all_orders) all_orders_df['dt'] = pd.to_datetime(all_orders_df['dt'], unit = "ms") return all_orders_df