Skip to content
Snippets Groups Projects
Commit 7a343ece authored by Jonas Eppelt's avatar Jonas Eppelt
Browse files

format

parent 53172f9c
No related branches found
No related tags found
1 merge request!9Toml
from dataclasses import dataclass, field from dataclasses import dataclass, field
from typing import Any, List, Optional, Union from typing import Any, List, Optional, Union
# import yaml # import yaml
import json import json
import os import os
from pathlib import Path from pathlib import Path
json_default_path = os.environ["BEAMBACKGROUND_JSON"] if "BEAMBACKGROUND_JSON" in os.environ.keys() else "beambackgrounds.json" json_default_path = (
os.environ["BEAMBACKGROUND_JSON"]
if "BEAMBACKGROUND_JSON" in os.environ.keys()
else "beambackgrounds.json"
)
@dataclass @dataclass
class BeambackgroundMetadata: class BeambackgroundMetadata:
''' """
class to store metadata of a beambackground and provide functions for convenient access class to store metadata of a beambackground and provide functions for convenient access
@param key: key name under which it is stored in the json file @param key: key name under which it is stored in the json file
@param type: type of the beam background. Possible values are 'run-dependent' and 'run-independent' @param type: type of the beam background. Possible values are 'run-dependent' and 'run-independent'
...@@ -24,12 +30,33 @@ class BeambackgroundMetadata: ...@@ -24,12 +30,33 @@ class BeambackgroundMetadata:
@param bucket: rundependent only!, bucket number of the beam background files for use in globaltags @param bucket: rundependent only!, bucket number of the beam background files for use in globaltags
@param gridka_server: rundependent only!, server to use for the gridka paths @param gridka_server: rundependent only!, server to use for the gridka paths
@param global_tags: A list of global Tags associated with this beam background @param global_tags: A list of global Tags associated with this beam background
''' """
key: str key: str
type:str= field(metadata={"validate": lambda t: t in ["run-dependent", "run-independent"]}) type: str = field(
kek_path: Union[List[str], str] = field(metadata={"validate": lambda p: isinstance(p, str) or isinstance(p, list) or (p is None)}) metadata={"validate": lambda t: t in ["run-dependent", "run-independent"]}
gridka_path: Union[List[str], str] = field(metadata={"validate": lambda p: isinstance(p, str) or isinstance(p, list) or (p is None)}) )
ceph_path: Union[List[str], str] = field(metadata={"validate": lambda p: isinstance(p, str) or isinstance(p, list) or (p is None)}) kek_path: Union[List[str], str] = field(
metadata={
"validate": lambda p: isinstance(p, str)
or isinstance(p, list)
or (p is None)
}
)
gridka_path: Union[List[str], str] = field(
metadata={
"validate": lambda p: isinstance(p, str)
or isinstance(p, list)
or (p is None)
}
)
ceph_path: Union[List[str], str] = field(
metadata={
"validate": lambda p: isinstance(p, str)
or isinstance(p, list)
or (p is None)
}
)
luminosity: float = 0 luminosity: float = 0
peak_luminosity: float = 0 peak_luminosity: float = 0
n_events: int = 0 n_events: int = 0
...@@ -46,92 +73,123 @@ class BeambackgroundMetadata: ...@@ -46,92 +73,123 @@ class BeambackgroundMetadata:
@property @property
def expList(self) -> List[int]: def expList(self) -> List[int]:
''' """
convenience function to get the experiment numbers for the beambackgrounds convenience function to get the experiment numbers for the beambackgrounds
''' """
return [self.experiment] return [self.experiment]
def __post_init__(self): def __post_init__(self):
''' """
after the init, turn all 'None' strings into None after the init, turn all 'None' strings into None
''' """
for field_name in self.__dataclass_fields__: for field_name in self.__dataclass_fields__:
field_value = getattr(self, field_name) field_value = getattr(self, field_name)
if field_value == 'None': if field_value == "None":
setattr(self, field_name, None) setattr(self, field_name, None)
@property @property
def on_kek(self) -> bool: def on_kek(self) -> bool:
return "kek.jp" in os.environ['HOSTNAME'] return "kek.jp" in os.environ["HOSTNAME"]
@property @property
def kek_file_list(self) -> List[str]: def kek_file_list(self) -> List[str]:
''' """
function to get the file list for the kek storage function to get the file list for the kek storage
''' """
assert self.on_kek, "Executions does not seem to be on KEK. Kek paths are not accessible." assert (
self.on_kek
), "Executions does not seem to be on KEK. Kek paths are not accessible."
if isinstance(self.kek_path, str): if isinstance(self.kek_path, str):
assert (os.path.exists(self.kek_path) and os.path.isdir(self.kek_path)), f"The path '{self.kek_path}' does not exist or is not a directory." assert os.path.exists(self.kek_path) and os.path.isdir(
return [f"{self.kek_path}/{filename}" for filename in os.listdir(self.kek_path)] self.kek_path
), f"The path '{self.kek_path}' does not exist or is not a directory."
return [
f"{self.kek_path}/{filename}" for filename in os.listdir(self.kek_path)
]
else: else:
return self.kek_path return self.kek_path
@property @property
def on_etp(self) -> bool: def on_etp(self) -> bool:
return f"/home/{os.environ['USER']}" == os.environ['HOME'] #probably not the best way to check this return (
f"/home/{os.environ['USER']}" == os.environ["HOME"]
) # probably not the best way to check this
@property @property
def ceph_file_list(self) -> List[str]: def ceph_file_list(self) -> List[str]:
''' """
function to get the file list for the ceph storage function to get the file list for the ceph storage
''' """
assert self.on_etp, "Executions does not seem to be on ETP. Ceph paths are not accessible." assert (
self.on_etp
), "Executions does not seem to be on ETP. Ceph paths are not accessible."
if isinstance(self.ceph_path, str): if isinstance(self.ceph_path, str):
assert (os.path.exists(self.ceph_path) and os.path.isdir(self.ceph_path)), f"The path '{self.ceph_path}' does not exist or is not a directory." assert os.path.exists(self.ceph_path) and os.path.isdir(
return [f"{self.ceph_path}/{filename}" for filename in os.listdir(self.ceph_path)] self.ceph_path
), f"The path '{self.ceph_path}' does not exist or is not a directory."
return [
f"{self.ceph_path}/{filename}"
for filename in os.listdir(self.ceph_path)
]
else: else:
return self.ceph_path return self.ceph_path
@property @property
def has_certificate(self) -> bool: def has_certificate(self) -> bool:
return "X509_USER_PROXY" in os.environ.keys() #TODO: Implement check if certificate is available return (
"X509_USER_PROXY" in os.environ.keys()
) # TODO: Implement check if certificate is available
@property @property
def gridka_file_list(self) -> List[str]: def gridka_file_list(self) -> List[str]:
''' """
function to get the file list for the gridka storage function to get the file list for the gridka storage
''' """
assert self.has_certificate, "No certificate found. Please set X509_USER_PROXY." assert self.has_certificate, "No certificate found. Please set X509_USER_PROXY."
assert self.gridka_path is not None, "gridka_paths is None. Please set the paths." assert (
self.gridka_path is not None
), "gridka_paths is None. Please set the paths."
if isinstance(self.gridka_path, str): if isinstance(self.gridka_path, str):
from XRootD import client from XRootD import client
from XRootD.client.flags import DirListFlags from XRootD.client.flags import DirListFlags
client = client.FileSystem(self.gridka_server) client = client.FileSystem(self.gridka_server)
file_list = [] file_list = []
status, listing = client.dirlist(self.gridka_path, DirListFlags.STAT, timeout=10) status, listing = client.dirlist(
self.gridka_path, DirListFlags.STAT, timeout=10
)
if not status.ok: if not status.ok:
raise RuntimeError(status.message) raise RuntimeError(status.message)
if listing is None: if listing is None:
print("Warning: No files found in directory.") # TODO use logger or turn to assert? print(
"Warning: No files found in directory."
) # TODO use logger or turn to assert?
return file_list return file_list
for entry in listing: for entry in listing:
if entry.name.endswith(".root") and not entry.name == "": if entry.name.endswith(".root") and not entry.name == "":
file_list.append(f"{self.gridka_server}/{self.gridka_path}/{entry.name}") file_list.append(
f"{self.gridka_server}/{self.gridka_path}/{entry.name}"
)
return file_list return file_list
else: else:
return self.gridka_path return self.gridka_path
def download_from_gridka(self, target_path: str) -> None: def download_from_gridka(self, target_path: str) -> None:
''' """
function to download the files from Gridka to a target path using XRootD python bindings function to download the files from Gridka to a target path using XRootD python bindings
@param target_path: path to the target directory @param target_path: path to the target directory
''' """
assert self.has_certificate, "No certificate found. Please set X509_USER_PROXY." assert self.has_certificate, "No certificate found. Please set X509_USER_PROXY."
assert self.gridka_path is not None, "gridka_paths is None. Please set the paths." assert (
self.gridka_path is not None
), "gridka_paths is None. Please set the paths."
from XRootD import client from XRootD import client
client = client.FileSystem(self.gridka_server) client = client.FileSystem(self.gridka_server)
for file in self.gridka_file_list: for file in self.gridka_file_list:
status, _ = client.copy(f"{self.gridka_server}/{file}", f"{target_path}/{Path(file).name}") status, _ = client.copy(
f"{self.gridka_server}/{file}", f"{target_path}/{Path(file).name}"
)
if not status.ok: if not status.ok:
if "File already exists" in status.message: if "File already exists" in status.message:
print(f"Warning: File '{file}' already exists localy. Skipping.") print(f"Warning: File '{file}' already exists localy. Skipping.")
...@@ -139,61 +197,74 @@ class BeambackgroundMetadata: ...@@ -139,61 +197,74 @@ class BeambackgroundMetadata:
raise RuntimeError(status.message) raise RuntimeError(status.message)
def update_json_entry(self, json_data: str = json_default_path) -> str: def update_json_entry(self, json_data: str = json_default_path) -> str:
''' """
function to update the json entry for the background and save it to the json file function to update the json entry for the background and save it to the json file
@param json_data: path to the json file, default is json_default_path @param json_data: path to the json file, default is json_default_path
''' """
# Load the json data # Load the json data
with open(json_data, 'r') as file: with open(json_data, "r") as file:
data = json.load(file) data = json.load(file)
if self.key not in data.keys(): if self.key not in data.keys():
data[self.key] = {} data[self.key] = {}
# Update the specific key's values # Update the specific key's values
data[self.key]['type'] = self.type data[self.key]["type"] = self.type
data[self.key]['kek_path'] = self.kek_path data[self.key]["kek_path"] = self.kek_path
data[self.key]['gridka_path'] = self.gridka_path data[self.key]["gridka_path"] = self.gridka_path
data[self.key]['ceph_path'] = self.ceph_path data[self.key]["ceph_path"] = self.ceph_path
data[self.key]['luminostiy'] = self.luminosity data[self.key]["luminostiy"] = self.luminosity
data[self.key]['n_events'] = self.n_events data[self.key]["n_events"] = self.n_events
data[self.key]['experiment'] = self.experiment data[self.key]["experiment"] = self.experiment
data[self.key]['run'] = self.run data[self.key]["run"] = self.run
data[self.key]['bucket'] = self.bucket data[self.key]["bucket"] = self.bucket
data[self.key]['date'] = self.date data[self.key]["date"] = self.date
data[self.key]['beam_energy'] = self.beam_energy data[self.key]["beam_energy"] = self.beam_energy
data[self.key]['qualitiy'] = self.qualitiy data[self.key]["qualitiy"] = self.qualitiy
data[self.key]['gridka_server'] = self.gridka_server data[self.key]["gridka_server"] = self.gridka_server
data[self.key]['global_tags'] = self.global_tags data[self.key]["global_tags"] = self.global_tags
with open(json_data, 'w') as file: with open(json_data, "w") as file:
# Dump the updated data # Dump the updated data
json.dump(data, file) json.dump(data, file)
def send_from_kek_to_ceph(self, ceph_base_path: str, username: str): def send_from_kek_to_ceph(self, ceph_base_path: str, username: str):
''' """
function to send the files from KEK to Ceph at ETP using scp and the portal1 as entrypoint function to send the files from KEK to Ceph at ETP using scp and the portal1 as entrypoint
@param ceph_base_path: base path on Ceph to copy to. @param ceph_base_path: base path on Ceph to copy to.
@param username: username at ETP to be used for scp @param username: username at ETP to be used for scp
''' """
assert self.on_kek, "Executions does not seem to be on KEK. Kek paths are not accessible." assert (
self.on_kek
), "Executions does not seem to be on KEK. Kek paths are not accessible."
assert self.kek_path is not None, "kek_paths is None. Please set the paths." assert self.kek_path is not None, "kek_paths is None. Please set the paths."
for file in self.kek_file_list: for file in self.kek_file_list:
os.system(f"scp {file} {username}@portal1.etp.kit.edu:{ceph_base_path}/{Path(file).name}") os.system(
f"scp {file} {username}@portal1.etp.kit.edu:{ceph_base_path}/{Path(file).name}"
)
self.ceph_path = ceph_base_path self.ceph_path = ceph_base_path
def send_from_kek_to_gridka(self, gridka_base_path: str): def send_from_kek_to_gridka(self, gridka_base_path: str):
''' """
function to send the files from KEK to Gridka using XRootD python bindings function to send the files from KEK to Gridka using XRootD python bindings
@param gridka_base_path: base path on Gridka to copy to. @param gridka_base_path: base path on Gridka to copy to.
@param username: username at ETP to be used for scp @param username: username at ETP to be used for scp
''' """
assert self.on_kek, "Executions does not seem to be on KEK. Kek paths are not accessible." assert (
self.on_kek
), "Executions does not seem to be on KEK. Kek paths are not accessible."
assert self.kek_path is not None, "kek_paths is None. Please set the paths." assert self.kek_path is not None, "kek_paths is None. Please set the paths."
assert (os.path.exists(self.kek_path) and os.path.isdir(self.kek_path)), f"The path '{self.kek_path}' does not exist or is not a directory." assert os.path.exists(self.kek_path) and os.path.isdir(
self.kek_path
), f"The path '{self.kek_path}' does not exist or is not a directory."
assert self.has_certificate, "No certificate found. Please set X509_USER_PROXY." assert self.has_certificate, "No certificate found. Please set X509_USER_PROXY."
from XRootD import client from XRootD import client
client = client.FileSystem(self.gridka_server) client = client.FileSystem(self.gridka_server)
for file in self.kek_file_list: for file in self.kek_file_list:
status, _ = client.copy(f"file://{file}", f"{self.gridka_server}/{gridka_base_path}/{Path(file).name}")
status, _ = client.copy(
f"file://{file}",
f"{self.gridka_server}/{gridka_base_path}/{Path(file).name}",
)
if not status.ok: if not status.ok:
if "File already exists" in status.message: if "File already exists" in status.message:
print(f"Warning: File '{file}' already exists on Gridka. Skipping.") print(f"Warning: File '{file}' already exists on Gridka. Skipping.")
...@@ -201,19 +272,25 @@ class BeambackgroundMetadata: ...@@ -201,19 +272,25 @@ class BeambackgroundMetadata:
raise RuntimeError(status.message) raise RuntimeError(status.message)
def send_from_ceph_to_gridka(self, gridka_base_path: str, username: str): def send_from_ceph_to_gridka(self, gridka_base_path: str, username: str):
''' """
function to send the files from Ceph to Gridka using XRootD python bindings function to send the files from Ceph to Gridka using XRootD python bindings
@param gridka_base_path: base path on Gridka to copy to. @param gridka_base_path: base path on Gridka to copy to.
@param username: username at ETP to be used for scp @param username: username at ETP to be used for scp
''' """
assert self.on_etp, "Executions does not seem to be on ETP. Ceph paths are not accessible." assert (
self.on_etp
), "Executions does not seem to be on ETP. Ceph paths are not accessible."
assert self.ceph_path is not None, "ceph_paths is None. Please set the paths." assert self.ceph_path is not None, "ceph_paths is None. Please set the paths."
# assert (os.path.exists(self.ceph_path) and os.path.isdir(self.ceph_path)), f"The path '{self.ceph_path}' does not exist or is not a directory." # assert (os.path.exists(self.ceph_path) and os.path.isdir(self.ceph_path)), f"The path '{self.ceph_path}' does not exist or is not a directory."
assert self.has_certificate, "No certificate found. Please set X509_USER_PROXY." assert self.has_certificate, "No certificate found. Please set X509_USER_PROXY."
from XRootD import client from XRootD import client
client = client.FileSystem(self.gridka_server) client = client.FileSystem(self.gridka_server)
for file in self.ceph_file_list: for file in self.ceph_file_list:
status, _ = client.copy(f"file://{file}", f"{self.gridka_server}/{gridka_base_path}/{Path(file).name}") status, _ = client.copy(
f"file://{file}",
f"{self.gridka_server}/{gridka_base_path}/{Path(file).name}",
)
if not status.ok: if not status.ok:
if "File already exists" in status.message: if "File already exists" in status.message:
print(f"Warning: File '{file}' already exists on Gridka. Skipping.") print(f"Warning: File '{file}' already exists on Gridka. Skipping.")
...@@ -221,10 +298,10 @@ class BeambackgroundMetadata: ...@@ -221,10 +298,10 @@ class BeambackgroundMetadata:
raise RuntimeError(status.message) raise RuntimeError(status.message)
def get_file_list(self, key=None) -> List[str]: def get_file_list(self, key=None) -> List[str]:
''' """
function to get the file list for a specific storage. By default checks on which system it is and uses the appropriate paths. function to get the file list for a specific storage. By default checks on which system it is and uses the appropriate paths.
@param key: key for the storage. Possible values are 'kek', 'gridka' and 'ceph', default is 'gridka' @param key: key for the storage. Possible values are 'kek', 'gridka' and 'ceph', default is 'gridka'
''' """
if key == "kek": if key == "kek":
return self.kek_file_list return self.kek_file_list
elif key == "gridka": elif key == "gridka":
...@@ -243,29 +320,40 @@ class BeambackgroundMetadata: ...@@ -243,29 +320,40 @@ class BeambackgroundMetadata:
else: else:
raise RuntimeError(f"Key '{key}' not known. Use 'kek', 'gridka' or 'ceph'.") raise RuntimeError(f"Key '{key}' not known. Use 'kek', 'gridka' or 'ceph'.")
def get_to_ceph_from_kek(self, host: str = "kekcc"):
"""
function to get the files from KEK to Ceph at ETP using rsync.
"""
assert (
self.on_etp
), "Executions does not seem to be on ETP. Ceph paths are not accessible."
assert self.kek_path is not None, "kek_paths is None. Please set the paths."
os.system(f"rsync -acz -r --mkpath {host}:{self.kek_path} {self.ceph_path}")
def get_beam_background_by_key(key: str, json_data: str = json_default_path): def get_beam_background_by_key(key: str, json_data: str = json_default_path):
''' """
convenience function to search within a json file for a specific key and return the corresponding BackgroundData object convenience function to search within a json file for a specific key and return the corresponding BackgroundData object
@param key: key name under which it is stored in the json file @param key: key name under which it is stored in the json file
@param json_data: path to the json file, default is json_default_path @param json_data: path to the json file, default is json_default_path
''' """
with open(json_data, 'r') as file: with open(json_data, "r") as file:
backgrounds_data = json.load(file) backgrounds_data = json.load(file)
return BeambackgroundMetadata( return BeambackgroundMetadata(
key=key, key=key,
type=backgrounds_data[key]['type'], type=backgrounds_data[key]["type"],
kek_path=backgrounds_data[key]['kek_path'], kek_path=backgrounds_data[key]["kek_path"],
gridka_path=backgrounds_data[key]['gridka_path'], gridka_path=backgrounds_data[key]["gridka_path"],
ceph_path=backgrounds_data[key]['ceph_path'], ceph_path=backgrounds_data[key]["ceph_path"],
luminosity=backgrounds_data[key]['luminostiy'], luminosity=backgrounds_data[key]["luminostiy"],
n_events=backgrounds_data[key]['n_events'], n_events=backgrounds_data[key]["n_events"],
experiment=backgrounds_data[key]['experiment'], experiment=backgrounds_data[key]["experiment"],
date=backgrounds_data[key]['date'], date=backgrounds_data[key]["date"],
beam_energy=backgrounds_data[key]['beam_energy'], beam_energy=backgrounds_data[key]["beam_energy"],
qualitiy=backgrounds_data[key]['qualitiy'], qualitiy=backgrounds_data[key]["qualitiy"],
global_tags=backgrounds_data[key]['global_tags'], global_tags=backgrounds_data[key]["global_tags"],
gridka_server=backgrounds_data[key]['gridka_server'], gridka_server=backgrounds_data[key]["gridka_server"],
run=backgrounds_data[key]['run'], run=backgrounds_data[key]["run"],
bucket=backgrounds_data[key]['bucket'], bucket=backgrounds_data[key]["bucket"],
) )
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment