-
Notifications
You must be signed in to change notification settings - Fork 1
/
import-data.nf
53 lines (41 loc) · 1.3 KB
/
import-data.nf
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
#!/usr/bin/env nextflow
nextflow.enable.dsl = 2
include { lookup_ref_ids } from './workflows/lookup-references'
include { batch_lookup_ontology_information } from './workflows/lookup-ontology-info'
include { parse_databases } from './workflows/parse-databases'
include { parse_metadata } from './workflows/parse-metadata'
include { load_data } from './workflows/load-data'
include { slack_message } from './workflows/utils/slack'
include { slack_closure } from './workflows/utils/slack'
/*
 * Top-level import workflow.
 *
 * Announces the start of the run via slack_message, gathers the outputs of
 * parse_databases and parse_metadata into one channel, and routes each item
 * by file name:
 *   - "terms.csv"   -> batch_lookup_ontology_information
 *   - "ref_ids.csv" -> lookup_ref_ids
 *   - anything else -> passed through unchanged
 * The pass-through items and both lookup outputs are then mixed and handed
 * to load_data.
 *
 * take:
 *   _flag         Not referenced in the body; presumably a start signal that
 *                 lets callers sequence this workflow - TODO confirm.
 * emit:
 *   post_release  Output channel of load_data.
 */
workflow import_data {
    take:
    _flag

    main:
    Channel.of("Starting data import pipeline") | slack_message

    // Collect every parsed output, then split it by file name. The `csv`
    // branch is the catch-all for items matching neither special name.
    Channel.empty() \
    | mix(
        parse_databases(),
        parse_metadata()
    ) \
    | branch { parsed ->
        terms: parsed.name == "terms.csv"
        ref_ids: parsed.name == "ref_ids.csv"
        csv: true
    } \
    | set { results }

    // Items in the two special branches go through their lookup workflows.
    results.terms | batch_lookup_ontology_information | set { term_info }
    results.ref_ids | lookup_ref_ids | set { references }

    // Re-join the lookup outputs with the pass-through items and load them.
    results.csv \
    | mix(term_info, references) \
    | load_data \
    | set { post_release }

    // Declared after main:, following the documented take / main / emit order.
    emit:
    post_release
}
// Entry workflow: feeds a single 'ready' value into import_data as its
// `take` input to start the import.
workflow {
    Channel.of('ready') | import_data
}
// Error handler: passes a fixed failure message to slack_closure
// (included from ./workflows/utils/slack).
workflow.onError {
    def failure_notice = "Import pipeline encountered an error and failed"
    slack_closure(failure_notice)
}
// Completion handler: passes a summary message to slack_closure
// (included from ./workflows/utils/slack).
// `workflow.success` is the boolean run-outcome flag from Nextflow's workflow
// metadata; the previous `$workflow.status` referenced an undefined variable
// (`$workflow`), so the handler could not build its message.
workflow.onComplete {
    slack_closure("Workflow completed ${workflow.success ? 'Ok' : 'with errors'} ")
}