Skip to content
Snippets Groups Projects
Commit 8f77f9c0 authored by Thorsten Chwalek's avatar Thorsten Chwalek
Browse files

Upload New File

parent 74d4b51b
Branches
No related tags found
No related merge requests found
"""
This file contains utilities used in the TP1 exercises.
"""
import numpy.typing as npt
from typing import Union, Any, List, Tuple
from itertools import product, cycle
import numpy as np
import pandas as pd
import vector as vec
import matplotlib.pyplot as plt
### Class to access particle four vectors
@pd.api.extensions.register_dataframe_accessor("v4")
class FourVecAccessor(object):
def __init__(self, pandas_obj):
# to distinguish between multiple particles or single particle
# we only need to save the column information,
self._obj_columns = pandas_obj.columns
# to keep data consistent when appending columns unsing this accessor save indices to use them in returns
self._obj_indices = pandas_obj.index
# get the correct index level, 0 for single particle, 1 for multiple
_vars = self._obj_columns.get_level_values(self._obj_columns.nlevels - 1)
if 'M' in _vars and "px" in _vars:
kin_vars = ["M", "px", "py", "pz"]
elif 'mass' in _vars and 'px' in _vars:
kin_vars = ["mass", "px", "py", "pz"]
elif 'mass' in _vars and 'pt' in _vars:
kin_vars = ["mass", "pt", "phi", "eta"]
elif 'E' in _vars and "px" in _vars:
kin_vars = ["E", "px", "py", "pz"]
elif 'E' in _vars and 'pt' in _vars:
kin_vars = ["E", "pt", "phi", "eta"]
else:
raise KeyError("No matching structure implemented for interpreting the data as a four "
"momentum!")
# the following lines are where the magic happens
# no multi-index, just on particle
if self._obj_columns.nlevels == 1:
# get the dtypes for the kinetic variables
dtypes = pandas_obj.dtypes
kin_view = list(map(lambda x: (x, dtypes[x]), kin_vars))
# get the kinetic variables from the dataframe and convert it to a numpy array.
# require it to be C_CONTIGUOUS, vector uses C-Style
# This array can then be viewed as a vector object.
# Every property not given is calculated on the fly by the vector object.
# E.g. the mass is not stored but calculated when the energy is given and vice versa.
self._v4 = np.require(pandas_obj[kin_vars].to_numpy(), requirements='C').view(
kin_view).view(vec.MomentumNumpy4D)
# multi-index, e.g. getting the four momentum for multiple particles
elif self._obj_columns.nlevels == 2:
# get the dtypes for the kinetic variables
# assume the same dtypes for the other particles
dtypes = pandas_obj[self._obj_columns.get_level_values(0).unique()[0]].dtypes
kin_view = list(map(lambda x: (x, dtypes[x]), kin_vars))
self._v4 = np.require(pandas_obj.loc[:, (self._obj_columns.get_level_values(0).unique(),
kin_vars)].to_numpy(),
requirements='C').view(kin_view).view(vec.MomentumNumpy4D)
else:
raise IndexError("Expected a dataframe with a maximum of two multi-index levels.")
def __getattribute__(self, item):
"""
Attributes of this accessor are forwarded to the four vector.
Returns either a pandas dataframe, if we have multiple particles
or a pandas Series for a single particle.
"""
try:
return object.__getattribute__(self, item)
except AttributeError:
try:
return pd.DataFrame(self._v4.__getattribute__(item),
columns=pd.MultiIndex.from_product(
[self._obj_columns.unique(0), [item]]),
index=self._obj_indices)
except ValueError:
try:
return pd.Series(self._v4.__getattribute__(item).flatten(), name=item, index=self._obj_indices)
except AttributeError as e:
if "'function' object has no attribute 'flatten'" in str(e):
raise AttributeError(
"Functions of the four vectors can NOT be called directly via the "
"accessor. Use the vector property instead! "
"Usage: 'df['particle'].v4.vector.desired_function()'")
raise e
@property
def vector(self):
"""The four vector object itself. It's required when using methods like boosting."""
if self._obj_columns.nlevels == 1:
return self._v4[:, 0]
else:
return self._v4
### Function to sort elements of an event
def get_sorted_df_by_low_level_criteria(df, criteria, axis=1):
"""
Sorts in ascending order the top levels of a given pd.DataFrame with two levels columns by given criteria
that has the form of a pandas DataFrame containing low level quantity, i.e. df.leptons.v4.pt
@param df: pd.DataFrame containing the top level objects that will be sorted
@param criteria: pd.DataFrame containing the sorting criteria values that are passed to np.argsort
@param axis: 0 or 1; row wise sort: axis=1; column wise sort: axis=0. Make sure to pass the
right sorted DataFrame (axis option)
@return: pd.DataFrame
"""
N, n, v = *df.shape, df[df.columns.get_level_values(0)[0]].shape[1]
idx = np.argsort(criteria, axis=axis)
sorted_data = np.take_along_axis(df.to_numpy().reshape((N, int(n / v), v)),
idx[:, :, None], axis).reshape((N, n))
return pd.DataFrame(data=sorted_data, columns=df.columns)
### Proposal for a plotting function of physics object quantities in a grid plot given one or more MultiIndex pd.DataFrames
### with stack option
def _object_quantity_grid_plot(df, physics_object, object_quantity, label=None, unit=None, hist_range=None, stack=None):
"""
Plot the distributions of given quantities of given physics objects that are present in given DataFrame(s). For
two or more DataFrames, the plotting is done object/quantity wise in the same subplot.
:param df: pd.DataFrame or list of pd.DataFrames
:param physics_object: str or list of str of physics objects, i. e. "lepton_0" or ["jet_0", "jet_1"]
:param object_quantity: str or list of str, i. e. "pt" or ["pt", "eta", "phi"]
:param label: str or list of str with the same length as df if df is a list
:param unit: str or list of str with the same length as object_quantity for x-axis label
:param hist_range: None or (float, float) or list of those with same length as object_quantity or physics_object or the same
length as all created subplots:
a) physics_object = ["jet_0", "jet_1", "jet_2"], object_quantity = ["pt"],
hist_range = [(0, 100), (0, 50), (0, 25)]
=> creates subplots of (jet_0, pt), (jet_1, pt) and (jet_2, pt) in (0, 100), (0, 50) and (0, 25)
intervals.
b) physics_object = ["jet_0", "jet_1", "jet_2"], object_quantity = ["pt", "eta"],
hist_range = [(0, 100), (-4, 4)]
=> creates subplots where pt of all subplots ranges are set to (0, 100) and eta ranges to (-4, 4)
c) physics_object = ["jet_0", "jet_1", "jet_2"], object_quantity = ["pt", "eta"],
hist_range = [(0, 100), (-4, 4), None, (-4, 4), None, (-4, 4)]
=> creates six subplots where all eta ranges are set to (-4, 4) and (jet_0, pt) range is set
to (0, 100). All other ranges are set by default.
:param stack: str; "df" or "physics_object". In case of "physics_object" only one df should be passed
a) df=df1, physics_object=["QCD", "LM6", "LM9"], object_quantity=["H_t, N_jets"], stack="physics_object"
=> creates two subplots where "H_t" and "N_jets" of provided physics_object is stacked
b) df=[df1, df2], physics_object=["jet_1", "jet_2"], object_quantity=["pt", "eta"], stack="df"
=> creates for every physics_object x object_quantity a subplot where df1 and df2 are stacked
:return: None
"""
def to_list(it): # helper for argument conversion
return it if isinstance(it, list) else [it]
def get_quantity(dataframe, _physics_object, _object_quantity): # helper to get pd.Series...
try: # of a four vector quantity
return getattr(dataframe[_physics_object].v4, _object_quantity).to_numpy()
except AttributeError: # of other quantity
return dataframe.loc[:, (_physics_object, _object_quantity)].to_numpy()
# converting to lists and padding some to the same length with None (for zip/product/cycle)
df, label, unit, hist_range = to_list(df), to_list(label), to_list(unit), to_list(hist_range)
physics_object, object_quantity = to_list(physics_object), to_list(object_quantity)
unit = unit + [None] * (len(object_quantity) - len(unit)) # padding up to object_quantity dim if necessary
label = label + [None] * (len(df) - len(label)) # padding up to df dim if necessary
# checking for hist_range length in case to prevent strange plots - might be discarded
assert (len(hist_range) == 1 or len(hist_range) == len(object_quantity)
or (len(hist_range) == len(physics_object) and len(object_quantity) == 1)
or len(hist_range) == len(list(product(physics_object, object_quantity)))), \
"hist_range have either to be None or a single 2D - tuple or have the same length" \
" as physics_object if a single object_quantity is given or have the same length" \
" as object_quantity or have the same length as physics_object * object_quantity"
hist_range = cycle(hist_range)
# checking for the specific case - might be discarded
assert len(df) == 1 and stack == "physics_object" or stack != "physics_object", \
"Please pass a single DataFrame with pd.MultiIndex columns to stack multiple" \
" physics objects or change the option to stack='df'"
# number of created plots and subdivision in columns and rows
n_plots = len(list(product(physics_object if stack != "physics_object" else df, object_quantity)))
n_columns = len(object_quantity) if len(object_quantity) > 1 else (3 if n_plots > 3 else n_plots)
n_rows = int(np.ceil(n_plots / n_columns))
fig, ax = plt.subplots(n_rows, n_columns, figsize=(25, int(5 * n_rows)))
if stack == "physics_object": # stacking provided physics objects
for i, (_ax, (_quantity, _unit)) in enumerate(zip(ax.flatten(), zip(object_quantity, unit))):
_hist_range = next(hist_range) # set a specific range for _quantity or None
_ax.hist([get_quantity(df[0], _object, _quantity) for _object in physics_object],
bins=100, histtype="bar", range=_hist_range, density=False, stacked=True,
label=physics_object)
_ax.set(yscale="log", ylabel="Events",
xlabel=f"{_quantity} {'in ' + _unit if _unit else ''}")
_ax.legend()
else: # unstacked (comparing) variant or dataframe wise stacking
for i, (_ax, (_object, (_quantity, _unit))) in enumerate(zip(ax.flatten(),
product(physics_object,
zip(object_quantity, unit)))):
_hist_range = next(hist_range) # set a specific range for (_object, _quantity) or None
if stack == "df": # dataframe wise stacking
_ax.hist([get_quantity(_df, _object, _quantity) for _df, _ in zip(df, label)],
bins=100, histtype="bar", range=_hist_range, density=False, stacked=True,
label=[_label if _label else f"Label_{j}" for j, (_, _label) in enumerate(zip(df, label))])
else: # for shape comparison without stacking
edges = None # for auto setting of first histogram edges
for j, (_df, _label) in enumerate(zip(df, label)): # in case multiple dataframes are given
_, edges, _ = _ax.hist(get_quantity(_df, _object, _quantity),
bins=edges if edges is not None else 100, # same edges
histtype="step", range=_hist_range,
label=_label if _label else f"Label_{j}") # default or custom
_ax.set(yscale="log", title=f"{_object}: {_quantity}", ylabel="Events",
xlabel=f"{_quantity} {'in ' + _unit if _unit else ''}")
_ax.legend()
plt.tight_layout()
plt.show()
### Proposal for a plotting function of physics object quantities in a grid plot given one or more MultiIndex pd.DataFrames
### without stack option
def plot_quantities(
df: Union[pd.DataFrame, List[pd.DataFrame]],
column: Union[str, List[str]],
quantity: Union[str, List[str], Any] = None,
bins: Union[int, List[int], Any] = None,
hist_range: Union[Tuple[float, float], List[Tuple[float, float]], Any] = None,
density: Union[bool, List[bool]] = False,
weights: Union[pd.Series, List[pd.Series], npt.NDArray, List[npt.NDArray] , Any] = None,
label: Union[str, List[str], Any] = None,
unit: Union[str, List[str], Any] = None,
yscale: Union[str, List[str]] = "log",
suptitle: Union[str, List[str], Any] = None,
):
"""
Plot the distributions of given quantities of given physics objects that are present in given DataFrame(s). For
two or more DataFrames, the plotting is done object-quantity wise in the same subplot.
:param df: pd.DataFrame or list of pd.DataFrames
:param column: str or list of str of physics objects, i. e. "lepton_0" or ["jet_0", "jet_1"]
:param quantity: str or list of str, i. e. "pt" or ["pt", "eta", "phi"] in case of a two level index, None
if only one index level exists
:param label: str or list of str with the same length as df if df is a list
:param unit: str or list of str with the same length as quantity for x-axis label or the same length as columns in case
quantity is None
:param density: bool or list of bool with the same length as quantity
:param hist_range: None or (float, float) or list of those with same length as quantity or column or the same
length as all created subplots:
a) column = ["jet_0", "jet_1", "jet_2"], quantity = ["pt"],
hist_range = [(0, 100), (0, 50), (0, 25)]
=> creates subplots of (jet_0, pt), (jet_1, pt) and (jet_2, pt) in (0, 100), (0, 50) and (0, 25)
intervals.
b) column = ["jet_0", "jet_1", "jet_2"], quantity = ["pt", "eta"],
hist_range = [(0, 100), (-4, 4)]
=> creates subplots where pt of all subplots ranges are set to (0, 100) and eta ranges to (-4, 4)
c) column = ["jet_0", "jet_1", "jet_2"], quantity = ["pt", "eta"],
hist_range = [(0, 100), (-4, 4), None, (-4, 4), None, (-4, 4)]
=> creates six subplots where all eta ranges are set to (-4, 4) and (jet_0, pt) range is set
to (0, 100). All other ranges are set by default.
:param bins: int or list of int, setting the number of used bins. In case of a list: the same rules applies for bins as for
hist_range
:param yscale: str like as in matplotlib yscale or list of str, setting the used yscale. In case of a list: the same rules
applies for bins as for hist_range. Possible options are for example 'log', 'linear', 'logit', 'sumlog', ...
:param weights: None, 1D np.ndarray or pd.Series or a list of those with the same length as df
:param suptitle: None or str containing the figure title
:return: None
"""
def to_list(it: Any) -> list: # helper for argument conversion
return it if isinstance(it, list) else [it]
def padding_up(it: list, target: Union[list, Any], value: Any = None) -> list: # helper for argument padding
return it + [value] * (len(target) - len(it))
with_quantity = bool(quantity) # in case no quantity is provided
# conversion to list
df, label, unit, hist_range, column, quantity, bins, density, yscale, weights = tuple(
map(
to_list,
[df, label, unit, hist_range, column, quantity, bins, density, yscale, weights],
)
)
# padding up to quantity dim if necessary
unit = padding_up(unit, quantity)
density = padding_up(density, quantity, len(density) == 1 and density[0])
# padding up to df dim if necessary
label = padding_up(label, df)
weights = padding_up(weights, df)
def check_dim_of(it: list) -> bool: # checking for bins, hist_range, yscale length in case to prevent strange plots
return (len(it) == 1 or len(it) == len(quantity)
or (len(it) == len(column) and len(quantity) == 1)
or len(it) == len(list(product(column, quantity))))
assert check_dim_of(hist_range) and check_dim_of(bins) and check_dim_of(yscale), \
"(hist_range/bins/yscale) have either to be None or a (two element tuple/int/str) or be a " \
"list of those types and have the same length as column if single or None quantity is " \
"given or have the same length as quantity or the length of column * quantity"
bins, hist_range, yscale = cycle(bins), cycle(hist_range), cycle(yscale)
# number of created plots and subdivision in columns and rows
n_plots = len(list(product(column, quantity)))
n_columns = len(quantity) if len(quantity) > 1 else (3 if n_plots > 3 else n_plots)
n_rows = int(np.ceil(n_plots / n_columns))
fig, ax = plt.subplots(n_rows, n_columns, figsize=(25, int(4 * n_rows)))
ax = ax if isinstance(ax, np.ndarray) else np.array(ax) # in case only a single column amd a single quantity is given
fig.suptitle(suptitle)
def get_quantity(dataframe: pd.DataFrame, _column_name: str, _quantity_name: str)-> npt.NDArray: # helper to get pd.Series...
if _quantity_name is not None: # in case of a two level index
try: # of a four vector quantity
return getattr(dataframe[_column_name].v4, _quantity_name).to_numpy()
except AttributeError: # of other quantity
return dataframe.loc[:, (_column_name, _quantity_name)].to_numpy()
else: # in case of a single level index
return dataframe.loc[:, _column_name].to_numpy()
def is_None(it: Any) -> bool: # helper: checking for None __and__ avoiding a collision with np.ndarray like objects
return isinstance(it, type(None))
for i, (_ax, (_column, (_quantity, _unit, _density))) in enumerate(
zip(ax.flatten(), product(column, zip(quantity, unit, density)))
):
empty_subplot_content = 0 # counter for setting subplots with no content to invisible
edges = None # for auto setting of first histogram edges
_bins, _hist_range, _yscale = (
next(bins), next(hist_range), next(yscale),
) # next iteration of bins, hist_range, yscale for (_column, _quantity)
_ax_ylim_low, _ax_ylim_high = [], []
for j, (_df, _label, _weights) in enumerate(zip(df, label, weights)): # in case multiple dataframes are given
_plot_values = get_quantity(_df, _column, _quantity)
if np.all(np.isnan(_plot_values)):
empty_subplot_content += 1
continue
_valid_plot_values_mask = ~np.isnan(_plot_values)
_plot_values = _plot_values[_valid_plot_values_mask]
if _weights is not None:
_weights = _weights[_valid_plot_values_mask]
hist, edges, _ = _ax.hist(
_plot_values,
histtype="step",
range=_hist_range,
density=_density,
bins=100 if not _bins and is_None(edges) else _bins if is_None(edges) else edges, # same edges
label=_label if _label else f"Label_{j}", # default or custom
weights=_weights,
)
_ax_ylim_low.append(hist[hist > 0].min())
_ax_ylim_high.append(hist.max() * (10 if _yscale == "log" else 1.2))
_ax.legend()
if empty_subplot_content == len(df):
_ax.set_visible(False) # not showing MET eta or similar not defined or emtpy NaN like quantity lists
continue
_ax.set(
ylim=(min(_ax_ylim_low), max(_ax_ylim_high)),
yscale=_yscale,
ylabel="Events",
title=f"{_column}{':' if _quantity else ''} {_quantity if _quantity else ''}",
xlabel=f"{_quantity if _quantity else _column} " # _unit if with_quantity else unit[i] if _unit or unit[i] else ''
f"{'in ' if (unit[i % len(unit)]) or (_unit and with_quantity) else ''}"
f"{_unit if (_unit and with_quantity) else (unit[i % len(unit)] if unit[i % len(unit)] else '')}",
)
plt.tight_layout()
plt.show()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment