Skip to content
Snippets Groups Projects
Commit 8357bca0 authored by Klaus Rabbertz's avatar Klaus Rabbertz
Browse files

Add automised FnloCppread quality check and removal of incomplete parts

parent 8d62b5ac
Branches
No related tags found
No related merge requests found
...@@ -35,7 +35,10 @@ class CopyTables(Task, TarballExtractionMixin, law.LocalWorkflow): ...@@ -35,7 +35,10 @@ class CopyTables(Task, TarballExtractionMixin, law.LocalWorkflow):
def workflow_requires(self): def workflow_requires(self):
if self.force: if self.force:
return {"fastprod": FastProd.req(self, acceptance=0)} # Do not really check using 'force' since very time consuming.
# A simple change of the acceptance can easily be done using cmd line options.
# return return {"fastprod": FastProd.req(self, acceptance=0)}
return 0
else: else:
return {"fastprod": FastProd.req(self)} return {"fastprod": FastProd.req(self)}
......
...@@ -4,7 +4,9 @@ import law ...@@ -4,7 +4,9 @@ import law
import luigi import luigi
import logging import logging
import os import os
import pathlib
import re import re
import shutil
from subprocess import PIPE from subprocess import PIPE
...@@ -38,7 +40,10 @@ class FnloCppread(Task, law.LocalWorkflow): ...@@ -38,7 +40,10 @@ class FnloCppread(Task, law.LocalWorkflow):
def workflow_requires(self): def workflow_requires(self):
if self.force: if self.force:
return {"copytables": CopyTables.req(self, acceptance=0)} # Do not really check using 'force' since very time consuming.
# A simple change of the acceptance can easily be done using cmd line options.
# return {"copytables": CopyTables.req(self, acceptance=0)}
return 0
else: else:
return {"copytables": CopyTables.req(self)} return {"copytables": CopyTables.req(self)}
...@@ -54,25 +59,49 @@ class FnloCppread(Task, law.LocalWorkflow): ...@@ -54,25 +59,49 @@ class FnloCppread(Task, law.LocalWorkflow):
) )
def run(self): def run(self):
job_ok = True # set to False if a subtask fails -> no tag file written job_ok = True # set to False if files are not found or all subtasks fail -> no tag file written
try: try:
tableglob = os.path.join( tablepath = os.path.join(self.merge_dir, self.name, self.branch_data['channel'])
self.merge_dir, self.name, self.branch_data['channel'], logger.debug('Table path is: %s', tablepath)
'*.s{}.tab.gz'.format(self.branch_data['seed']) jobfailpath = os.path.join(tablepath,"Failed")
) logger.debug('Job fail path is: %s', jobfailpath)
tableglob = os.path.join(tablepath,'*.s{}.tab.gz'.format(self.branch_data['seed']))
logger.debug('Table glob is: %s', tableglob) logger.debug('Table glob is: %s', tableglob)
crossglob = os.path.join(tablepath,'*.cross.s{}.dat'.format(self.branch_data['seed']))
logger.debug('Cross glob is: %s', crossglob)
joblogglob = os.path.join(tablepath,'*.fastprod.s{}.log'.format(self.branch_data['seed']))
logger.debug('Job log glob is: %s', joblogglob)
jobfailglob = os.path.join(tablepath,'*.s{}.*'.format(self.branch_data['seed']))
logger.debug('Job fail glob is: %s', jobfailglob)
files = glob.glob(jobfailglob)
if not files:
message = "Could not find any files for glob {} of branch {}. Counted as FAIL!".format(jobfailglob, self.branch)
raise RuntimeError(message)
files = glob.glob(crossglob)
if not files:
message = "Could not find total x section file for glob {} of branch {}. Counted as FAIL!".format(crossglob, self.branch)
raise RuntimeError(message)
elif (not os.path.isfile(files[0])) or (os.path.getsize(files[0]) == 0):
message = "Total x section file {} of branch {} not found or empty. Counted as FAIL!".format(files[0], self.branch)
raise RuntimeError(message)
files = glob.glob(joblogglob)
if not files:
message = "Could not find fastprod log file for glob {} of branch {}. Counted as FAIL!".format(joblogglob, self.branch)
raise RuntimeError(message)
elif (not os.path.isfile(files[0])) or (os.path.getsize(files[0]) == 0):
message = "Fastprod log file {} of branch {} not found or empty. Counted as FAIL!".format(files[0], self.branch)
raise RuntimeError(message)
tables = glob.glob(tableglob) tables = glob.glob(tableglob)
if not tables: if len(tables) != len(self.observables):
raise RuntimeError( logger.warning('Number of tables %s does not match number of observables %s. Only subset of results usable!',
'Could not find any input tables for branch %s. ' len(tables),len(self.observables))
'Glob was: %s'.format(
self.branch, tableglob
)
)
ierr = 0 # keep track of failed subtasks nres = 0
for table in tables: for table in tables:
logger.debug("Processing table: %s", table) logger.debug("Processing table: %s", table)
...@@ -81,14 +110,14 @@ class FnloCppread(Task, law.LocalWorkflow): ...@@ -81,14 +110,14 @@ class FnloCppread(Task, law.LocalWorkflow):
for i, observable in enumerate(self.observables): for i, observable in enumerate(self.observables):
if observable in table: if observable in table:
iscl = i iscl = i
logger.debug( logger.debug("Table name '%s' matches observable %s with index %s", table, observable, iscl)
"Table name '%s' matches observable %s with index %s",
table, observable, iscl
)
logfile = re.sub('.gz$', '', table) tmpfile = re.sub('.gz$', '', table)
logfile = re.sub('.tab$', '', logfile) + '.log' logfile = re.sub('.tab$', '.log', tmpfile)
logger.debug("Log file is: %s", table) logger.debug("Log file is: %s", logfile)
datfile = re.sub('.tab$', '.dat', tmpfile)
logger.debug("Dat file is: %s", datfile)
filestub = re.sub('.tab$', '*', tmpfile)
scl = self.scale[0] scl = self.scale[0]
if len(self.scale) > 1: if len(self.scale) > 1:
...@@ -104,11 +133,9 @@ class FnloCppread(Task, law.LocalWorkflow): ...@@ -104,11 +133,9 @@ class FnloCppread(Task, law.LocalWorkflow):
) )
command_line_args = [ command_line_args = [
'fnlo-tk-cppread', table, self.pdf, sclcom, 'fnlo-tk-cppread', table, self.pdf, sclcom, self.ascode, self.norm, scl
self.ascode, self.norm, scl
] ]
logger.info("Running command: %s", logger.info("Running command: %s", colored(' '.join(command_line_args), 'yellow'))
colored(' '.join(command_line_args), 'yellow'))
code, out, error = interruptable_popen(command_line_args, code, out, error = interruptable_popen(command_line_args,
stdout=PIPE, stdout=PIPE,
...@@ -117,22 +144,65 @@ class FnloCppread(Task, law.LocalWorkflow): ...@@ -117,22 +144,65 @@ class FnloCppread(Task, law.LocalWorkflow):
logger.debug("Standard output:\n%s\n", out) logger.debug("Standard output:\n%s\n", out)
logger.debug("Standard error:\n%s\n", error) logger.debug("Standard error:\n%s\n", error)
if (code == 0): if (code == 0 and os.path.isfile(datfile) and (os.path.getsize(datfile) > 0)):
logger.info("fnlo-tk-cppread status: SUCCESS") logger.info("fnlo-tk-cppread status: SUCCESS")
# on success, write output to log file # on success, write output to log file
with open(logfile, 'w') as outfile: with open(logfile, 'w') as outfile:
outfile.write(out) outfile.write(out)
nres = nres + 1
else:
if not os.path.isfile(table):
logger.warning('Table %s is missing. Skipping this subresult!', table)
elif (os.path.getsize(table) == 0):
logger.warning('Table %s is empty. Skipping this subresult!', table)
elif (code != 0):
logger.warning('Evaluation of table %s gives error %d. Skipping this subresult!', table, code)
elif not os.path.isfile(datfile):
logger.warning('Datfile %s is missing. Skipping this subresult!', datfile)
elif (os.path.getsize(datfile) == 0):
logger.warning('Datfile %s is empty. Skipping this subresult!', datfile)
else: else:
raise RuntimeError("fnlo-tk-cppread status: FAIL (exit code {})".format(code)) logger.warning('This should not happen. So anyway skipping this subresult!')
files = glob.glob(filestub)
if files:
logger.warning('Moving files of failed subtask with seed %s out of the way!', self.branch_data['seed'])
logger.debug('Creating job fail directory if not yet existing: %s', jobfailpath)
pathlib.Path(jobfailpath).mkdir(parents=True, exist_ok=True)
for file in files:
filename = os.path.basename(file)
destfile = os.path.join(jobfailpath,filename)
if os.path.isfile(destfile):
logger.debug('Removing already existing file %s of failed job subtask ...', destfile)
os.remove(destfile)
logger.debug('Moving file %s of failed job subtask to folder %s ...', file, jobfailpath)
shutil.move(file, jobfailpath)
if nres == 0:
message = "Not even a single subtask of seed {} for branch {} could be completed without error. Counted as FAIL!".format(self.branch_data['seed'],self.branch)
raise RuntimeError(message)
# on error, make log entry and reraise # on error, make log entry and reraise
except Exception as e: except Exception as e:
job_ok = False job_ok = False
logger.error('%s raised during job execution: %s', e.__class__.__name__, e) logger.error('%s raised during job execution: %s', e.__class__.__name__, e)
files = glob.glob(jobfailglob)
if files:
logger.error('Moving all files of failed job with seed %s out of the way!', self.branch_data['seed'])
logger.debug('Creating job fail directory if not yet existing: %s', jobfailpath)
pathlib.Path(jobfailpath).mkdir(parents=True, exist_ok=True)
for file in files:
filename = os.path.basename(file)
destfile = os.path.join(jobfailpath,filename)
if os.path.isfile(destfile):
logger.debug('Removing already existing file %s of failed job ...', destfile)
os.remove(destfile)
logger.debug('Moving file %s of failed job to folder %s ...', file, jobfailpath)
shutil.move(file, jobfailpath)
else:
logger.error('No files found for job with seed %s.', self.branch_data['seed'])
raise raise
finally: finally:
# if no subtask failed, write tag file to mark task complete # if at least one subtask succeeded, write tag file to mark task complete
if job_ok: if job_ok:
self.output().touch() self.output().touch()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment