From cb1a37dc95b944df4023e66caebf72e72873e60b Mon Sep 17 00:00:00 2001
From: Klaus Rabbertz <klaus.rabbertz@cern.ch>
Date: Sat, 20 Apr 2024 16:42:45 +0200
Subject: [PATCH] Improve Single- and MultiGridClosure to skip missing first
 seeds; always return 0 for force = True

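The closure tasks previously globbed only for tables produced by the first
FastProd seed of a channel and raised an error when that job had not
produced a table. Both tasks now scan the channel's seed range and use the
first seed for which the required table(s) exist. In addition, the
workflow_requires() methods of the affected tasks return 0 instead of a
requirement dict when force = True, following the pattern applied
throughout this patch, e.g. in CopyFastWarm:

    def workflow_requires(self):
        if self.force:
            return 0
        else:
            return {"fastwarm": FastWarm.req(self)}
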
---
 analysis/tasks/CopyFastWarm.py      |  2 +-
 analysis/tasks/CopyRewarmup.py      |  2 +-
 analysis/tasks/CopyWarmup.py        |  2 +-
 analysis/tasks/MultiGridClosure.py  | 40 +++++++++++++++-------
 analysis/tasks/PlotRuntime.py       |  2 +-
 analysis/tasks/PlotVegasGrids.py    |  2 +-
 analysis/tasks/SingleGridClosure.py | 51 +++++++++++++++++++++--------
 7 files changed, 71 insertions(+), 30 deletions(-)

diff --git a/analysis/tasks/CopyFastWarm.py b/analysis/tasks/CopyFastWarm.py
index 36a95fe..664d63e 100644
--- a/analysis/tasks/CopyFastWarm.py
+++ b/analysis/tasks/CopyFastWarm.py
@@ -28,7 +28,7 @@ class CopyFastWarm(Task, TarballExtractionMixin, law.LocalWorkflow):
 
     def workflow_requires(self):
         if self.force:
-            return {"fastwarm": FastWarm.req(self, acceptance=0)}
+            return 0
         else:
             return {"fastwarm": FastWarm.req(self)}
 
diff --git a/analysis/tasks/CopyRewarmup.py b/analysis/tasks/CopyRewarmup.py
index ba2f3e5..38e16f6 100644
--- a/analysis/tasks/CopyRewarmup.py
+++ b/analysis/tasks/CopyRewarmup.py
@@ -43,7 +43,7 @@ class CopyRewarmup(Task, TarballExtractionMixin, law.LocalWorkflow):
 
     def workflow_requires(self):
         if self.force:
-            return {"rewarmup": Rewarmup.req(self, acceptance=0)}
+            return 0
         else:
             return {"rewarmup": Rewarmup.req(self)}
 
diff --git a/analysis/tasks/CopyWarmup.py b/analysis/tasks/CopyWarmup.py
index 6acf515..a581a88 100644
--- a/analysis/tasks/CopyWarmup.py
+++ b/analysis/tasks/CopyWarmup.py
@@ -43,7 +43,7 @@ class CopyWarmup(Task, TarballExtractionMixin, law.LocalWorkflow):
 
     def workflow_requires(self):
         if self.force:
-            return {"warmup": Warmup.req(self, acceptance=0)}
+            return 0
         else:
             return {"warmup": Warmup.req(self)}
 
diff --git a/analysis/tasks/MultiGridClosure.py b/analysis/tasks/MultiGridClosure.py
index 41d1e6c..4adf3c4 100644
--- a/analysis/tasks/MultiGridClosure.py
+++ b/analysis/tasks/MultiGridClosure.py
@@ -48,7 +48,7 @@ class MultiGridClosure(Task, law.LocalWorkflow):
 
     def workflow_requires(self):
         if self.force:
-            return {"fnlocppread": FnloCppread.req(self, acceptance=0)}
+            return 0
         else:
             return {"fnlocppread": FnloCppread.req(self)}
 
@@ -73,17 +73,33 @@ class MultiGridClosure(Task, law.LocalWorkflow):
             with self.output().temporary_path() as self.temp_output_path:
                 os.mkdir(self.temp_output_path)
 
-                tabglob = (
-                    "{self.merge_dir}/{self.name}/{self.branch_data[channel]}/"
-                    "*.{self.branch_data[observable]}.s{self.branch_data[seed]}.tab.gz"
-                ).format(self=self)
-                logger.debug("Table glob: %s", tabglob)
-                tables = glob.glob(tabglob)
-
-                if not tables:
-                    raise RuntimeError('No matching file for table glob {}.'.format(tabglob))
-
-                # Loop over all matching tables is done in python script; only take the first one from all matches
+                tablepath = os.path.join(self.merge_dir, self.name, self.branch_data['channel'])
+                logger.debug('Table path is: %s', tablepath)
+                # FastProd starting seed for channel
+                iseed = self.branch_data['seed']
+                # FastProd job number for channel
+                njobs = FastProd().fastprod_jobs[self.branch_data['index']]
+                fseed = iseed + njobs
+                # Find smallest seed for which table exists (and corresponding dat file, see check in FnloCppread)
+                ind = iseed
+                iseed = -1
+                while ind < fseed:
+                    tableglob = os.path.join(tablepath,'*.{}.s{}.tab.gz'.format(self.branch_data['observable'],ind))
+                    logger.debug("Table glob: %s", tableglob)
+                    tables = glob.glob(tableglob)
+                    # There should be exactly one matching table
+                    if tables:
+                        iseed = ind
+                        break
+                    else:
+                        logger.warning('No table found for table glob {} and job seed {}. Trying to find the first table in the next job!'.format(tableglob, ind))
+                        ind = ind+1
+
+                if iseed < 0:
+                    message = "No job found for channel {} with a table for observable {}. Multigrid closure plots not possible! Fail!".format(self.branch_data['channel'],self.branch_data['observable'])
+                    raise RuntimeError(message)
+
+                # Loop over all matching tables is done in python script; only take the first one
                 tabfile = tables[0]
                 logger.debug("First table file: %s", tabfile)
 
diff --git a/analysis/tasks/PlotRuntime.py b/analysis/tasks/PlotRuntime.py
index b422ddf..68521ac 100644
--- a/analysis/tasks/PlotRuntime.py
+++ b/analysis/tasks/PlotRuntime.py
@@ -43,7 +43,7 @@ class PlotRuntime(Task, law.LocalWorkflow):
 
     def workflow_requires(self):
         if self.force:
-            return {"copytables": CopyTables.req(self, acceptance=0)}
+            return 0
         else:
             return {"copytables": CopyTables.req(self)}
 
diff --git a/analysis/tasks/PlotVegasGrids.py b/analysis/tasks/PlotVegasGrids.py
index 778d88c..2b7adb3 100644
--- a/analysis/tasks/PlotVegasGrids.py
+++ b/analysis/tasks/PlotVegasGrids.py
@@ -55,7 +55,7 @@ class PlotVegasGrids(Task, law.LocalWorkflow):
 
     def workflow_requires(self):
         if self.force:
-            return {"copywarmup": CopyWarmup.req(self, acceptance=0)}
+            return 0
         else:
             return {"copywarmup": CopyWarmup.req(self)}
 
diff --git a/analysis/tasks/SingleGridClosure.py b/analysis/tasks/SingleGridClosure.py
index bd70008..56cf5ba 100644
--- a/analysis/tasks/SingleGridClosure.py
+++ b/analysis/tasks/SingleGridClosure.py
@@ -40,14 +40,12 @@ class SingleGridClosure(Task, law.LocalWorkflow):
         """Branch data dicts. Each branch should correspond to exactly one output directory."""
         return {
             i : {'channel': ch, 'index': ch_index, 'observable': obs, 'seed': FastProd().starting_seeds[ch_index]}
-            for i, ((ch_index, ch), obs) in enumerate(product(
-                enumerate(self.channels), self.observables,
-            ))
+            for i, ((ch_index, ch), obs) in enumerate(product(enumerate(self.channels), self.observables))
         }
 
     def workflow_requires(self):
         if self.force:
-            return {"fnlocppread": FnloCppread.req(self, acceptance=0)}
+            return 0
         else:
             return {"fnlocppread": FnloCppread.req(self)}
 
@@ -68,17 +66,44 @@ class SingleGridClosure(Task, law.LocalWorkflow):
         )
 
     def run(self):
-        tabglob = (
-            "{self.merge_dir}/{self.name}/{self.branch_data[channel]}/"
-            "*.{self.branch_data[observable]}.s{self.branch_data[seed]}.tab.gz"
-        ).format(self=self)
-        logger.debug("Table glob: %s", tabglob)
-        tables = glob.glob(tabglob)
+        tablepath = os.path.join(self.merge_dir, self.name, self.branch_data['channel'])
+        logger.debug('Table path is: %s', tablepath)
+        # FastProd starting seed for channel
+        iseed = self.branch_data['seed']
+        # FastProd job number for channel
+        njobs = FastProd().fastprod_jobs[self.branch_data['index']]
+        fseed = iseed + njobs
+        # Find smallest seed for which all tables exist (and corresponding dat files, see check in FnloCppread)
+        nobs  = len(self.observables)
+        tableglob = os.path.join(tablepath,'*.s{}.tab.gz'.format(self.branch_data['seed']))
+        tables = glob.glob(tableglob)
+        ntabs = len(tables)
+        logger.debug('Test table glob is: %s', tableglob)
+        logger.debug('Starting seed: {}, no. of jobs: {}, no. of observables: {}, no. of table files: {}'.format(iseed, njobs, nobs, ntabs))
+        if nobs != ntabs:
+            logger.warning('Only {} tables available instead of {} for first job seed {}. Trying to find a job with a table for each observable!'.format(ntabs, nobs, iseed))
+            ind = iseed+1
+            iseed = -1
+            while ind < fseed:
+                tableglob = os.path.join(tablepath,'*.s{}.tab.gz'.format(ind))
+                tables = glob.glob(tableglob)
+                ntabs = len(tables)
+                logger.debug('Found {} tables out of {} for seed {}.'.format(ntabs,nobs,ind))
+                if ntabs == nobs:
+                    iseed = ind
+                    break
+                else:
+                    ind = ind+1
+
+        if iseed < 0:
+            message = "No job found for channel {} with table for each observable. Single job closure plots not possible! Fail!".format(self.branch_data['channel'])
+            raise RuntimeError(message)
 
-        if not tables:
-            raise RuntimeError('No matching file for table glob {}.'.format(tabglob))
+        tableglob = os.path.join(tablepath,'*.{}.s{}.tab.gz'.format(self.branch_data['observable'],iseed))
+        logger.debug('Table glob is: %s', tableglob)
+        tables = glob.glob(tableglob)
 
-        # Only take the first one from all matches
+        # There should be exactly one matching table
         tabfile = tables[0]
         logger.debug("Table file: %s", tabfile)
 
-- 
GitLab