Merge branch 'master' into selection_in_hists
riga authored Nov 26, 2024
2 parents a6bf144 + f2b8359 commit ff70e5a
Showing 6 changed files with 67 additions and 15 deletions.
analysis_templates/cms_minimal/law.cfg (1 addition, 0 deletions)
@@ -56,6 +56,7 @@ skip_ensure_proxy: False
# some remote workflow parameter defaults
htcondor_flavor: $CF_HTCONDOR_FLAVOR
htcondor_share_software: False
+htcondor_disk: -1
slurm_flavor: $CF_SLURM_FLAVOR
slurm_partition: $CF_SLURM_PARTITION

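Note on the new default: htcondor_disk: -1 acts as a sentinel, i.e. any non-positive value leaves the disk request unset so the cluster default applies, while positive values are read in GB. A minimal sketch of that semantic (values illustrative, not part of the commit):

    # non-positive -> no explicit request; positive -> GB, converted as in
    # htcondor_job_config further down
    def disk_request_kib(htcondor_disk_gb):
        if htcondor_disk_gb is None or htcondor_disk_gb <= 0:
            return None  # keep the cluster default
        return int(htcondor_disk_gb * 1024**2)

    assert disk_request_kib(-1) is None
    assert disk_request_kib(2) == 2097152  # 2 GB -> KiB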
columnflow/tasks/framework/base.py (1 addition, 1 deletion)
@@ -142,7 +142,7 @@ def req_params(cls, inst: AnalysisTask, **kwargs) -> dict:
_prefer_cli = law.util.make_set(kwargs.get("_prefer_cli", [])) | {
"version", "workflow", "job_workers", "poll_interval", "walltime", "max_runtime",
"retries", "acceptance", "tolerance", "parallel_jobs", "shuffle_jobs", "htcondor_cpus",
"htcondor_gpus", "htcondor_memory", "htcondor_pool", "pilot",
"htcondor_gpus", "htcondor_memory", "htcondor_disk", "htcondor_pool", "pilot",
}
kwargs["_prefer_cli"] = _prefer_cli

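Note: _prefer_cli lists parameters that are taken from the command line, when given there, while one task builds the parameters of an upstream task via req(); adding "htcondor_disk" lets a CLI value propagate into submitted workflows. An illustrative sketch, not law's actual implementation:

    # CLI values win for parameters listed in _prefer_cli; everything else is
    # inherited from the requiring task instance
    def resolve_param(name, cli_params, inst_params, prefer_cli):
        if name in prefer_cli and name in cli_params:
            return cli_params[name]
        return inst_params[name]

    prefer_cli = {"walltime", "htcondor_memory", "htcondor_disk"}
    cli = {"htcondor_disk": 10.0}
    inst = {"htcondor_disk": -1.0, "walltime": "2h"}
    assert resolve_param("htcondor_disk", cli, inst, prefer_cli) == 10.0
    assert resolve_param("walltime", cli, inst, prefer_cli) == "2h"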
columnflow/tasks/framework/remote.py (19 additions, 1 deletion)
@@ -528,6 +528,11 @@ def common_destination_info(self, info: dict[str, str]) -> dict[str, str]:

_default_htcondor_flavor = law.config.get_expanded("analysis", "htcondor_flavor", law.NO_STR)
_default_htcondor_share_software = law.config.get_expanded_boolean("analysis", "htcondor_share_software", False)
+_default_htcondor_disk = law.util.parse_bytes(
+    law.config.get_expanded_float("analysis", "htcondor_disk", law.NO_FLOAT),
+    input_unit="GB",
+    unit="GB",
+)


class HTCondorWorkflow(AnalysisTask, law.htcondor.HTCondorWorkflow, RemoteWorkflowMixin):
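Note: with input_unit and unit both set to "GB", law.util.parse_bytes effectively performs a unit-checked float conversion here, so the -1 shipped in law.cfg survives as a negative default (assuming law.NO_FLOAT is likewise a non-positive sentinel). A rough stand-in:

    # stand-in for the default resolution above; -1 from law.cfg stays <= 0,
    # which both the parameter description and the job config guard key on
    def parse_gb(value):
        return float(value)

    assert parse_gb(-1) <= 0  # reported as "empty default" below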
@@ -565,9 +570,16 @@ class HTCondorWorkflow(AnalysisTask, law.htcondor.HTCondorWorkflow, RemoteWorkflowMixin):
default=law.NO_FLOAT,
unit="MB",
significant=False,
description="requested memeory in MB; empty value leads to the cluster default setting; "
description="requested memory in MB; empty value leads to the cluster default setting; "
"empty default",
)
+    htcondor_disk = law.BytesParameter(
+        default=_default_htcondor_disk,
+        unit="GB",
+        significant=False,
+        description="requested disk space in GB; empty value leads to the cluster default setting; "
+        f"{'empty default' if _default_htcondor_disk <= 0 else 'default: ' + str(_default_htcondor_disk)}",
+    )
htcondor_flavor = luigi.ChoiceParameter(
default=_default_htcondor_flavor,
choices=(
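Note: the description of htcondor_disk is an f-string evaluated at import time, so the rendered help text already reflects the configured default. Illustration with assumed values:

    # what the conditional tail of the description renders to
    for default in (-1.0, 5.0):
        print("empty default" if default <= 0 else "default: " + str(default))
    # -> "empty default", then "default: 5.0"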
@@ -697,6 +709,12 @@ def htcondor_job_config(self, config, job_num, branches):
if self.htcondor_memory is not None and self.htcondor_memory > 0:
config.custom_content.append(("Request_Memory", self.htcondor_memory))

+        # request disk space
+        if self.htcondor_disk is not None and self.htcondor_disk > 0:
+            # TODO: the exact conversion might be flavor dependent in the future, use kB for now
+            # e.g. https://confluence.desy.de/pages/viewpage.action?pageId=128354529
+            config.custom_content.append(("RequestDisk", self.htcondor_disk * 1024**2))
+
# render variables
config.render_variables["cf_bootstrap_name"] = "htcondor_standalone"
if self.htcondor_flavor not in ("", law.NO_STR):
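Note: HTCondor's RequestDisk classad is counted in KiB, hence the 1024**2 factor on a GB value (strictly treating GB as GiB; the TODO above anticipates flavor-dependent conversions). Worked example with an assumed 2 GB request:

    htcondor_disk = 2.0                     # e.g. --htcondor-disk 2
    request_disk = htcondor_disk * 1024**2  # 2097152.0 KiB
    print(("RequestDisk", request_disk))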
columnflow/tasks/reduction.py (41 additions, 8 deletions)
@@ -260,6 +260,8 @@ class MergeReductionStats(
SelectorStepsMixin,
CalibratorsMixin,
DatasetTask,
+    law.LocalWorkflow,
+    RemoteWorkflow,
):

n_inputs = luigi.IntParameter(
@@ -279,6 +281,7 @@ class MergeReductionStats(

# upstream requirements
reqs = Requirements(
+        RemoteWorkflow.reqs,
ReduceEvents=ReduceEvents,
)

@@ -298,10 +301,31 @@ def resolve_param_values(cls, params):

return params

+    def create_branch_map(self):
+        # single branch without payload
+        return {0: None}
+
+    def workflow_requires(self):
+        reqs = super().workflow_requires()
+        if self.merged_size == 0:
+            return reqs
+
+        reqs["events"] = self.reqs.ReduceEvents.req_different_branching(
+            self,
+            branches=((0, self.n_inputs),),
+        )
+        return reqs
+
def requires(self):
if self.merged_size == 0:
return []
-        return self.reqs.ReduceEvents.req(self, branches=((0, self.n_inputs),))
+
+        return self.reqs.ReduceEvents.req_different_branching(
+            self,
+            workflow="local",
+            branches=((0, self.n_inputs),),
+            _exclude={"branch"},
+        )

def output(self):
return {"stats": self.target(f"stats_n{self.n_inputs}.json")}
@@ -429,7 +453,7 @@ def is_sandboxed(self):
@law.workflow_property(setter=True, cache=True, empty_value=0)
def file_merging(self):
# check if the merging stats are present
-        stats = self.reqs.MergeReductionStats.req(self).output()["stats"]
+        stats = self.reqs.MergeReductionStats.req_different_branching(self, branch=0).output()["stats"]
return stats.load(formatter="json")["merge_factor"] if stats.exists() else 0

@law.dynamic_workflow_condition
@@ -444,14 +468,14 @@ def create_branch_map(self):

def merge_workflow_requires(self):
return {
"stats": self.reqs.MergeReductionStats.req(self),
"stats": self.reqs.MergeReductionStats.req_different_branching(self),
"events": self.reqs.ReduceEvents.req_different_branching(self, branches=((0, -1),)),
}

def merge_requires(self, start_branch, end_branch):
return {
"stats": self.reqs.MergeReductionStats.req(self),
"events": self.reqs.ReduceEvents.req(
"stats": self.reqs.MergeReductionStats.req_different_branching(self, branch=0),
"events": self.reqs.ReduceEvents.req_different_branching(
self,
branches=((start_branch, end_branch),),
workflow="local",
@@ -508,6 +532,7 @@ class ProvideReducedEvents(
CalibratorsMixin,
DatasetTask,
law.LocalWorkflow,
+    RemoteWorkflow,
):

skip_merging = luigi.BoolParameter(
@@ -524,18 +549,26 @@

# upstream requirements
reqs = Requirements(
+        RemoteWorkflow.reqs,
ReduceEvents=ReduceEvents,
MergeReductionStats=MergeReductionStats,
MergeReducedEvents=MergeReducedEvents,
)

+    @classmethod
+    def _resolve_workflow_parameters(cls, params):
+        # always fallback to local workflows
+        params["effective_workflow"] = "local"
+
+        return super()._resolve_workflow_parameters(params)
+
@law.workflow_property(setter=True, cache=True, empty_value=0)
def file_merging(self):
if self.skip_merging or self.dataset_info_inst.n_files == 1:
return 1

# check if the merging stats are present
-        stats = self.reqs.MergeReductionStats.req(self).output()["stats"]
+        stats = self.reqs.MergeReductionStats.req_different_branching(self, branch=0).output()["stats"]
return stats.load(formatter="json")["merge_factor"] if stats.exists() else 0

@law.dynamic_workflow_condition
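Note: ProvideReducedEvents inherits the remote-workflow plumbing but pins its effective workflow to local, since it merely decides between and forwards reduced events while the heavy work stays in its requirements. A compressed sketch of the override pattern (class names hypothetical):

    class BaseSketch:
        @classmethod
        def _resolve_workflow_parameters(cls, params):
            params.setdefault("effective_workflow", params.get("workflow", "local"))
            return params

    class ProvideSketch(BaseSketch):
        @classmethod
        def _resolve_workflow_parameters(cls, params):
            # always fall back to a local workflow, whatever was requested
            params["effective_workflow"] = "local"
            return super()._resolve_workflow_parameters(params)

    print(ProvideSketch._resolve_workflow_parameters({"workflow": "htcondor"}))
    # {'workflow': 'htcondor', 'effective_workflow': 'local'}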
@@ -576,7 +609,7 @@ def workflow_requires(self):
reqs["events"] = self._req_reduced_events()
else:
# here, the merging is unclear so require the stats
reqs["reduction_stats"] = self.reqs.MergeReductionStats.req(self)
reqs["reduction_stats"] = self.reqs.MergeReductionStats.req_different_branching(self)

if self.force_merging:
# require merged events when forced
@@ -598,7 +631,7 @@ def requires(self):
if self.skip_merging or (not self.force_merging and self.dataset_info_inst.n_files == 1):
reqs["events"] = self._req_reduced_events()
else:
reqs["reduction_stats"] = self.reqs.MergeReductionStats.req(self)
reqs["reduction_stats"] = self.reqs.MergeReductionStats.req_different_branching(self, branch=0)

if self.force_merging:
reqs["events"] = self._req_merged_reduced_events()
columnflow/tasks/selection.py (4 additions, 5 deletions)
@@ -286,6 +286,7 @@ class MergeSelectionStats(
CalibratorsMixin,
DatasetTask,
law.tasks.ForestMerge,
+    RemoteWorkflow,
):
# flag that sets the *hists* output to optional if True
selection_hists_optional = default_selection_hists_optional
@@ -296,11 +297,9 @@
# merge 25 stats files into 1 at every step of the merging cascade
merge_factor = 25

-    # skip receiving some parameters via req
-    exclude_params_req_get = {"workflow"}
-
# upstream requirements
reqs = Requirements(
+        RemoteWorkflow.reqs,
SelectEvents=SelectEvents,
)

@@ -309,10 +308,10 @@ def create_branch_map(self):
return law.tasks.ForestMerge.create_branch_map(self)

def merge_workflow_requires(self):
-        return self.reqs.SelectEvents.req(self, _exclude={"branches"})
+        return self.reqs.SelectEvents.req_different_branching(self, _exclude={"branches"})

def merge_requires(self, start_branch, end_branch):
-        return self.reqs.SelectEvents.req(
+        return self.reqs.SelectEvents.req_different_branching(
self,
branches=((start_branch, end_branch),),
workflow="local",
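Note: with merge_factor = 25, law.tasks.ForestMerge requests SelectEvents outputs in contiguous branch ranges, one chunk per merge node; switching to req_different_branching keeps this working now that the merge task carries remote-workflow parameters of its own. Illustrative chunking only:

    # each merge node consumes up to merge_factor input branches
    def chunk_ranges(n_branches, merge_factor=25):
        return [(i, min(i + merge_factor, n_branches)) for i in range(0, n_branches, merge_factor)]

    print(chunk_ranges(60))  # [(0, 25), (25, 50), (50, 60)]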
law.cfg (1 addition, 0 deletions)
@@ -52,6 +52,7 @@ skip_ensure_proxy: False
# some remote workflow parameter defaults
htcondor_flavor: $CF_HTCONDOR_FLAVOR
htcondor_share_software: False
+htcondor_disk: -1
slurm_flavor: $CF_SLURM_FLAVOR
slurm_partition: $CF_SLURM_PARTITION

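Note: with both config files carrying the same -1 default, disk requests are opt-in: either set htcondor_disk in the [analysis] section or pass it per invocation, e.g. law run cf.SomeTask --workflow htcondor --htcondor-disk 5 (hypothetical invocation; task name and value chosen for illustration).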