Commit 673125ed authored by Cedric Verstege

Improve Excalibur WF by caching files for energy shifts

parent 219ed52d
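In outline, this commit moves the energy-shift loop out of the task parameters and into the ExcaliburWorkflow job itself: the skim inputs are localized once, a per-shift JSON config is derived from the base config, all shifts run in parallel on the job's CPUs, and the per-shift ROOT files are merged with hadd into a single output. A minimal standalone sketch of that fan-out/merge pattern (the run_shift/run_all_shifts helpers and paths are illustrative, not the repository's API; it assumes the excalibur and hadd executables are on PATH):

import concurrent.futures
import json
import subprocess
import tempfile
from copy import deepcopy
from pathlib import Path


def run_shift(base_cfg: dict, workdir: Path, name: str) -> Path:
    """Write a per-shift JSON config next to the cached inputs and run excalibur on it."""
    cfg = deepcopy(base_cfg)
    out_path = workdir / f"{name}.root"
    cfg["OutputPath"] = str(out_path)
    cfg_path = workdir / f"{name}.json"
    cfg_path.write_text(json.dumps(cfg, indent=4))
    subprocess.run(["excalibur", str(cfg_path)], cwd=workdir, check=True)
    return out_path


def run_all_shifts(base_cfg: dict, shifts: list, merged: str, cpus: int = 4) -> None:
    """Run every shift in parallel on the localized inputs, then merge with hadd."""
    with tempfile.TemporaryDirectory() as tmp:
        workdir = Path(tmp)
        with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
            futures = [executor.submit(run_shift, base_cfg, workdir, s) for s in shifts]
            # .result() re-raises the first failure from any shift
            outputs = [f.result() for f in concurrent.futures.as_completed(futures)]
        subprocess.run(["hadd", "-f", merged] + [str(p) for p in outputs], check=True)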
@@ -18,12 +18,12 @@ class CommonConfig(luigi.Config):
class ExcaliburConfig(luigi.Config):
excalibur_config_version = luigi.Parameter()
jet_radius = luigi.Parameter(default="4")
energy_shift = luigi.EnumParameter(enum=Energy_Shifts, default=Energy_Shifts.NONE)
jec_uncertainty_source = luigi.Parameter(default="Total")
@inherits(CommonConfig)
class LumberjackConfig(luigi.Config):
energy_shift = luigi.EnumParameter(enum=Energy_Shifts, default=Energy_Shifts.NONE)
jec_uncertainty_source = luigi.Parameter(default="Total")
excalibur_config_version = luigi.Parameter()
jet_radius = luigi.Parameter(default="4")
karma_config = luigi.Parameter(default="zjet_excalibur")
......
@@ -65,11 +65,11 @@ excalibur_env = set_environment_variables(
)
def run_command(command, env, *args, **kwargs):
def run_command(command, *args, **kwargs):
command_str = " ".join(command)
logger.info(f'Running command: "{command_str}"')
code, out, error = interruptable_popen(
command, *args, env=env, stdout=PIPE, stderr=PIPE, **kwargs
command, *args, stdout=PIPE, stderr=PIPE, **kwargs
)
if code != 0:
raise RuntimeError(
......
@@ -4,7 +4,6 @@ import law
import luigi
from analysis.framework.dataset_map import cross_sections, dataset_map
from analysis.framework.tasks import AnalysisTask, ExcaliburConfig, ExternalCMSSWTask
from analysis.framework.utils import Energy_Shifts, get_energy_shift_ext
from analysis.tasks.Excalibur.CreatePileupWeights import (
CreatePileupWeights,
PileupShift,
@@ -46,11 +45,6 @@ class CreateExcaliburMCConfig(AnalysisTask):
"GetGeneratorWeights": GetGeneratorWeights.req(self),
"Skim": SkimDataset.req(self),
}
if self.dataset in DY_SETS:
for dataset in DY_SETS:
_ret["GetGeneratorWeights_{}".format(dataset)] = (
GetGeneratorWeights.req(self, dataset=dataset)
)
return _ret
def output(self):
@@ -60,7 +54,7 @@ class CreateExcaliburMCConfig(AnalysisTask):
f"{self.cmssw_path}/src/Excalibur/cfg/excalibur",
f"zjet{self.data_period}",
self.excalibur_config_version,
f"{self.data_period}_{self.dataset}{jet_ext}{get_energy_shift_ext(self.energy_shift, self.jec_uncertainty_source)}.py",
f"{self.data_period}_{self.dataset}{jet_ext}.py",
)
)
@@ -70,27 +64,16 @@ class CreateExcaliburMCConfig(AnalysisTask):
dataset_vars = dataset_map[self.data_period]["common_mc"]
dataset_vars.update(dataset_map[self.data_period][self.dataset])
jer_shift = 0
if self.energy_shift == Energy_Shifts.JER_UP:
jer_shift = +1
elif self.energy_shift == Energy_Shifts.JER_DOWN:
jer_shift = -1
jec_unc_shift = 0.0
if self.energy_shift == Energy_Shifts.JEC_UP:
jec_unc_shift = +1.0
elif self.energy_shift == Energy_Shifts.JEC_DOWN:
jec_unc_shift = -1.0
dataset_wildcards = [
col.dir.uri() + "/*"
for col in self.input()["Skim"]["collection"].collections
]
variables = dict(
year=str(self.data_period).strip("ulUL_APVprepostVFP"),
dataset_files=self.input()["Skim"]["collection"].uri(),
dataset_files=dataset_wildcards,
jet_radius=self.jet_radius,
jec=dataset_vars["jec_version"],
jec_unc=self.jec_uncertainty_source,
jec_unc_shift=jec_unc_shift,
jer=dataset_vars["jer_version"],
jer_shift=int(jer_shift),
roccor=dataset_vars["roccor"],
pileup_weight_file=os.path.abspath(self.input()["PileUpWeights"].abspath),
pileup_weight_file_up=os.path.abspath(
......
import concurrent.futures
import json
import os
from copy import deepcopy
import law
import luigi
@@ -16,8 +17,11 @@ from analysis.tasks.Excalibur import CreateExcaliburDataConfig, CreateExcaliburM
from analysis.tasks.Remote import HTCondorWorkflow
from analysis.tasks.Skimming import SkimDataset
from law.decorator import localize
from law.logger import get_logger
from luigi.util import inherits
logger = get_logger(__name__)
@inherits(ExcaliburConfig)
class ExcaliburJsonConfig(AnalysisTask):
@@ -33,8 +37,7 @@ class ExcaliburJsonConfig(AnalysisTask):
jet_ext = "_Puppi" if "Puppi" in self.jet_type else ""
return self.remote_target(
self.excalibur_config_version,
self.dataset,
f"{self.dataset}{jet_ext}{get_energy_shift_ext(self.energy_shift, self.jec_uncertainty_source)}.json",
f"{self.dataset}{jet_ext}.json",
)
def run(self):
@@ -52,7 +55,7 @@ class ExcaliburJsonConfig(AnalysisTask):
class ExcaliburWorkflow(AnalysisTask, HTCondorWorkflow, law.LocalWorkflow):
jet_type = luigi.Parameter(default="AK4")
files_per_job = luigi.IntParameter(
default=20,
default=2,
description="Number of files analyzed per job.",
)
exclude_params_req = {"files_per_job"}
@@ -65,7 +68,7 @@ class ExcaliburWorkflow(AnalysisTask, HTCondorWorkflow, law.LocalWorkflow):
return self.remote_target(
self.excalibur_config_version,
self.dataset,
f"{self.dataset}{jet_ext}{get_energy_shift_ext(self.energy_shift, self.jec_uncertainty_source)}",
f"{self.dataset}{jet_ext}",
f"job_{self.branch}.root",
)
@@ -75,19 +78,98 @@ class ExcaliburWorkflow(AnalysisTask, HTCondorWorkflow, law.LocalWorkflow):
def requires(self):
return {"skim": SkimDataset.req(self, branch=-1, branches=self.branch_data)}
@localize(input=False, output=True)
@localize(input=True, output=True)
def run(self):
input_files = self.input()["skim"]["collection"].uri()
branch_cfg = law.LocalFileTarget(is_tmp="json")
with self.workflow_requires()["json_cfg"].output().localize() as json_cfg:
tmp = json_cfg.open(mode="r").read()
tmp = tmp.replace("$CMSSW_DIR", os.environ["CMSSW_DIR"])
_d = json.loads(tmp)
_d["InputFiles"] = input_files
_d["OutputPath"] = self.output().abspath
branch_cfg.dump(_d, indent=4)
command = ["excalibur", branch_cfg.abspath]
run_command(command, env=excalibur_env, cwd=branch_cfg.parent.abspath)
# cwd = law.LocalDirectoryTarget(is_tmp="excalibur")
cwd = law.LocalDirectoryTarget("/work/cverstege/ZJet/zjet_analysis/test/")
cwd.touch()
with self.workflow_input()["json_cfg"].localize() as json_cfg:
cfg = json_cfg.open(mode="r").read()
cfg = cfg.replace("$CMSSW_DIR", self.cmssw_path)
cfg_dict = json.loads(cfg)
cfg_dict["InputFiles"] = self.input()["skim"]["collection"].uri()
if "data" in self.dataset:
raise NotImplementedError(
"Implementing data should be easy, just skip all shifts"
)
# build list of all shifts to run
shift_list = []
for energy_shift in Energy_Shifts:
if energy_shift in (Energy_Shifts.JEC_UP, Energy_Shifts.JEC_DOWN):
for jec in jec_uncertainty_sources:
shift_list.append((energy_shift, jec))
else:
shift_list.append((energy_shift, "Total"))
# execute excalibur for each energy shift in parallel up to the requested number of CPUs
futures = []
output_files = []
with concurrent.futures.ThreadPoolExecutor(
max_workers=self.htcondor_cpus
) as executor:
for energy_shift, jec in shift_list:
shift_json = law.LocalFileTarget(
f"{cwd.abspath}/{energy_shift.name}_{jec}.json",
)
shift_root = law.LocalFileTarget(
f"{cwd.abspath}/{energy_shift.name}_{jec}.root"
)
output_files.append(shift_root)
cfg_shift = deepcopy(cfg_dict)
cfg_shift["OutputPath"] = shift_root.abspath
cfg_shift["JetEnergyCorrectionUncertaintySource"] = jec
if energy_shift == Energy_Shifts.NONE:
pass
elif energy_shift == Energy_Shifts.JER_UP:
cfg_shift["JERShift"] = int(1)
cfg_shift["Pipelines"].pop("genzjetcuts_L1L2L3")
cfg_shift["Pipelines"]["zjetcuts_L1L2L3_JER_UP"] = cfg_shift[
"Pipelines"
].pop("zjetcuts_L1L2L3")
elif energy_shift == Energy_Shifts.JER_DOWN:
cfg_shift["JERShift"] = int(-1)
cfg_shift["Pipelines"].pop("genzjetcuts_L1L2L3")
cfg_shift["Pipelines"]["zjetcuts_L1L2L3_JER_DOWN"] = cfg_shift[
"Pipelines"
].pop("zjetcuts_L1L2L3")
elif energy_shift == Energy_Shifts.JEC_UP:
cfg_shift["JetEnergyCorrectionUncertaintyShift"] = 1.0
cfg_shift["Pipelines"].pop("genzjetcuts_L1L2L3")
cfg_shift["Pipelines"][f"zjetcuts_L1L2L3_JEC_{jec}_UP"] = cfg_shift[
"Pipelines"
].pop("zjetcuts_L1L2L3")
elif energy_shift == Energy_Shifts.JEC_DOWN:
cfg_shift["JetEnergyCorrectionUncertaintyShift"] = -1.0
cfg_shift["Pipelines"].pop("genzjetcuts_L1L2L3")
cfg_shift["Pipelines"][f"zjetcuts_L1L2L3_JEC_{jec}_DOWN"] = (
cfg_shift["Pipelines"].pop("zjetcuts_L1L2L3")
)
else:
raise ValueError(f"Unknown energy shift: {energy_shift}")
shift_json.dump(cfg_shift, indent=4)
logger.info(f"Adding json_cfg: {shift_json.abspath}")
future = executor.submit(
run_command,
["excalibur", shift_json.abspath],
env=excalibur_env,
cwd=cwd.abspath,
)
futures.append(future)
# Wait for all futures to complete and check for errors
for future in concurrent.futures.as_completed(futures):
try:
future.result()
except RuntimeError as e:
raise e
# Hadd all shifts and move to remote storage
filelist = [x.abspath for x in output_files]
command = ["hadd", "-f", self.output().abspath] + filelist
self.output().touch()
run_command(command)
@inherits(ExcaliburConfig)
......
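Since each shift's zjetcuts_L1L2L3 pipeline is renamed before the merge (e.g. zjetcuts_L1L2L3_JER_UP), the hadd-ed job output should carry every variation side by side, so downstream tasks can select a shift by pipeline folder rather than requesting a separately shifted dataset. A hypothetical quick check of that layout (the file name and the use of uproot are illustrative, not part of this repository):

import uproot

# list the top-level pipeline folders in a merged job output (path is illustrative)
with uproot.open("job_0.root") as f:
    folders = sorted({key.split("/")[0].split(";")[0] for key in f.keys()})
print(folders)
# expected to include e.g. zjetcuts_L1L2L3, zjetcuts_L1L2L3_JER_UP, zjetcuts_L1L2L3_JER_DOWN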
[DEFAULT]
cmssw_path = ${CMSSW_DIR}
[BundleCMSSW]
[SkimDataset]
@@ -19,8 +16,9 @@ htcondor_walltime: 12h
[ExcaliburJsonConfig]
[ExcaliburWorkflow]
htcondor_disk: 5GB
htcondor_memory: 2GB
htcondor_cpus: 4
htcondor_disk: 6GB
htcondor_memory: 6GB
htcondor_walltime: 6h
[ExcaliburMerge]
......