From ca1e500c0bce2a62dcb6406ebc3286af1e7705d6 Mon Sep 17 00:00:00 2001
From: Simon Brugman
Date: Fri, 2 Feb 2024 17:58:20 +0100
Subject: [PATCH 1/2] FIX(kedro-airflow): grouping with fork

Signed-off-by: Simon Brugman
---
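Notes (editorial commentary, not part of the commit message):

The previous implementation assumed that all of a node's memory-dataset
inputs map to the same sequence (the assert removed below). That holds for a
straight chain, but not once sequences fork and meet again, e.g.

    a -> m1 (MemoryDataset) --\
                               c   # c reads both m1 and m2
    b -> m2 (MemoryDataset) --/

Here node c sees inputs from two different sequences and the assert fires.
This commit replaces the assert with a merge: the second sequence is folded
into the first, its slot is set to None, and None entries are filtered out
when the named groups are built.

Two edge cases may still be worth covering in review. The sketch below is
illustrative only (it reuses this commit's variable names but is not code
from the series):

    sequence_id = None
    for i in node.inputs:
        if i in ds:
            other = sequence_map[i]
            if sequence_id is None:
                sequence_id = other
            elif other != sequence_id:
                # fold the other sequence into the current one...
                node_sequences[sequence_id].extend(node_sequences[other])
                node_sequences[other] = None
                # ...and re-point datasets that still reference it, so a
                # later consumer never looks up a slot that is now None
                for name, idx in sequence_map.items():
                    if idx == other:
                        sequence_map[name] = sequence_id

Without the `other != sequence_id` guard, a node whose inputs both come from
the same sequence (a fork that rejoins) would merge the sequence into itself
and then discard it; without the re-pointing loop, sequence_map can keep a
stale index to a sequence that was just set to None.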
- """ # get all memory datasets in the pipeline - memory_datasets = get_memory_datasets(catalog, pipeline) + ds = get_memory_datasets(catalog, pipeline) # Node sequences node_sequences = [] @@ -45,11 +38,11 @@ def group_memory_nodes(catalog: DataCatalog, pipeline: Pipeline): # Mapping from dataset name -> node sequence index sequence_map = {} for node in pipeline.nodes: - if all(o not in memory_datasets for o in node.inputs + node.outputs): + if all(o not in ds for o in node.inputs + node.outputs): # standalone node node_sequences.append([node]) else: - if all(i not in memory_datasets for i in node.inputs): + if all(i not in ds for i in node.inputs): # start of a sequence; create a new sequence and store the id node_sequences.append([node]) sequence_id = len(node_sequences) - 1 @@ -57,22 +50,27 @@ def group_memory_nodes(catalog: DataCatalog, pipeline: Pipeline): # continuation of a sequence; retrieve sequence_id sequence_id = None for i in node.inputs: - if i in memory_datasets: - assert sequence_id is None or sequence_id == sequence_map[i] - sequence_id = sequence_map[i] + if i in ds: + if sequence_id is None: + sequence_id = sequence_map[i] + else: + # merge sequences + node_sequences[sequence_id].extend(node_sequences[sequence_map[i]]) + node_sequences[sequence_map[i]] = None # Append to map node_sequences[sequence_id].append(node) # map outputs to sequence_id for o in node.outputs: - if o in memory_datasets: + if o in ds: sequence_map[o] = sequence_id # Named node sequences nodes = { node_sequence_name(node_sequence): node_sequence for node_sequence in node_sequences + if node_sequence is not None } # Inverted mapping From bd8a51e0330ac4400756d0ad8b87ffcb87bdd708 Mon Sep 17 00:00:00 2001 From: Simon Brugman Date: Fri, 2 Feb 2024 18:02:20 +0100 Subject: [PATCH 2/2] FIX(kedro-airflow): grouping --- kedro-airflow/kedro_airflow/grouping.py | 28 ++++++++++++++++--------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/kedro-airflow/kedro_airflow/grouping.py b/kedro-airflow/kedro_airflow/grouping.py index 6515851c8..1604e3b64 100644 --- a/kedro-airflow/kedro_airflow/grouping.py +++ b/kedro-airflow/kedro_airflow/grouping.py @@ -2,7 +2,7 @@ from collections import defaultdict -from kedro.io import DataCatalog, MemoryDataSet +from kedro.io import DataCatalog, MemoryDataset from kedro.pipeline.node import Node from kedro.pipeline.pipeline import Pipeline @@ -11,15 +11,15 @@ def _is_memory_dataset(catalog, dataset_name: str) -> bool: if dataset_name == "parameters" or dataset_name.startswith("params:"): return False - dataset = catalog._data_sets.get(dataset_name, None) - return dataset is not None and isinstance(dataset, MemoryDataSet) + dataset = catalog._datasets.get(dataset_name, None) + return dataset is not None and isinstance(dataset, MemoryDataset) def get_memory_datasets(catalog: DataCatalog, pipeline: Pipeline) -> set[str]: - """Gather all datasets in the pipeline that are of type MemoryDataSet, excluding 'parameters'.""" + """Gather all datasets in the pipeline that are of type MemoryDataset, excluding 'parameters'.""" return { dataset_name - for dataset_name in pipeline.data_sets() + for dataset_name in pipeline.datasets() if _is_memory_dataset(catalog, dataset_name) } @@ -29,8 +29,16 @@ def node_sequence_name(node_sequence: list[Node]) -> str: def group_memory_nodes(catalog: DataCatalog, pipeline: Pipeline): + """ + Nodes that are connected through MemoryDatasets cannot be distributed across + multiple machines, e.g. be in different Kubernetes pods. 
 kedro-airflow/kedro_airflow/grouping.py | 28 ++++++++++++++++++----------
 1 file changed, 18 insertions(+), 10 deletions(-)

diff --git a/kedro-airflow/kedro_airflow/grouping.py b/kedro-airflow/kedro_airflow/grouping.py
index 6515851c8..1604e3b64 100644
--- a/kedro-airflow/kedro_airflow/grouping.py
+++ b/kedro-airflow/kedro_airflow/grouping.py
@@ -2,7 +2,7 @@
 
 from collections import defaultdict
 
-from kedro.io import DataCatalog, MemoryDataSet
+from kedro.io import DataCatalog, MemoryDataset
 from kedro.pipeline.node import Node
 from kedro.pipeline.pipeline import Pipeline
 
@@ -11,15 +11,15 @@ def _is_memory_dataset(catalog, dataset_name: str) -> bool:
     if dataset_name == "parameters" or dataset_name.startswith("params:"):
         return False
 
-    dataset = catalog._data_sets.get(dataset_name, None)
-    return dataset is not None and isinstance(dataset, MemoryDataSet)
+    dataset = catalog._datasets.get(dataset_name, None)
+    return dataset is not None and isinstance(dataset, MemoryDataset)
 
 
 def get_memory_datasets(catalog: DataCatalog, pipeline: Pipeline) -> set[str]:
-    """Gather all datasets in the pipeline that are of type MemoryDataSet, excluding 'parameters'."""
+    """Gather all datasets in the pipeline that are of type MemoryDataset, excluding 'parameters'."""
     return {
         dataset_name
-        for dataset_name in pipeline.data_sets()
+        for dataset_name in pipeline.datasets()
         if _is_memory_dataset(catalog, dataset_name)
     }
 
@@ -29,8 +29,16 @@ def node_sequence_name(node_sequence: list[Node]) -> str:
 
 
 def group_memory_nodes(catalog: DataCatalog, pipeline: Pipeline):
+    """
+    Nodes that are connected through MemoryDatasets cannot be distributed across
+    multiple machines, e.g. be in different Kubernetes pods. This function
+    groups nodes that are connected through MemoryDatasets in the pipeline
+    together. Essentially, this computes connected components over the graph of
+    nodes connected by MemoryDatasets.
+    """
+
     # get all memory datasets in the pipeline
-    ds = get_memory_datasets(catalog, pipeline)
+    memory_datasets = get_memory_datasets(catalog, pipeline)
 
     # Node sequences
     node_sequences = []
@@ -38,11 +46,11 @@ def group_memory_nodes(catalog: DataCatalog, pipeline: Pipeline):
     # Mapping from dataset name -> node sequence index
     sequence_map = {}
     for node in pipeline.nodes:
-        if all(o not in ds for o in node.inputs + node.outputs):
+        if all(o not in memory_datasets for o in node.inputs + node.outputs):
             # standalone node
             node_sequences.append([node])
         else:
-            if all(i not in ds for i in node.inputs):
+            if all(i not in memory_datasets for i in node.inputs):
                 # start of a sequence; create a new sequence and store the id
                 node_sequences.append([node])
                 sequence_id = len(node_sequences) - 1
@@ -50,7 +58,7 @@ def group_memory_nodes(catalog: DataCatalog, pipeline: Pipeline):
                 # continuation of a sequence; retrieve sequence_id
                 sequence_id = None
                 for i in node.inputs:
-                    if i in ds:
+                    if i in memory_datasets:
                         if sequence_id is None:
                             sequence_id = sequence_map[i]
                         else:
                             # merge sequences
@@ -63,7 +71,7 @@ def group_memory_nodes(catalog: DataCatalog, pipeline: Pipeline):
 
             # map outputs to sequence_id
             for o in node.outputs:
-                if o in ds:
+                if o in memory_datasets:
                     sequence_map[o] = sequence_id
 
     # Named node sequences
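Notes (editorial commentary, not part of the series):

An end-to-end usage sketch of the grouping this series fixes. Illustrative
only: the node functions and dataset names are made up, and the return value
is whatever group_memory_nodes produces in kedro-airflow after this series.

    from kedro.io import DataCatalog, MemoryDataset
    from kedro.pipeline import Pipeline, node

    from kedro_airflow.grouping import group_memory_nodes

    def make():
        return 1

    def combine(x, y):
        return x + y

    # two independent chains joined by one node: before this series, the
    # removed assert fired on "c" because its inputs came from two sequences
    pipeline = Pipeline(
        [
            node(make, inputs=None, outputs="m1", name="a"),
            node(make, inputs=None, outputs="m2", name="b"),
            node(combine, inputs=["m1", "m2"], outputs="result", name="c"),
        ]
    )
    catalog = DataCatalog({"m1": MemoryDataset(), "m2": MemoryDataset()})

    # a, b and c end up in a single group, since they exchange only
    # MemoryDatasets; "result" is not registered as a memory dataset, so it
    # does not extend the group
    grouped = group_memory_nodes(catalog, pipeline)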