diff --git a/bb2etp/core.py b/bb2etp/core.py index d54f0e9e37521016a6a20da65aaccd263b8d0e16..928195403a42d1ef8268f792bba5ac4ec3a9619b 100644 --- a/bb2etp/core.py +++ b/bb2etp/core.py @@ -1,15 +1,21 @@ from dataclasses import dataclass, field from typing import Any, List, Optional, Union -#import yaml + +# import yaml import json import os from pathlib import Path -json_default_path = os.environ["BEAMBACKGROUND_JSON"] if "BEAMBACKGROUND_JSON" in os.environ.keys() else "beambackgrounds.json" +json_default_path = ( + os.environ["BEAMBACKGROUND_JSON"] + if "BEAMBACKGROUND_JSON" in os.environ.keys() + else "beambackgrounds.json" +) + @dataclass class BeambackgroundMetadata: - ''' + """ class to store metadata of a beambackground and provide functions for convenient access @param key: key name under which it is stored in the json file @param type: type of the beam background. Possible values are 'run-dependent' and 'run-independent' @@ -24,12 +30,33 @@ class BeambackgroundMetadata: @param bucket: rundependent only!, bucket number of the beam background files for use in globaltags @param gridka_server: rundependent only!, server to use for the gridka paths @param global_tags: A list of global Tags associated with this beam background - ''' + """ + key: str - type:str= field(metadata={"validate": lambda t: t in ["run-dependent", "run-independent"]}) - kek_path: Union[List[str], str] = field(metadata={"validate": lambda p: isinstance(p, str) or isinstance(p, list) or (p is None)}) - gridka_path: Union[List[str], str] = field(metadata={"validate": lambda p: isinstance(p, str) or isinstance(p, list) or (p is None)}) - ceph_path: Union[List[str], str] = field(metadata={"validate": lambda p: isinstance(p, str) or isinstance(p, list) or (p is None)}) + type: str = field( + metadata={"validate": lambda t: t in ["run-dependent", "run-independent"]} + ) + kek_path: Union[List[str], str] = field( + metadata={ + "validate": lambda p: isinstance(p, str) + or isinstance(p, list) + or (p is None) + } + ) + gridka_path: Union[List[str], str] = field( + metadata={ + "validate": lambda p: isinstance(p, str) + or isinstance(p, list) + or (p is None) + } + ) + ceph_path: Union[List[str], str] = field( + metadata={ + "validate": lambda p: isinstance(p, str) + or isinstance(p, list) + or (p is None) + } + ) luminosity: float = 0 peak_luminosity: float = 0 n_events: int = 0 @@ -46,92 +73,123 @@ class BeambackgroundMetadata: @property def expList(self) -> List[int]: - ''' + """ convenience function to get the experiment numbers for the beambackgrounds - ''' + """ return [self.experiment] def __post_init__(self): - ''' + """ after the init, turn all 'None' strings into None - ''' + """ for field_name in self.__dataclass_fields__: field_value = getattr(self, field_name) - if field_value == 'None': + if field_value == "None": setattr(self, field_name, None) @property def on_kek(self) -> bool: - return "kek.jp" in os.environ['HOSTNAME'] - + return "kek.jp" in os.environ["HOSTNAME"] + @property def kek_file_list(self) -> List[str]: - ''' + """ function to get the file list for the kek storage - ''' - assert self.on_kek, "Executions does not seem to be on KEK. Kek paths are not accessible." + """ + assert ( + self.on_kek + ), "Executions does not seem to be on KEK. Kek paths are not accessible." if isinstance(self.kek_path, str): - assert (os.path.exists(self.kek_path) and os.path.isdir(self.kek_path)), f"The path '{self.kek_path}' does not exist or is not a directory." - return [f"{self.kek_path}/{filename}" for filename in os.listdir(self.kek_path)] + assert os.path.exists(self.kek_path) and os.path.isdir( + self.kek_path + ), f"The path '{self.kek_path}' does not exist or is not a directory." + return [ + f"{self.kek_path}/{filename}" for filename in os.listdir(self.kek_path) + ] else: return self.kek_path - + @property def on_etp(self) -> bool: - return f"/home/{os.environ['USER']}" == os.environ['HOME'] #probably not the best way to check this + return ( + f"/home/{os.environ['USER']}" == os.environ["HOME"] + ) # probably not the best way to check this @property def ceph_file_list(self) -> List[str]: - ''' + """ function to get the file list for the ceph storage - ''' - assert self.on_etp, "Executions does not seem to be on ETP. Ceph paths are not accessible." + """ + assert ( + self.on_etp + ), "Executions does not seem to be on ETP. Ceph paths are not accessible." if isinstance(self.ceph_path, str): - assert (os.path.exists(self.ceph_path) and os.path.isdir(self.ceph_path)), f"The path '{self.ceph_path}' does not exist or is not a directory." - return [f"{self.ceph_path}/{filename}" for filename in os.listdir(self.ceph_path)] + assert os.path.exists(self.ceph_path) and os.path.isdir( + self.ceph_path + ), f"The path '{self.ceph_path}' does not exist or is not a directory." + return [ + f"{self.ceph_path}/{filename}" + for filename in os.listdir(self.ceph_path) + ] else: return self.ceph_path - + @property def has_certificate(self) -> bool: - return "X509_USER_PROXY" in os.environ.keys() #TODO: Implement check if certificate is available + return ( + "X509_USER_PROXY" in os.environ.keys() + ) # TODO: Implement check if certificate is available @property def gridka_file_list(self) -> List[str]: - ''' + """ function to get the file list for the gridka storage - ''' + """ assert self.has_certificate, "No certificate found. Please set X509_USER_PROXY." - assert self.gridka_path is not None, "gridka_paths is None. Please set the paths." + assert ( + self.gridka_path is not None + ), "gridka_paths is None. Please set the paths." if isinstance(self.gridka_path, str): from XRootD import client from XRootD.client.flags import DirListFlags + client = client.FileSystem(self.gridka_server) file_list = [] - status, listing = client.dirlist(self.gridka_path, DirListFlags.STAT, timeout=10) + status, listing = client.dirlist( + self.gridka_path, DirListFlags.STAT, timeout=10 + ) if not status.ok: raise RuntimeError(status.message) if listing is None: - print("Warning: No files found in directory.") # TODO use logger or turn to assert? + print( + "Warning: No files found in directory." + ) # TODO use logger or turn to assert? return file_list for entry in listing: if entry.name.endswith(".root") and not entry.name == "": - file_list.append(f"{self.gridka_server}/{self.gridka_path}/{entry.name}") + file_list.append( + f"{self.gridka_server}/{self.gridka_path}/{entry.name}" + ) return file_list else: return self.gridka_path def download_from_gridka(self, target_path: str) -> None: - ''' + """ function to download the files from Gridka to a target path using XRootD python bindings @param target_path: path to the target directory - ''' + """ assert self.has_certificate, "No certificate found. Please set X509_USER_PROXY." - assert self.gridka_path is not None, "gridka_paths is None. Please set the paths." + assert ( + self.gridka_path is not None + ), "gridka_paths is None. Please set the paths." from XRootD import client + client = client.FileSystem(self.gridka_server) for file in self.gridka_file_list: - status, _ = client.copy(f"{self.gridka_server}/{file}", f"{target_path}/{Path(file).name}") + status, _ = client.copy( + f"{self.gridka_server}/{file}", f"{target_path}/{Path(file).name}" + ) if not status.ok: if "File already exists" in status.message: print(f"Warning: File '{file}' already exists localy. Skipping.") @@ -139,61 +197,74 @@ class BeambackgroundMetadata: raise RuntimeError(status.message) def update_json_entry(self, json_data: str = json_default_path) -> str: - ''' + """ function to update the json entry for the background and save it to the json file @param json_data: path to the json file, default is json_default_path - ''' + """ # Load the json data - with open(json_data, 'r') as file: + with open(json_data, "r") as file: data = json.load(file) if self.key not in data.keys(): data[self.key] = {} # Update the specific key's values - data[self.key]['type'] = self.type - data[self.key]['kek_path'] = self.kek_path - data[self.key]['gridka_path'] = self.gridka_path - data[self.key]['ceph_path'] = self.ceph_path - data[self.key]['luminostiy'] = self.luminosity - data[self.key]['n_events'] = self.n_events - data[self.key]['experiment'] = self.experiment - data[self.key]['run'] = self.run - data[self.key]['bucket'] = self.bucket - data[self.key]['date'] = self.date - data[self.key]['beam_energy'] = self.beam_energy - data[self.key]['qualitiy'] = self.qualitiy - data[self.key]['gridka_server'] = self.gridka_server - data[self.key]['global_tags'] = self.global_tags + data[self.key]["type"] = self.type + data[self.key]["kek_path"] = self.kek_path + data[self.key]["gridka_path"] = self.gridka_path + data[self.key]["ceph_path"] = self.ceph_path + data[self.key]["luminostiy"] = self.luminosity + data[self.key]["n_events"] = self.n_events + data[self.key]["experiment"] = self.experiment + data[self.key]["run"] = self.run + data[self.key]["bucket"] = self.bucket + data[self.key]["date"] = self.date + data[self.key]["beam_energy"] = self.beam_energy + data[self.key]["qualitiy"] = self.qualitiy + data[self.key]["gridka_server"] = self.gridka_server + data[self.key]["global_tags"] = self.global_tags - with open(json_data, 'w') as file: + with open(json_data, "w") as file: # Dump the updated data json.dump(data, file) - + def send_from_kek_to_ceph(self, ceph_base_path: str, username: str): - ''' + """ function to send the files from KEK to Ceph at ETP using scp and the portal1 as entrypoint @param ceph_base_path: base path on Ceph to copy to. @param username: username at ETP to be used for scp - ''' - assert self.on_kek, "Executions does not seem to be on KEK. Kek paths are not accessible." + """ + assert ( + self.on_kek + ), "Executions does not seem to be on KEK. Kek paths are not accessible." assert self.kek_path is not None, "kek_paths is None. Please set the paths." for file in self.kek_file_list: - os.system(f"scp {file} {username}@portal1.etp.kit.edu:{ceph_base_path}/{Path(file).name}") + os.system( + f"scp {file} {username}@portal1.etp.kit.edu:{ceph_base_path}/{Path(file).name}" + ) self.ceph_path = ceph_base_path def send_from_kek_to_gridka(self, gridka_base_path: str): - ''' + """ function to send the files from KEK to Gridka using XRootD python bindings @param gridka_base_path: base path on Gridka to copy to. @param username: username at ETP to be used for scp - ''' - assert self.on_kek, "Executions does not seem to be on KEK. Kek paths are not accessible." + """ + assert ( + self.on_kek + ), "Executions does not seem to be on KEK. Kek paths are not accessible." assert self.kek_path is not None, "kek_paths is None. Please set the paths." - assert (os.path.exists(self.kek_path) and os.path.isdir(self.kek_path)), f"The path '{self.kek_path}' does not exist or is not a directory." + assert os.path.exists(self.kek_path) and os.path.isdir( + self.kek_path + ), f"The path '{self.kek_path}' does not exist or is not a directory." assert self.has_certificate, "No certificate found. Please set X509_USER_PROXY." from XRootD import client + client = client.FileSystem(self.gridka_server) for file in self.kek_file_list: - status, _ = client.copy(f"file://{file}", f"{self.gridka_server}/{gridka_base_path}/{Path(file).name}") + + status, _ = client.copy( + f"file://{file}", + f"{self.gridka_server}/{gridka_base_path}/{Path(file).name}", + ) if not status.ok: if "File already exists" in status.message: print(f"Warning: File '{file}' already exists on Gridka. Skipping.") @@ -201,30 +272,36 @@ class BeambackgroundMetadata: raise RuntimeError(status.message) def send_from_ceph_to_gridka(self, gridka_base_path: str, username: str): - ''' + """ function to send the files from Ceph to Gridka using XRootD python bindings @param gridka_base_path: base path on Gridka to copy to. @param username: username at ETP to be used for scp - ''' - assert self.on_etp, "Executions does not seem to be on ETP. Ceph paths are not accessible." + """ + assert ( + self.on_etp + ), "Executions does not seem to be on ETP. Ceph paths are not accessible." assert self.ceph_path is not None, "ceph_paths is None. Please set the paths." - #assert (os.path.exists(self.ceph_path) and os.path.isdir(self.ceph_path)), f"The path '{self.ceph_path}' does not exist or is not a directory." + # assert (os.path.exists(self.ceph_path) and os.path.isdir(self.ceph_path)), f"The path '{self.ceph_path}' does not exist or is not a directory." assert self.has_certificate, "No certificate found. Please set X509_USER_PROXY." from XRootD import client + client = client.FileSystem(self.gridka_server) for file in self.ceph_file_list: - status, _ = client.copy(f"file://{file}", f"{self.gridka_server}/{gridka_base_path}/{Path(file).name}") + status, _ = client.copy( + f"file://{file}", + f"{self.gridka_server}/{gridka_base_path}/{Path(file).name}", + ) if not status.ok: if "File already exists" in status.message: print(f"Warning: File '{file}' already exists on Gridka. Skipping.") else: raise RuntimeError(status.message) - def get_file_list(self, key = None) -> List[str]: - ''' + def get_file_list(self, key=None) -> List[str]: + """ function to get the file list for a specific storage. By default checks on which system it is and uses the appropriate paths. @param key: key for the storage. Possible values are 'kek', 'gridka' and 'ceph', default is 'gridka' - ''' + """ if key == "kek": return self.kek_file_list elif key == "gridka": @@ -242,30 +319,41 @@ class BeambackgroundMetadata: raise RuntimeError("No valid key given and no valid environment found.") else: raise RuntimeError(f"Key '{key}' not known. Use 'kek', 'gridka' or 'ceph'.") - + + def get_to_ceph_from_kek(self, host: str = "kekcc"): + """ + function to get the files from KEK to Ceph at ETP using rsync. + """ + assert ( + self.on_etp + ), "Executions does not seem to be on ETP. Ceph paths are not accessible." + assert self.kek_path is not None, "kek_paths is None. Please set the paths." + os.system(f"rsync -acz -r --mkpath {host}:{self.kek_path} {self.ceph_path}") + + def get_beam_background_by_key(key: str, json_data: str = json_default_path): - ''' + """ convenience function to search within a json file for a specific key and return the corresponding BackgroundData object @param key: key name under which it is stored in the json file @param json_data: path to the json file, default is json_default_path - ''' - with open(json_data, 'r') as file: + """ + with open(json_data, "r") as file: backgrounds_data = json.load(file) - + return BeambackgroundMetadata( key=key, - type=backgrounds_data[key]['type'], - kek_path=backgrounds_data[key]['kek_path'], - gridka_path=backgrounds_data[key]['gridka_path'], - ceph_path=backgrounds_data[key]['ceph_path'], - luminosity=backgrounds_data[key]['luminostiy'], - n_events=backgrounds_data[key]['n_events'], - experiment=backgrounds_data[key]['experiment'], - date=backgrounds_data[key]['date'], - beam_energy=backgrounds_data[key]['beam_energy'], - qualitiy=backgrounds_data[key]['qualitiy'], - global_tags=backgrounds_data[key]['global_tags'], - gridka_server=backgrounds_data[key]['gridka_server'], - run=backgrounds_data[key]['run'], - bucket=backgrounds_data[key]['bucket'], + type=backgrounds_data[key]["type"], + kek_path=backgrounds_data[key]["kek_path"], + gridka_path=backgrounds_data[key]["gridka_path"], + ceph_path=backgrounds_data[key]["ceph_path"], + luminosity=backgrounds_data[key]["luminostiy"], + n_events=backgrounds_data[key]["n_events"], + experiment=backgrounds_data[key]["experiment"], + date=backgrounds_data[key]["date"], + beam_energy=backgrounds_data[key]["beam_energy"], + qualitiy=backgrounds_data[key]["qualitiy"], + global_tags=backgrounds_data[key]["global_tags"], + gridka_server=backgrounds_data[key]["gridka_server"], + run=backgrounds_data[key]["run"], + bucket=backgrounds_data[key]["bucket"], )