From 8357bca0665f987ac039202805894f354f2b7ba8 Mon Sep 17 00:00:00 2001 From: Klaus Rabbertz <klaus.rabbertz@cern.ch> Date: Fri, 19 Apr 2024 15:45:02 +0200 Subject: [PATCH] Add automised FnloCppread quality check and removal of incomplete parts --- analysis/tasks/CopyTables.py | 5 +- analysis/tasks/FnloCppread.py | 126 ++++++++++++++++++++++++++-------- 2 files changed, 102 insertions(+), 29 deletions(-) diff --git a/analysis/tasks/CopyTables.py b/analysis/tasks/CopyTables.py index 8a350df..63ece00 100644 --- a/analysis/tasks/CopyTables.py +++ b/analysis/tasks/CopyTables.py @@ -35,7 +35,10 @@ class CopyTables(Task, TarballExtractionMixin, law.LocalWorkflow): def workflow_requires(self): if self.force: - return {"fastprod": FastProd.req(self, acceptance=0)} + # Do not really check using 'force' since very time consuming. + # A simple change of the acceptance can easily be done using cmd line options. + # return return {"fastprod": FastProd.req(self, acceptance=0)} + return 0 else: return {"fastprod": FastProd.req(self)} diff --git a/analysis/tasks/FnloCppread.py b/analysis/tasks/FnloCppread.py index c7a5783..6deb8a6 100644 --- a/analysis/tasks/FnloCppread.py +++ b/analysis/tasks/FnloCppread.py @@ -4,7 +4,9 @@ import law import luigi import logging import os +import pathlib import re +import shutil from subprocess import PIPE @@ -38,7 +40,10 @@ class FnloCppread(Task, law.LocalWorkflow): def workflow_requires(self): if self.force: - return {"copytables": CopyTables.req(self, acceptance=0)} + # Do not really check using 'force' since very time consuming. + # A simple change of the acceptance can easily be done using cmd line options. + # return {"copytables": CopyTables.req(self, acceptance=0)} + return 0 else: return {"copytables": CopyTables.req(self)} @@ -54,25 +59,49 @@ class FnloCppread(Task, law.LocalWorkflow): ) def run(self): - job_ok = True # set to False if a subtask fails -> no tag file written + job_ok = True # set to False if files are not found or all subtasks fail -> no tag file written try: - tableglob = os.path.join( - self.merge_dir, self.name, self.branch_data['channel'], - '*.s{}.tab.gz'.format(self.branch_data['seed']) - ) + tablepath = os.path.join(self.merge_dir, self.name, self.branch_data['channel']) + logger.debug('Table path is: %s', tablepath) + jobfailpath = os.path.join(tablepath,"Failed") + logger.debug('Job fail path is: %s', jobfailpath) + tableglob = os.path.join(tablepath,'*.s{}.tab.gz'.format(self.branch_data['seed'])) logger.debug('Table glob is: %s', tableglob) + crossglob = os.path.join(tablepath,'*.cross.s{}.dat'.format(self.branch_data['seed'])) + logger.debug('Cross glob is: %s', crossglob) + joblogglob = os.path.join(tablepath,'*.fastprod.s{}.log'.format(self.branch_data['seed'])) + logger.debug('Job log glob is: %s', joblogglob) + jobfailglob = os.path.join(tablepath,'*.s{}.*'.format(self.branch_data['seed'])) + logger.debug('Job fail glob is: %s', jobfailglob) + + files = glob.glob(jobfailglob) + if not files: + message = "Could not find any files for glob {} of branch {}. Counted as FAIL!".format(jobfailglob, self.branch) + raise RuntimeError(message) + + files = glob.glob(crossglob) + if not files: + message = "Could not find total x section file for glob {} of branch {}. Counted as FAIL!".format(crossglob, self.branch) + raise RuntimeError(message) + elif (not os.path.isfile(files[0])) or (os.path.getsize(files[0]) == 0): + message = "Total x section file {} of branch {} not found or empty. Counted as FAIL!".format(files[0], self.branch) + raise RuntimeError(message) + + files = glob.glob(joblogglob) + if not files: + message = "Could not find fastprod log file for glob {} of branch {}. Counted as FAIL!".format(joblogglob, self.branch) + raise RuntimeError(message) + elif (not os.path.isfile(files[0])) or (os.path.getsize(files[0]) == 0): + message = "Fastprod log file {} of branch {} not found or empty. Counted as FAIL!".format(files[0], self.branch) + raise RuntimeError(message) tables = glob.glob(tableglob) - if not tables: - raise RuntimeError( - 'Could not find any input tables for branch %s. ' - 'Glob was: %s'.format( - self.branch, tableglob - ) - ) + if len(tables) != len(self.observables): + logger.warning('Number of tables %s does not match number of observables %s. Only subset of results usable!', + len(tables),len(self.observables)) - ierr = 0 # keep track of failed subtasks + nres = 0 for table in tables: logger.debug("Processing table: %s", table) @@ -81,14 +110,14 @@ class FnloCppread(Task, law.LocalWorkflow): for i, observable in enumerate(self.observables): if observable in table: iscl = i - logger.debug( - "Table name '%s' matches observable %s with index %s", - table, observable, iscl - ) + logger.debug("Table name '%s' matches observable %s with index %s", table, observable, iscl) - logfile = re.sub('.gz$', '', table) - logfile = re.sub('.tab$', '', logfile) + '.log' - logger.debug("Log file is: %s", table) + tmpfile = re.sub('.gz$', '', table) + logfile = re.sub('.tab$', '.log', tmpfile) + logger.debug("Log file is: %s", logfile) + datfile = re.sub('.tab$', '.dat', tmpfile) + logger.debug("Dat file is: %s", datfile) + filestub = re.sub('.tab$', '*', tmpfile) scl = self.scale[0] if len(self.scale) > 1: @@ -104,11 +133,9 @@ class FnloCppread(Task, law.LocalWorkflow): ) command_line_args = [ - 'fnlo-tk-cppread', table, self.pdf, sclcom, - self.ascode, self.norm, scl + 'fnlo-tk-cppread', table, self.pdf, sclcom, self.ascode, self.norm, scl ] - logger.info("Running command: %s", - colored(' '.join(command_line_args), 'yellow')) + logger.info("Running command: %s", colored(' '.join(command_line_args), 'yellow')) code, out, error = interruptable_popen(command_line_args, stdout=PIPE, @@ -117,22 +144,65 @@ class FnloCppread(Task, law.LocalWorkflow): logger.debug("Standard output:\n%s\n", out) logger.debug("Standard error:\n%s\n", error) - if (code == 0): + if (code == 0 and os.path.isfile(datfile) and (os.path.getsize(datfile) > 0)): logger.info("fnlo-tk-cppread status: SUCCESS") # on success, write output to log file with open(logfile, 'w') as outfile: outfile.write(out) + nres = nres + 1 else: - raise RuntimeError("fnlo-tk-cppread status: FAIL (exit code {})".format(code)) + if not os.path.isfile(table): + logger.warning('Table %s is missing. Skipping this subresult!', table) + elif (os.path.getsize(table) == 0): + logger.warning('Table %s is empty. Skipping this subresult!', table) + elif (code != 0): + logger.warning('Evaluation of table %s gives error %d. Skipping this subresult!', table, code) + elif not os.path.isfile(datfile): + logger.warning('Datfile %s is missing. Skipping this subresult!', datfile) + elif (os.path.getsize(datfile) == 0): + logger.warning('Datfile %s is empty. Skipping this subresult!', datfile) + else: + logger.warning('This should not happen. So anyway skipping this subresult!') + files = glob.glob(filestub) + if files: + logger.warning('Moving files of failed subtask with seed %s out of the way!', self.branch_data['seed']) + logger.debug('Creating job fail directory if not yet existing: %s', jobfailpath) + pathlib.Path(jobfailpath).mkdir(parents=True, exist_ok=True) + for file in files: + filename = os.path.basename(file) + destfile = os.path.join(jobfailpath,filename) + if os.path.isfile(destfile): + logger.debug('Removing already existing file %s of failed job subtask ...', destfile) + os.remove(destfile) + logger.debug('Moving file %s of failed job subtask to folder %s ...', file, jobfailpath) + shutil.move(file, jobfailpath) + if nres == 0: + message = "Not even a single subtask of seed {} for branch {} could be completed without error. Counted as FAIL!".format(self.branch_data['seed'],self.branch) + raise RuntimeError(message) # on error, make log entry and reraise except Exception as e: job_ok = False logger.error('%s raised during job execution: %s', e.__class__.__name__, e) + files = glob.glob(jobfailglob) + if files: + logger.error('Moving all files of failed job with seed %s out of the way!', self.branch_data['seed']) + logger.debug('Creating job fail directory if not yet existing: %s', jobfailpath) + pathlib.Path(jobfailpath).mkdir(parents=True, exist_ok=True) + for file in files: + filename = os.path.basename(file) + destfile = os.path.join(jobfailpath,filename) + if os.path.isfile(destfile): + logger.debug('Removing already existing file %s of failed job ...', destfile) + os.remove(destfile) + logger.debug('Moving file %s of failed job to folder %s ...', file, jobfailpath) + shutil.move(file, jobfailpath) + else: + logger.error('No files found for job with seed %s.', self.branch_data['seed']) raise finally: - # if no subtask failed, write tag file to mark task complete + # if at least one subtask succeeded, write tag file to mark task complete if job_ok: self.output().touch() -- GitLab