diff --git a/analysis/tasks/CopyFastWarm.py b/analysis/tasks/CopyFastWarm.py
index 36a95fedd68320576343176c243471de9e4d9028..664d63e754fb1d55b279ffe8df0a468d54dd5860 100644
--- a/analysis/tasks/CopyFastWarm.py
+++ b/analysis/tasks/CopyFastWarm.py
@@ -28,7 +28,7 @@ class CopyFastWarm(Task, TarballExtractionMixin, law.LocalWorkflow):
 
     def workflow_requires(self):
         if self.force:
-            return {"fastwarm": FastWarm.req(self, acceptance=0)}
+            return 0
         else:
             return {"fastwarm": FastWarm.req(self)}
 
diff --git a/analysis/tasks/CopyRewarmup.py b/analysis/tasks/CopyRewarmup.py
index ba2f3e5bbc832a34c06223e28da1e736e4c21861..38e16f6778d9d520a3c2859b21691178d18bff93 100644
--- a/analysis/tasks/CopyRewarmup.py
+++ b/analysis/tasks/CopyRewarmup.py
@@ -43,7 +43,7 @@ class CopyRewarmup(Task, TarballExtractionMixin, law.LocalWorkflow):
 
     def workflow_requires(self):
         if self.force:
-            return {"rewarmup": Rewarmup.req(self, acceptance=0)}
+            return 0
         else:
             return {"rewarmup": Rewarmup.req(self)}
 
diff --git a/analysis/tasks/CopyWarmup.py b/analysis/tasks/CopyWarmup.py
index 6acf5150d6723b3938bf34fdfe3585e9603ed666..a581a88c4f907bb2ec03c879d8ca340348c165a0 100644
--- a/analysis/tasks/CopyWarmup.py
+++ b/analysis/tasks/CopyWarmup.py
@@ -43,7 +43,7 @@ class CopyWarmup(Task, TarballExtractionMixin, law.LocalWorkflow):
 
     def workflow_requires(self):
         if self.force:
-            return {"warmup": Warmup.req(self, acceptance=0)}
+            return 0
         else:
             return {"warmup": Warmup.req(self)}
 
diff --git a/analysis/tasks/MultiGridClosure.py b/analysis/tasks/MultiGridClosure.py
index 41d1e6c45b03e7caee378102a12252840e5c3775..4adf3c46f5bfd28947f0eacd323897dc3f1d565c 100644
--- a/analysis/tasks/MultiGridClosure.py
+++ b/analysis/tasks/MultiGridClosure.py
@@ -48,7 +48,7 @@ class MultiGridClosure(Task, law.LocalWorkflow):
 
     def workflow_requires(self):
         if self.force:
-            return {"fnlocppread": FnloCppread.req(self, acceptance=0)}
+            return 0
         else:
             return {"fnlocppread": FnloCppread.req(self)}
 
@@ -73,17 +73,33 @@ class MultiGridClosure(Task, law.LocalWorkflow):
         with self.output().temporary_path() as self.temp_output_path:
             os.mkdir(self.temp_output_path)
 
-            tabglob = (
-                "{self.merge_dir}/{self.name}/{self.branch_data[channel]}/"
-                "*.{self.branch_data[observable]}.s{self.branch_data[seed]}.tab.gz"
-            ).format(self=self)
-            logger.debug("Table glob: %s", tabglob)
-            tables = glob.glob(tabglob)
-
-            if not tables:
-                raise RuntimeError('No matching file for table glob {}.'.format(tabglob))
-
-            # Loop over all matching tables is done in python script; only take the first one from all matches
+            tablepath = os.path.join(self.merge_dir, self.name, self.branch_data['channel'])
+            logger.debug('Table path is: %s', tablepath)
+            # FastProd starting seed for channel
+            iseed = self.branch_data['seed']
+            # FastProd job number for channel
+            njobs = FastProd().fastprod_jobs[self.branch_data['index']]
+            fseed = iseed + njobs
+            # Find smallest seed for which table exists (and corresponding dat file, see check in FnloCppread)
+            ind = iseed
+            iseed = -1
+            while ind < fseed:
+                tableglob = os.path.join(tablepath,'*.{}.s{}.tab.gz'.format(self.branch_data['observable'],ind))
+                logger.debug("Table glob: %s", tableglob)
+                tables = glob.glob(tableglob)
+                # There should be exactly one matching table
+                if tables:
+                    iseed = ind
+                    break
+                else:
+                    logger.warning('No table found for table glob {} and job seed {}. Trying to find first table in next job!'.format(tableglob,ind))
+                    ind = ind+1
+
+            if iseed < 0:
+                message = "No job found for channel {} with a table for observable {}. Multigrid closure plots not possible! Fail!".format(self.branch_data['channel'],self.branch_data['observable'])
+                raise RuntimeError(message)
+
+            # Loop over all matching tables is done in python script; only take the first one
             tabfile = tables[0]
             logger.debug("First table file: %s", tabfile)
 
diff --git a/analysis/tasks/PlotRuntime.py b/analysis/tasks/PlotRuntime.py
index b422ddfd59f28cd9b9a1346c739b7f2597ce52df..68521ace4a45c4cd89fa0e50d655e02afabfdaa0 100644
--- a/analysis/tasks/PlotRuntime.py
+++ b/analysis/tasks/PlotRuntime.py
@@ -43,7 +43,7 @@ class PlotRuntime(Task, law.LocalWorkflow):
 
     def workflow_requires(self):
         if self.force:
-            return {"copytables": CopyTables.req(self, acceptance=0)}
+            return 0
         else:
             return {"copytables": CopyTables.req(self)}
 
diff --git a/analysis/tasks/PlotVegasGrids.py b/analysis/tasks/PlotVegasGrids.py
index 778d88cc920386ae687c6e692117eaaa0f0490a1..2b7adb37f6a0379a0ad5653d358231c99eae8d4d 100644
--- a/analysis/tasks/PlotVegasGrids.py
+++ b/analysis/tasks/PlotVegasGrids.py
@@ -55,7 +55,7 @@ class PlotVegasGrids(Task, law.LocalWorkflow):
 
     def workflow_requires(self):
         if self.force:
-            return {"copywarmup": CopyWarmup.req(self, acceptance=0)}
+            return 0
         else:
             return {"copywarmup": CopyWarmup.req(self)}
 
diff --git a/analysis/tasks/SingleGridClosure.py b/analysis/tasks/SingleGridClosure.py
index bd70008fda4d797b552029f1469fae168bdd5d40..56cf5baa822d4593bf8a66bc96e4578d8eadcef0 100644
--- a/analysis/tasks/SingleGridClosure.py
+++ b/analysis/tasks/SingleGridClosure.py
@@ -40,14 +40,12 @@ class SingleGridClosure(Task, law.LocalWorkflow):
         """Branch data dicts.
        Each branch should correspond to exactly one output directory."""
         return {
             i : {'channel': ch, 'index': ch_index, 'observable': obs, 'seed': FastProd().starting_seeds[ch_index]}
-            for i, ((ch_index, ch), obs) in enumerate(product(
-                enumerate(self.channels), self.observables,
-            ))
+            for i, ((ch_index, ch), obs) in enumerate(product(enumerate(self.channels), self.observables, ))
         }
 
     def workflow_requires(self):
         if self.force:
-            return {"fnlocppread": FnloCppread.req(self, acceptance=0)}
+            return 0
         else:
             return {"fnlocppread": FnloCppread.req(self)}
@@ -68,17 +66,44 @@ class SingleGridClosure(Task, law.LocalWorkflow):
         )
 
     def run(self):
-        tabglob = (
-            "{self.merge_dir}/{self.name}/{self.branch_data[channel]}/"
-            "*.{self.branch_data[observable]}.s{self.branch_data[seed]}.tab.gz"
-        ).format(self=self)
-        logger.debug("Table glob: %s", tabglob)
-        tables = glob.glob(tabglob)
+        tablepath = os.path.join(self.merge_dir, self.name, self.branch_data['channel'])
+        logger.debug('Table path is: %s', tablepath)
+        # FastProd starting seed for channel
+        iseed = self.branch_data['seed']
+        # FastProd job number for channel
+        njobs = FastProd().fastprod_jobs[self.branch_data['index']]
+        fseed = iseed + njobs
+        # Find smallest seed for which all tables exist (and corresponding dat files, see check in FnloCppread)
+        nobs = len(self.observables)
+        tableglob = os.path.join(tablepath,'*.s{}.tab.gz'.format(self.branch_data['seed']))
+        tables = glob.glob(tableglob)
+        ntabs = len(tables)
+        logger.debug('Test table glob is: %s', tableglob)
+        logger.debug('Starting seed: {}, no. of jobs: {}, no. of observables: {}, no. table files: {}'.format(iseed,njobs,nobs,ntabs))
+        if nobs != ntabs:
+            logger.warning('Only {} tables available instead of {} for first job seed {}. Trying to find job with table for each observable!'.format(ntabs,nobs,iseed))
+            ind = iseed+1
+            iseed = -1
+            while ind < fseed:
+                tableglob = os.path.join(tablepath,'*.s{}.tab.gz'.format(ind))
+                tables = glob.glob(tableglob)
+                ntabs = len(tables)
+                logger.debug('Found {} tables out of {} for seed {}.'.format(ntabs,nobs,ind))
+                if ntabs == nobs:
+                    iseed = ind
+                    break
+                else:
+                    ind = ind+1
+
+        if iseed < 0:
+            message = "No job found for channel {} with table for each observable. Single job closure plots not possible! Fail!".format(self.branch_data['channel'])
+            raise RuntimeError(message)
 
-        if not tables:
-            raise RuntimeError('No matching file for table glob {}.'.format(tabglob))
+        tableglob = os.path.join(tablepath,'*.{}.s{}.tab.gz'.format(self.branch_data['observable'],iseed))
+        logger.debug('Table glob is: %s', tableglob)
+        tables = glob.glob(tableglob)
 
-        # Only take the first one from all matches
+        # There should be exactly one matching table
         tabfile = tables[0]
         logger.debug("Table file: %s", tabfile)
 