"""
Snakefile for doing the first stages of data processing from the daq sandbox files
to the blinded raw data. It handles:
- moving the daq files from the sandbox to the sorted file system
- running build raw on this data (with trimming)
- blinding the physics data
"""
import pathlib, os, json, sys
from scripts.util.patterns import (
    get_pattern_unsorted_data,
    get_pattern_tier_daq,
    get_pattern_tier_raw,
)
from scripts.util.utils import (
    subst_vars_in_snakemake_config,
    runcmd,
    config_path,
    chan_map_path,
    filelist_path,
    pars_path,
    metadata_path,
)
from scripts.util.pars_loading import pars_catalog
import scripts.util as ds
check_in_cycle = True
# Set with `snakemake --configfile=/path/to/your/config.json`
# configfile: "have/to/specify/path/to/your/config.json"
subst_vars_in_snakemake_config(workflow, config)
setup = config["setups"]["l200"]
configs = config_path(setup)
chan_maps = chan_map_path(setup)
swenv = runcmd(setup)
meta = metadata_path(setup)
basedir = workflow.basedir
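# Frequently used paths (configs, channel maps, metadata, ...) are resolved
# once from the "l200" setup block of the config and reused in the rules below.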
wildcard_constraints:
    experiment=r"\w+",
    period=r"p\d{2}",
    run=r"r\d{3}",
    datatype=r"\w{3}",
    timestamp=r"\d{8}T\d{6}Z",
localrules:
    gen_filelist,
    autogen_output,
raw_par_catalog = ds.pars_key_resolve.get_par_catalog(
    ["-*-*-*-cal"],
    [
        get_pattern_unsorted_data(setup),
        get_pattern_tier_daq(setup),
        get_pattern_tier_raw(setup),
    ],
    {"cal": ["par_raw"]},
)
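# The catalog built above is written out to <pars_path>/raw/validity.jsonl by
# the onstart handler below, replacing any stale copy from a previous run.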
onstart:
    print("Starting workflow")
    # drop any stale validity file before writing a fresh one
    raw_par_cat_file = os.path.join(pars_path(setup), "raw", "validity.jsonl")
    if os.path.isfile(raw_par_cat_file):
        os.remove(raw_par_cat_file)
    pathlib.Path(os.path.dirname(raw_par_cat_file)).mkdir(parents=True, exist_ok=True)
    ds.pars_key_resolve.write_to_jsonl(raw_par_catalog, raw_par_cat_file)
onsuccess:
    print("Workflow finished, no error")
    shell("rm *.gen || true")
    shell(f"rm {filelist_path(setup)}/* || true")
include: "rules/common.smk"
include: "rules/filelist_gen.smk"
include: "rules/main.smk"
include: "rules/raw.smk"
include: "rules/blinding_check.smk"
rule gen_filelist:
    """
    Generate file list.
    """
    input:
        lambda wildcards: get_filelist(
            wildcards,
            setup,
            get_tier_pattern(wildcards.tier),
            ignore_keys_file=os.path.join(configs, "empty_keys.keylist"),
            analysis_runs_file=None,
        ),
    output:
        os.path.join(filelist_path(setup), "{label}-{tier}.filelist"),
    run:
        with open(output[0], "w") as f:
            for fn in input:
                f.write(f"{fn}\n")
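# The filelist is plain text with one resolved input path per line. The
# helpers get_filelist and get_tier_pattern are presumably provided by the
# included .smk files above, since they are not imported in this Snakefile.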
rule sort_data:
    """
    This rule moves the DAQ data from the unsorted sandbox dir
    to the sorted dirs under generated.
    """
    input:
        get_pattern_unsorted_data(setup),
    output:
        get_pattern_tier_daq(setup),
    shell:
        "mv {input} {output}"