Commit: remove asset_partition_*_for_output from docs

jamiedemaria committed Jan 29, 2024
1 parent 0f413ec commit 544d4f1
Showing 21 changed files with 35 additions and 33 deletions.
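Every change below follows the same pattern: the per-output context methods (`context.asset_partition_key_for_output()`, `context.asset_partitions_time_window_for_output()`) are replaced with the equivalent context properties, and bare `context` arguments gain an `AssetExecutionContext` annotation. A minimal before-and-after sketch (the asset name and start date mirror the first hunk; the logging line is illustrative):

```python
from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset


@asset(partitions_def=DailyPartitionsDefinition(start_date="2023-10-01"))
def my_daily_partitioned_asset(context: AssetExecutionContext) -> None:
    # Before: partition_date_str = context.asset_partition_key_for_output()
    partition_date_str = context.partition_key

    # Before: start, end = context.asset_partitions_time_window_for_output()
    # partition_time_window is a TimeWindow whose start/end are datetimes
    start, end = context.partition_time_window
    context.log.info(f"Partition {partition_date_str} spans {start} to {end}")
```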
@@ -70,7 +70,7 @@ from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset

@asset(partitions_def=DailyPartitionsDefinition(start_date="2023-10-01"))
def my_daily_partitioned_asset(context: AssetExecutionContext) -> None:
- partition_date_str = context.asset_partition_key_for_output()
+ partition_date_str = context.partition_key

url = f"https://api.nasa.gov/planetary/apod?api_key=DEMO_KEY&date={partition_date_str}"
target_location = f"nasa/{partition_date_str}.csv"
@@ -208,7 +208,7 @@ from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset

@asset(partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"))
def my_daily_partitioned_asset(context: AssetExecutionContext) -> pd.DataFrame:
- partition_date_str = context.asset_partition_key_for_output()
+ partition_date_str = context.partition_key
return pd.read_csv(f"coolweatherwebsite.com/weather_obs&date={partition_date_str}")
```

4 changes: 2 additions & 2 deletions docs/content/integrations/bigquery/reference.mdx
@@ -110,7 +110,7 @@ from dagster import AssetExecutionContext, StaticPartitionsDefinition, asset
metadata={"partition_expr": "SPECIES"},
)
def iris_data_partitioned(context: AssetExecutionContext) -> pd.DataFrame:
- species = context.asset_partition_key_for_output()
+ species = context.partition_key

full_df = pd.read_csv(
"https://docs.dagster.io/assets/iris.csv",
@@ -165,7 +165,7 @@ from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset
metadata={"partition_expr": "TIMESTAMP_SECONDS(TIME)"},
)
def iris_data_per_day(context: AssetExecutionContext) -> pd.DataFrame:
- partition = context.asset_partition_key_for_output()
+ partition = context.partition_key

# get_iris_data_for_date fetches all of the iris data for a given date,
# the returned dataframe contains a column named 'TIME' that stores
2 changes: 1 addition & 1 deletion docs/content/integrations/dbt/reference.mdx
@@ -681,7 +681,7 @@ You can define a Dagster <PyObject module="dagster" object="PartitionDefinition"

Partitioned assets will be able to access the <PyObject module="dagster" object="TimeWindow"/>'s start and end dates, and these can be passed to dbt's CLI as variables which can be used to filter incremental models.

- When a partition definition to passed to the <PyObject module="dagster_dbt" object="dbt_assets" decorator/> decorator, all assets are defined to operate on the same partitions. With this in mind, we can retrieve any time window from <PyObject module="dagster" object="asset_partitions_time_window_for_output"/> method in order to get the current start and end partitions.
+ When a partition definition is passed to the <PyObject module="dagster_dbt" object="dbt_assets" decorator/> decorator, all assets are defined to operate on the same partitions. With this in mind, we can read the current time window from the <PyObject module="dagster" object="partition_time_window"/> property to get its start and end.

```python
import json
```
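The remainder of this example is collapsed in the diff view. A sketch of the full pattern under the new API, assuming an illustrative manifest path and var names (`min_date`/`max_date`) that incremental models would filter on:

```python
import json

from dagster import AssetExecutionContext, DailyPartitionsDefinition
from dagster_dbt import DbtCliResource, dbt_assets


@dbt_assets(
    manifest="target/manifest.json",  # assumed location, for illustration
    partitions_def=DailyPartitionsDefinition(start_date="2023-01-01"),
)
def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource):
    # The partition's bounds come from the partition_time_window property
    start, end = context.partition_time_window
    dbt_vars = {"min_date": start.isoformat(), "max_date": end.isoformat()}
    # Pass the bounds to dbt as vars so incremental models can filter on them
    yield from dbt.cli(["build", "--vars", json.dumps(dbt_vars)], context=context).stream()
```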
4 changes: 2 additions & 2 deletions docs/content/integrations/deltalake/reference.mdx
@@ -81,7 +81,7 @@ from dagster import StaticPartitionsDefinition, asset
metadata={"partition_expr": "species"},
)
def iris_dataset_partitioned(context) -> pd.DataFrame:
- species = context.asset_partition_key_for_output()
+ species = context.partition_key

full_df = pd.read_csv(
"https://docs.dagster.io/assets/iris.csv",
@@ -134,7 +134,7 @@ from dagster import DailyPartitionsDefinition, asset
metadata={"partition_expr": "time"},
)
def iris_data_per_day(context) -> pd.DataFrame:
- partition = context.asset_partition_key_for_output()
+ partition = context.partition_key

# get_iris_data_for_date fetches all of the iris data for a given date,
# the returned dataframe contains a column named 'time' that stores
4 changes: 2 additions & 2 deletions docs/content/integrations/duckdb/reference.mdx
@@ -113,7 +113,7 @@ from dagster import AssetExecutionContext, StaticPartitionsDefinition, asset
metadata={"partition_expr": "SPECIES"},
)
def iris_dataset_partitioned(context: AssetExecutionContext) -> pd.DataFrame:
- species = context.asset_partition_key_for_output()
+ species = context.partition_key

full_df = pd.read_csv(
"https://docs.dagster.io/assets/iris.csv",
@@ -168,7 +168,7 @@ from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset
metadata={"partition_expr": "TO_TIMESTAMP(TIME)"},
)
def iris_data_per_day(context: AssetExecutionContext) -> pd.DataFrame:
- partition = context.asset_partition_key_for_output()
+ partition = context.partition_key

# get_iris_data_for_date fetches all of the iris data for a given date,
# the returned dataframe contains a column named 'time' that stores
4 changes: 2 additions & 2 deletions docs/content/integrations/snowflake/reference.mdx
@@ -131,7 +131,7 @@ from dagster import AssetExecutionContext, StaticPartitionsDefinition, asset
metadata={"partition_expr": "SPECIES"},
)
def iris_dataset_partitioned(context: AssetExecutionContext) -> pd.DataFrame:
- species = context.asset_partition_key_for_output()
+ species = context.partition_key

full_df = pd.read_csv(
"https://docs.dagster.io/assets/iris.csv",
@@ -186,7 +186,7 @@ from dagster import AssetExecutionContext, DailyPartitionsDefinition, asset
metadata={"partition_expr": "TO_TIMESTAMP(TIME::INT)"},
)
def iris_data_per_day(context: AssetExecutionContext) -> pd.DataFrame:
- partition = context.asset_partition_key_for_output()
+ partition = context.partition_key

# get_iris_data_for_date fetches all of the iris data for a given date,
# the returned dataframe contains a column named 'time' that stores
@@ -11,7 +11,7 @@

@asset(partitions_def=DailyPartitionsDefinition(start_date="2023-10-01"))
def my_daily_partitioned_asset(context: AssetExecutionContext) -> None:
- partition_date_str = context.asset_partition_key_for_output()
+ partition_date_str = context.partition_key

url = f"https://api.nasa.gov/planetary/apod?api_key=DEMO_KEY&date={partition_date_str}"
target_location = f"nasa/{partition_date_str}.csv"
@@ -5,5 +5,5 @@

@asset(partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"))
def my_daily_partitioned_asset(context: AssetExecutionContext) -> pd.DataFrame:
- partition_date_str = context.asset_partition_key_for_output()
+ partition_date_str = context.partition_key
return pd.read_csv(f"coolweatherwebsite.com/weather_obs&date={partition_date_str}")
@@ -12,7 +12,7 @@
metadata={"partition_expr": "SPECIES"},
)
def iris_data_partitioned(context: AssetExecutionContext) -> pd.DataFrame:
- species = context.asset_partition_key_for_output()
+ species = context.partition_key

full_df = pd.read_csv(
"https://docs.dagster.io/assets/iris.csv",
@@ -14,7 +14,7 @@ def get_iris_data_for_date(*args, **kwargs):
metadata={"partition_expr": "TIMESTAMP_SECONDS(TIME)"},
)
def iris_data_per_day(context: AssetExecutionContext) -> pd.DataFrame:
- partition = context.asset_partition_key_for_output()
+ partition = context.partition_key

# get_iris_data_for_date fetches all of the iris data for a given date,
# the returned dataframe contains a column named 'TIME' that stores
@@ -12,7 +12,7 @@
metadata={"partition_expr": "species"},
)
def iris_dataset_partitioned(context) -> pd.DataFrame:
- species = context.asset_partition_key_for_output()
+ species = context.partition_key

full_df = pd.read_csv(
"https://docs.dagster.io/assets/iris.csv",
@@ -14,7 +14,7 @@ def get_iris_data_for_date(*args, **kwargs):
metadata={"partition_expr": "time"},
)
def iris_data_per_day(context) -> pd.DataFrame:
- partition = context.asset_partition_key_for_output()
+ partition = context.partition_key

# get_iris_data_for_date fetches all of the iris data for a given date,
# the returned dataframe contains a column named 'time' that stores
@@ -12,7 +12,7 @@
metadata={"partition_expr": "SPECIES"},
)
def iris_dataset_partitioned(context: AssetExecutionContext) -> pd.DataFrame:
- species = context.asset_partition_key_for_output()
+ species = context.partition_key

full_df = pd.read_csv(
"https://docs.dagster.io/assets/iris.csv",
@@ -14,7 +14,7 @@ def get_iris_data_for_date(*args, **kwargs):
metadata={"partition_expr": "TO_TIMESTAMP(TIME)"},
)
def iris_data_per_day(context: AssetExecutionContext) -> pd.DataFrame:
- partition = context.asset_partition_key_for_output()
+ partition = context.partition_key

# get_iris_data_for_date fetches all of the iris data for a given date,
# the returned dataframe contains a column named 'time' that stores
@@ -12,7 +12,7 @@
metadata={"partition_expr": "SPECIES"},
)
def iris_dataset_partitioned(context: AssetExecutionContext) -> pd.DataFrame:
- species = context.asset_partition_key_for_output()
+ species = context.partition_key

full_df = pd.read_csv(
"https://docs.dagster.io/assets/iris.csv",
@@ -14,7 +14,7 @@ def get_iris_data_for_date(*args, **kwargs):
metadata={"partition_expr": "TO_TIMESTAMP(TIME::INT)"},
)
def iris_data_per_day(context: AssetExecutionContext) -> pd.DataFrame:
- partition = context.asset_partition_key_for_output()
+ partition = context.partition_key

# get_iris_data_for_date fetches all of the iris data for a given date,
# the returned dataframe contains a column named 'time' that stores
@@ -2,7 +2,7 @@
from typing import Any, Mapping, Tuple

from dagster import (
- OpExecutionContext,
+ AssetExecutionContext,
_check as check,
)

@@ -89,8 +89,8 @@ def _get_item_timestamp(item_id):


def id_range_for_time(
- context: OpExecutionContext, hn_client: HNClient
+ context: AssetExecutionContext, hn_client: HNClient
) -> Tuple[Tuple[int, int], Mapping[str, Any]]:
"""For the configured time partition, searches for the range of ids that were created in that time."""
- start, end = context.asset_partitions_time_window_for_output()
+ start, end = context.partition_time_window
return _id_range_for_time(int(start.timestamp()), int(end.timestamp()), hn_client)
@@ -1,4 +1,4 @@
- from dagster import Output, asset
+ from dagster import AssetExecutionContext, Output, asset
from pandas import DataFrame
from pyspark.sql import DataFrame as SparkDF
from pyspark.sql.types import (
@@ -39,7 +39,7 @@
partitions_def=hourly_partitions,
key_prefix=["s3", "core"],
)
- def items(context, hn_client: HNClient) -> Output[DataFrame]:
+ def items(context: AssetExecutionContext, hn_client: HNClient) -> Output[DataFrame]:
"""Items from the Hacker News API: each is a story or a comment on a story."""
(start_id, end_id), item_range_metadata = id_range_for_time(context, hn_client)

@@ -1,5 +1,5 @@
import wandb
- from dagster import AssetIn, StaticPartitionsDefinition, asset
+ from dagster import AssetExecutionContext, AssetIn, StaticPartitionsDefinition, asset

partitions_def = StaticPartitionsDefinition(["red", "orange", "yellow", "blue", "green"])

@@ -17,10 +17,10 @@
}
},
)
- def write_advanced_artifact(context):
+ def write_advanced_artifact(context: AssetExecutionContext):
"""Example writing an Artifact with partitions and custom metadata."""
artifact = wandb.Artifact(ARTIFACT_NAME, "dataset")
- partition_key = context.asset_partition_key_for_output()
+ partition_key = context.partition_key

if partition_key == "red":
return "red"
@@ -1,5 +1,6 @@
import wandb
from dagster import (
+ AssetExecutionContext,
AssetIn,
DailyPartitionsDefinition,
MultiPartitionsDefinition,
@@ -26,9 +27,9 @@
}
},
)
- def create_my_multi_partitioned_asset(context):
+ def create_my_multi_partitioned_asset(context: AssetExecutionContext):
"""Example writing an Artifact with mutli partitions and custom metadata."""
- partition_key = context.asset_partition_key_for_output()
+ partition_key = context.partition_key
context.log.info(f"Creating partitioned asset for {partition_key}")
if partition_key == "red|2023-01-02":
artifact = wandb.Artifact("my_multi_partitioned_asset", "dataset")
@@ -1,6 +1,7 @@
import random

from dagster import (
+ AssetExecutionContext,
AssetIn,
DailyPartitionsDefinition,
TimeWindowPartitionMapping,
@@ -21,11 +22,11 @@
}
},
)
- def create_my_daily_partitioned_asset(context):
+ def create_my_daily_partitioned_asset(context: AssetExecutionContext):
"""Example writing an Artifact with daily partitions and custom metadata."""
# Happens when the asset is materialized in multiple runs (one per partition)
if context.has_partition_key:
- partition_key = context.asset_partition_key_for_output()
+ partition_key = context.partition_key
context.log.info(f"Creating partitioned asset for {partition_key}")
return random.randint(0, 100)

