Skip to content

Commit

Permalink
Deprecate publish component (#941)
Browse files Browse the repository at this point in the history
  • Loading branch information
DriesSchaumont authored Jan 10, 2025
1 parent a6b11a4 commit 96510f6
Show file tree
Hide file tree
Showing 10 changed files with 46 additions and 47 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# openpipelines 2.1.0

# MAJOR CHANGES

* The `transfer/publish` component is deprecated and will be removed in a future major release (PR #941).

# MINOR CHANGES

* `grep_annotation_column` and `subset_obsp`: Fix compatibility for SciPy (PR #945).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ dependencies:
- name: correction/cellbender_remove_background
- name: filter/filter_with_counts
- name: filter/subset_h5mu
- name: transfer/publish
# test_dependencies:
# - name: convert/from_10xh5_to_h5mu
test_resources:
Expand Down
33 changes: 22 additions & 11 deletions src/workflows/ingestion/cellranger_postprocessing/main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,28 @@ workflow run_wf {
input_ch

main:
// perform correction if so desired

output_ch = input_ch
| map{id, state ->
assert (state.perform_correction || state.min_genes != null || state.min_counts != null):
"Either perform_correct, min_genes or min_counts should be specified!"
[id, state]
}
// Make sure there is not conflict between the output from this workflow
// And the output from any of the components
| map {id, state ->
def new_state = state + ["workflow_output": state.output]
[id, new_state]
}
// perform correction if so desired
| cellbender_remove_background.run(
runIf: {id, state -> state.perform_correction},
fromState: { id, state ->
[
input: state.input,
epochs: state.cellbender_epochs,
output_layer: "cellbender_corrected",
output_compression: "gzip"
output_compression: "gzip",
output: state.workflow_output,
]
},
toState: { id, output, state ->
Expand All @@ -31,19 +42,19 @@ workflow run_wf {
min_counts: state.min_counts,
layer: state.layer,
output_compression: "gzip",
do_subset: true
do_subset: true,
output: state.workflow_output,
]
},
toState: [input: "output"]
)
// Make sure to use the correct ouput file names,
// irrespective wether or not any of the above
// components were run
| publish.run(
fromState: [ input: "input", output: "output" ],
toState: ["output": "output"]
)
| setState(["output"])
// irrespective of which component(s) (or combinations of then)
// were run. The above components
// should put their output into 'input'
| map {id, state ->
[id, ["output": state.input]]
}

emit:
output_ch
Expand Down
1 change: 0 additions & 1 deletion src/workflows/integration/totalvi_leiden/config.vsh.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,6 @@ dependencies:
- name: integrate/totalvi
- name: dimred/umap
- name: neighbors/find_neighbors
- name: transfer/publish
resources:
- type: nextflow_script
path: main.nf
Expand Down
17 changes: 7 additions & 10 deletions src/workflows/integration/totalvi_leiden/main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,12 @@ workflow neighbors_leiden_umap {
| umap.run(
fromState: [
"input": "input",
"output": "workflow_output",
"uns_neighbors": "uns_neighbors",
"obsm_output": "obsm_umap",
"query_modality": "modality",
],
args: ["output_compression": "gzip"]
toState: ["output": "output"]
)

Expand Down Expand Up @@ -110,7 +112,9 @@ workflow run_wf {
}
[id, new_state, state]
}
| neighbors_leiden_umap
// neighbors_leiden_umap is not a subworkflow or module, but a
// workflow defined in this script, so not .run functionality is available
| neighbors_leiden_umap
| map { id, state, orig_state -> // for ADT
stateMapping = [
"uns_neighbors": "prot_uns_neighbors",
Expand All @@ -131,16 +135,9 @@ workflow run_wf {
}
[id, new_state + ["input": state.output]]
}
// neighbors_leiden_umap is not a subworkflow or module, but a
// workflow defined in this script, so not .run functionality is available
| neighbors_leiden_umap
| publish.run(
fromState: { id, state -> [
"input": state.output,
"output": state.workflow_output,
"compression": "gzip"
]
},
toState: ["output", "output"]
)
| setState(["output", "reference_model_path", "query_model_path"])
emit:
output_ch
Expand Down
1 change: 0 additions & 1 deletion src/workflows/multiomics/process_batches/config.vsh.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,6 @@ dependencies:
alias: dimensionality_reduction_rna
- name: workflows/multiomics/dimensionality_reduction
alias: dimensionality_reduction_prot
- name: transfer/publish
- name: workflows/multiomics/dimensionality_reduction
alias: dimensionality_reduction_scaling_rna
resources:
Expand Down
21 changes: 10 additions & 11 deletions src/workflows/multiomics/process_batches/main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ workflow run_wf {
"modality": "rna",
"var_pca_feature_selection": state.highly_variable_features_var_output, // run PCA on highly variable genes only
"pca_overwrite": state.pca_overwrite,
"output": state.workflow_output,
],
"dimensionality_reduction_scaling_rna":
[
Expand All @@ -194,31 +195,29 @@ workflow run_wf {
"uns_neighbors": "neighbors_scaled",
"obsp_neighbor_connectivities": "connectivities_scaled",
"obsp_neighbor_distances": "distances_scaled",
"output": state.workflow_output,
],
"dimensionality_reduction_prot":
[
"id": id,
"input": state.input,
"layer": "clr",
"modality": "prot",
"pca_overwrite": state.pca_overwrite
"pca_overwrite": state.pca_overwrite,
"output": state.workflow_output,
]
]
return stateMappings[component.name]
},
toState: ["input": "output"]
)
}
| publish.run(
fromState: { id, state -> [
"input": state.input,
"output": state.workflow_output,
]
},
toState: ["output": "output"]
)
| setState(["output"])

// At the end of the reduce statement,
// the `toState` closure put the output back into
// into the 'input' slot
| map {id, state ->
[id, ["output": state.input]]
}

emit:
output_ch
Expand Down
1 change: 0 additions & 1 deletion src/workflows/multiomics/process_samples/config.vsh.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,6 @@ dependencies:
alias: split_modalities_workflow
- name: dataflow/merge
- name: dataflow/concatenate_h5mu
- name: transfer/publish
- name: workflows/rna/rna_singlesample
- name: workflows/prot/prot_singlesample
- name: workflows/gdo/gdo_singlesample
Expand Down
1 change: 0 additions & 1 deletion src/workflows/qc/qc/config.vsh.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,6 @@ argument_groups:
description: Destination path to the output.
example: output.h5mu
dependencies:
- name: transfer/publish
- name: metadata/grep_annotation_column
- name: qc/calculate_qc_metrics
resources:
Expand Down
13 changes: 3 additions & 10 deletions src/workflows/qc/qc/main.nf
Original file line number Diff line number Diff line change
Expand Up @@ -63,22 +63,15 @@ workflow run_wf {
"output_var_num_nonzero_obs": state.output_var_num_nonzero_obs,
"output_var_total_counts_obs": state.output_var_total_counts_obs,
"output_var_obs_mean": state.output_var_obs_mean,
"output_var_pct_dropout": state.output_var_pct_dropout
"output_var_pct_dropout": state.output_var_pct_dropout,
"output": state.workflow_output,
"compression": "gzip"
]
if (state.var_qc_metrics) {
newState += ["var_qc_metrics": state.var_qc_metrics]
}
return newState
},
toState: ["input": "output"]
)
| publish.run(
fromState: { id, state -> [
"input": state.input,
"output": state.workflow_output,
"compression": "gzip"
]
},
toState: ["output": "output"]
)
| setState(["output"])
Expand Down

0 comments on commit 96510f6

Please sign in to comment.