{array<1>{real}}
+ │       │   ├── cumulative_length · array<1>{real}
+ │       │   └── flattened_data · array<1>{real}
+ │       ├── distance_to_nplus_surface_mm · array<1>{array<1>{real}}
+ │       │   ├── cumulative_length · array<1>{real}
+ │       │   └── flattened_data · array<1>{real}
+ │       ├── edep · array<1>{array<1>{real}}
+ │       │   ├── cumulative_length · array<1>{real}
+ │       │   └── flattened_data · array<1>{real}
+ │       ├── energy_sum · array<1>{real}
+ │       ├── energy_sum_deadlayer · array<1>{real}
+ │       ├── energy_sum_smeared · array<1>{real}
+ │       ├── evtid · array<1>{real}
+ │       ├── global_evtid · array<1>{real}
+ │       ├── rpos_loc · array<1>{array<1>{real}}
+ │       │   ├── cumulative_length · array<1>{real}
+ │       │   └── flattened_data · array<1>{real}
+ │       ├── t0 · array<1>{real}
+ │       ├── time · array<1>{array<1>{real}}
+ │       │   ├── cumulative_length · array<1>{real}
+ │       │   └── flattened_data · array<1>{real}
+ │       └── zpos_loc · array<1>{array<1>{real}}
+ │           ├── cumulative_length · array<1>{real}
+ │           └── flattened_data · array<1>{real}
+ └── det002 · HDF5 group
+     └── hit · table{edep,time,t0,evtid,global_evtid,distance_to_nplus_surface_mm,activeness,rpos_loc,zpos_loc,energy_sum,energy_sum_deadlayer,energy_sum_smeared}
+         ├── activeness · array<1>{array<1>{real}}
+         │   ├── cumulative_length · array<1>{real}
+         │   └── flattened_data · array<1>{real}
+         ├── distance_to_nplus_surface_mm · array<1>{array<1>{real}}
+         │   ├── cumulative_length · array<1>{real}
+         │   └── flattened_data · array<1>{real}
+         ├── edep · array<1>{array<1>{real}}
+         │   ├── cumulative_length · array<1>{real}
+         │   └── flattened_data · array<1>{real}
+         ├── energy_sum · array<1>{real}
+         ├── energy_sum_deadlayer · array<1>{real}
+         ├── energy_sum_smeared · array<1>{real}
+         ├── evtid · array<1>{real}
+         ├── global_evtid · array<1>{real}
+         ├── rpos_loc · array<1>{array<1>{real}}
+         │   ├── cumulative_length · array<1>{real}
+         │   └── flattened_data · array<1>{real}
+         ├── t0 · array<1>{real}
+         ├── time · array<1>{array<1>{real}}
+         │   ├── cumulative_length · array<1>{real}
+         │   └── flattened_data · array<1>{real}
+         └── zpos_loc · array<1>{array<1>{real}}
+             ├── cumulative_length · array<1>{real}
+             └── flattened_data · array<1>{real}
+
+
+The new format is a factor of ~17 smaller than the input file, due to the
+removal of many *step*-based fields (which use a lot of memory) and of the
+*vertices* table and the LAr hits. This means we can easily read the whole
+file into memory. We use *awkward* to analyse the output files.
+
+.. code:: python
+
+ data_det001 = lh5.read_as("det001/hit", "output/hit/output.lh5", "ak")
+ data_det002 = lh5.read_as("det002/hit", "output/hit/output.lh5", "ak")
+
+.. code:: python
+
+ data_det001
+
+
+
+
+.. parsed-literal::
+
+ [{edep: [0.0826, 0.00863, ..., 32.3], time: [1.32e+15, ...], t0: 1.32e+15, ...},
+ {edep: [0.103, 0.0256, ..., 37.6, 6.44], time: [...], t0: 1.24e+15, ...},
+ {edep: [0.0824, 0.00863, ..., 16.8], time: [2.21e+14, ...], t0: 2.21e+14, ...},
+ {edep: [0.101, 0.0802, ..., 20.9], time: [9.09e+14, ...], t0: 9.09e+14, ...},
+ {edep: [0.00332, 0.0171, ..., 45, 27.7], time: [...], t0: 4.26e+13, ...},
+ {edep: [0.0845, 0.00863, ..., 27.9], time: [6.86e+14, ...], t0: 6.86e+14, ...},
+ {edep: [0.0065, 0.255, ..., 41.5, 2.69], time: [...], t0: 1.24e+14, ...},
+ {edep: [0.0388, 0.188, ..., 1.1, 41.5], time: [...], t0: 6.48e+14, ...},
+ {edep: [0.00332, 0.116, ..., 16.2], time: [4.39e+14, ...], t0: 4.39e+14, ...},
+ {edep: [0.00615, 0.0204, ..., 22.1], time: [7.11e+14, ...], t0: 7.11e+14, ...},
+ ...,
+ {edep: [0.19, 0.0171, ..., 42.2, 10.9], time: [...], t0: 1.1e+15, ...},
+ {edep: [0.0118, 0.0303, ..., 34.5, 21.4], time: [...], t0: 1.51e+15, ...},
+ {edep: [0.0204, 0.152, ..., 2.79, 51.9], time: [...], t0: 9.73e+14, ...},
+ {edep: [0.118, 0.0254, ..., 41.2, 38.6], time: [...], t0: 9.67e+14, ...},
+ {edep: [0.0824, 0.0254, ..., 34.6, 18.9], time: [...], t0: 6.64e+14, ...},
+ {edep: [0.148, 0.0802, ..., 40.9, 24], time: [...], t0: 5.56e+14, ...},
+ {edep: [0.022, 0.0148, ..., 34.8, 11.9], time: [...], t0: 6.52e+14, ...},
+ {edep: [0.0155, 0.118, ..., 0.458, 9.65], time: [...], t0: 3.97e+14, ...},
+ {edep: [0.0065, 0.00615, ..., 13.7], time: [3.98e+14, ...], t0: 3.98e+14, ...}]
+ --------------------------------------------------------------------------------
+ type: 835793 * {
+ edep: var * float64,
+ time: var * float64,
+ t0: float64,
+ evtid: float64,
+ global_evtid: float64,
+ distance_to_nplus_surface_mm: var * float64,
+ activeness: var * int64,
+ rpos_loc: var * float64,
+ zpos_loc: var * float64,
+ energy_sum: float64,
+ energy_sum_deadlayer: float64,
+ energy_sum_smeared: float64
+ }
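+
+Since the data is now an *awkward* array, the jagged fields can be
+manipulated directly. For instance (just a quick sanity check; the exact
+numbers depend on the simulation):
+
+.. code:: python
+
+    # number of steps per hit and the corresponding summed energy
+    n_steps = ak.num(data_det001.edep, axis=-1)
+    print(n_steps[:5], data_det001.energy_sum[:5])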
+
+
+
+Part 4) Steps in a standard processing chain
+--------------------------------------------
+
+The next part of the tutorial gives more details on each step of the
+processing chain.
+
+4.1) Windowing
+~~~~~~~~~~~~~~
+
+We can compare the decay index (“evtid” in the “stp” file) to the index
+of the “hit”, i.e. the row of the hit table. We see that only some decays
+correspond to “hits” in the detector, as expected. We also see that a
+single decay rarely produces multiple hits, which is also expected since
+the probability of detection is fairly low.
+
+.. code:: python
+
+ plt.scatter(
+ np.sort(data_det001.global_evtid), np.arange(len(data_det001)), marker=".", alpha=1
+ )
+ plt.xlabel("Decay index (evtid)")
+ plt.ylabel("Hit Index")
+ plt.grid()
+ plt.xlim(0, 1000)
+ plt.ylim(0, 100)
+
+
+
+
+.. parsed-literal::
+
+ (0.0, 100.0)
+
+
+
+
+.. image:: images/output_20_1.png
+
+
+However, we can use some array manipulation to extract the decay indices
+with multiple hits; by plotting the hit times we see the effect of the
+windowing.
+
+.. code:: python
+
+ def plot_times(times: ak.Array, xrange=None, sub_zero=False, **kwargs):
+ fig, ax = plt.subplots()
+ for idx, _time in enumerate(times):
+ if sub_zero:
+ _time = _time - ak.min(_time)
+ h = hist.new.Reg(
+ 100,
+ (ak.min(times) / 1e9),
+ (ak.max(times) / 1e9) + 1,
+ name="Time since event start [s]",
+ ).Double()
+ h.fill(_time / 1e9)
+ h.plot(**kwargs, label=f"Hit {idx}")
+ ax.legend()
+ ax.set_yscale("log")
+ if xrange is not None:
+ ax.set_xlim(*xrange)
+
+
+.. code:: python
+
+ unique, counts = np.unique(data_det001.global_evtid, return_counts=True)
+
+.. code:: python
+
+ plot_times(
+ data_det001[data_det001.global_evtid == unique[counts > 1][1]].time,
+ histtype="step",
+ yerr=False,
+ )
+
+
+
+
+.. image:: images/output_24_0.png
+
+
+4.2) Distance to surface and dead layer
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+One of the important steps in the post-processing of HPGe detector
+simulations is the detector activeness mapping. Energy deposited close
+to the surface of the germanium detector will result in incomplete
+charge collection and a degraded signal. To account for this we added a
+processor to compute the distance to the detector surface (based on
+``legendhpges.base.HPGe.distance_to_surface()``).
+
+In the processing chain we extracted the local r and z coordinates of the
+steps in the detector, so we can plot maps of the distance to the detector
+surface and of the activeness for each step. For the first plots we select
+only steps within 5 mm of the surface. We can see that the processor works
+as expected.
+
+.. code:: python
+
+ def plot_map(field, scale="BuPu", clab="Distance [mm]"):
+ fig, axs = plt.subplots(1, 2, figsize=(12, 4), sharey=True)
+ n = 100000
+ for idx, (data, config) in enumerate(
+ zip(
+ [data_det001, data_det002],
+ ["cfg/metadata/BEGe.json", "cfg/metadata/Coax.json"],
+ )
+ ):
+ reg = pg4.geant4.Registry()
+ hpge = legendhpges.make_hpge(config, registry=reg)
+
+ legendhpges.draw.plot_profile(hpge, split_by_type=True, axes=axs[idx])
+ rng = np.random.default_rng()
+ r = rng.choice(
+ [-1, 1], p=[0.5, 0.5], size=len(ak.flatten(data.rpos_loc))
+ ) * ak.flatten(data.rpos_loc)
+ z = ak.flatten(data.zpos_loc)
+ c = ak.flatten(data[field])
+ cut = c < 5
+
+ s = axs[idx].scatter(
+ r[cut][0:n],
+ z[cut][0:n],
+ c=c[cut][0:n],
+ marker=".",
+ label="gen. points",
+ cmap=scale,
+ )
+ # axs[idx].axis("equal")
+
+ if idx == 0:
+ axs[idx].set_ylabel("Height [mm]")
+ c = plt.colorbar(s)
+ c.set_label(clab)
+
+ axs[idx].set_xlabel("Radius [mm]")
+
+
+.. code:: python
+
+ plot_map("distance_to_nplus_surface_mm")
+
+
+.. parsed-literal::
+
+    root [INFO] genericpolycone.antlr>
+    root [INFO] genericpolyhedra.antlr>
+    root [INFO] visualisation.Mesh.getBoundingBox> [-36.98, -36.98, 0.0] [36.98, 36.98, 29.46]
+    root [INFO] box.pycsgmesh> getBoundingBoxMesh
+    root [INFO] genericpolycone.antlr>
+    root [INFO] genericpolyhedra.antlr>
+    root [INFO] visualisation.Mesh.getBoundingBox> [-38.25, -38.25, 0.0] [38.25, 38.25, 84.0]
+    root [INFO] box.pycsgmesh> getBoundingBoxMesh
+
+
+
+.. image:: images/output_27_1.png
+
+
+.. code:: python
+
+ plot_map("activeness", clab="Activeness", scale="viridis")
+
+
+.. parsed-literal::
+
+    root [INFO] genericpolycone.antlr>
+    root [INFO] genericpolyhedra.antlr>
+    root [INFO] visualisation.Mesh.getBoundingBox> [-36.98, -36.98, 0.0] [36.98, 36.98, 29.46]
+    root [INFO] box.pycsgmesh> getBoundingBoxMesh
+    root [INFO] genericpolycone.antlr>
+    root [INFO] genericpolyhedra.antlr>
+    root [INFO] visualisation.Mesh.getBoundingBox> [-38.25, -38.25, 0.0] [38.25, 38.25, 84.0]
+    root [INFO] box.pycsgmesh> getBoundingBoxMesh
+
+
+
+.. image:: images/output_28_1.png
+
+
+We can also plot a histogram of the distance to the surface.
+
+.. code:: python
+
+ def plot_distances(axes, distances, xrange=None, label=" ", **kwargs):
+ h = hist.new.Reg(100, *xrange, name="Distance to n+ surface [mm]").Double()
+ h.fill(distances)
+ h.plot(**kwargs, label=label)
+ axes.legend()
+ axes.set_yscale("log")
+ if xrange is not None:
+            axes.set_xlim(*xrange)
+
+
+.. code:: python
+
+ fig, ax = plt.subplots()
+ plot_distances(
+ ax,
+ ak.flatten(data_det001.distance_to_nplus_surface_mm),
+ xrange=(0, 35),
+ label="BEGe",
+ histtype="step",
+ yerr=False,
+ )
+ plot_distances(
+ ax,
+ ak.flatten(data_det002.distance_to_nplus_surface_mm),
+ xrange=(0, 35),
+ label="Coax",
+ histtype="step",
+ yerr=False,
+ )
+
+
+
+
+.. image:: images/output_31_0.png
+
+
+4.3) Summed energies
+~~~~~~~~~~~~~~~~~~~~
+
+Our processing chain also sums the energies of the steps in each hit,
+both before and after weighting by the activeness.
+
+.. code:: python
+
+ def plot_energy(axes, energy, bins=400, xrange=None, label=" ", log_y=True, **kwargs):
+ h = hist.new.Reg(bins, *xrange, name="energy [keV]").Double()
+ h.fill(energy)
+ h.plot(**kwargs, label=label)
+ axes.legend()
+ if log_y:
+ axes.set_yscale("log")
+ if xrange is not None:
+ axes.set_xlim(*xrange)
+
+.. code:: python
+
+ fig, ax = plt.subplots()
+ ax.set_title("BEGe energy spectrum")
+ plot_energy(
+ ax, data_det001.energy_sum, yerr=False, label="True energy", xrange=(0, 4000)
+ )
+ plot_energy(
+ ax,
+ data_det001.energy_sum_deadlayer,
+ yerr=False,
+ label="Energy after dead layer",
+ xrange=(0, 4000),
+ )
+
+
+
+.. image:: images/output_34_0.png
+
+
+.. code:: python
+
+ fig, ax = plt.subplots()
+ ax.set_title("COAX energy spectrum")
+ plot_energy(
+ ax, data_det002.energy_sum, yerr=False, label="True energy", xrange=(0, 4000)
+ )
+ plot_energy(
+ ax,
+ data_det002.energy_sum_deadlayer,
+ yerr=False,
+ label="Energy after dead layer",
+ xrange=(0, 4000),
+ )
+
+
+
+.. image:: images/output_35_0.png
+
+
+4.4) Smearing
+~~~~~~~~~~~~~
+
+The final step in the processing chain smears the energies by the energy
+resolution. This represents a general class of processors based on
+"heuristic" models; other processors of this kind could be implemented in
+a similar way, and it would also be simple to use an energy-dependent
+resolution curve instead. To see the effect we have to zoom into the
+2615 keV peak.
+
+.. code:: python
+
+ fig, axs = plt.subplots()
+ plot_energy(
+ axs,
+ data_det001.energy_sum_smeared,
+ yerr=False,
+ label="BEGe",
+ xrange=(2600, 2630),
+ log_y=False,
+ bins=150,
+ density=True,
+ )
+ plot_energy(
+ axs,
+ data_det002.energy_sum_smeared,
+ yerr=False,
+ label="COAX",
+ xrange=(2600, 2630),
+ log_y=False,
+ bins=150,
+ density=True,
+ )
+
+
+
+.. image:: images/output_37_0.png
+
+
+We clearly see the worse energy resolution of the COAX detector.
+
+**To Do**: add a Gaussian fit of this.
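+
+As a rough sketch of what such a fit could look like (the window, binning,
+starting values and the use of ``scipy.optimize.curve_fit`` here are purely
+illustrative, not part of the processing chain):
+
+.. code:: python
+
+    import numpy as np
+    from scipy.optimize import curve_fit
+
+
+    def gauss(x, n, mu, sigma):
+        return n * np.exp(-0.5 * ((x - mu) / sigma) ** 2)
+
+
+    # histogram the smeared BEGe energies around the 2615 keV peak
+    energies = ak.to_numpy(data_det001.energy_sum_smeared)
+    sel = (energies > 2600) & (energies < 2630)
+    counts, edges = np.histogram(energies[sel], bins=150)
+    centers = 0.5 * (edges[:-1] + edges[1:])
+
+    # fit a Gaussian to the peak (starting values are rough guesses)
+    popt, _ = curve_fit(gauss, centers, counts, p0=[counts.max(), 2615.0, 1.0])
+    print(f"sigma = {popt[2]:.2f} keV")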
+
+Part 5) Adding a new processor
+------------------------------
+
+The next part of the tutorial describes how to add a new processor to
+the chain. We use spatial *clustering* of steps as an example. This will
+be added later.
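+
+As a very rough sketch (the function name and the trivial binning-based
+grouping below are purely illustrative and not part of *reboost*), a new
+processor is just a function returning an LGDO object, which can then be
+referenced from the ``operations`` block of the processing chain:
+
+.. code:: python
+
+    import awkward as ak
+    import numpy as np
+    from lgdo import VectorOfVectors
+
+
+    def cluster_by_z(zpos_loc, size_mm: float = 1.0) -> VectorOfVectors:
+        """Toy spatial clustering: label each step by the z bin it falls into."""
+        # follows the same input convention as distance_to_surface
+        z = ak.Array(zpos_loc)
+        labels = ak.values_astype(np.floor(z / size_mm), np.int64)
+        return VectorOfVectors(labels)
+
+
+    # it could then be wired into the chain with, e.g.
+    # {"mode": "function",
+    #  "expression": "reboost.hpge.processors.cluster_by_z(hit.zpos_loc,size_mm=1)"}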
diff --git a/docs/source/tutorial.rst b/docs/source/tutorial.rst
new file mode 100644
index 0000000..45c8065
--- /dev/null
+++ b/docs/source/tutorial.rst
@@ -0,0 +1,10 @@
+Basic Tutorial
+==============
+
+
+.. toctree::
+ :maxdepth: 2
+ :caption: Contents:
+
+ notebooks/reboost_hpge_tutorial_hit
+ notebooks/reboost_hpge_tutorial_evt
diff --git a/pyproject.toml b/pyproject.toml
index 75976e7..25f75e9 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -30,12 +30,17 @@ classifiers = [
]
requires-python = ">=3.9"
dependencies = [
+ "awkward",
"colorlog",
+ "tqdm",
"numpy",
"scipy",
"numba",
"legend-pydataobj>=1.9.0",
+ "pyg4ometry",
+ "pylegendtestdata",
"legend-pygeom-optics>=0.6.5",
+ "legend-pygeom-hpges",
"hist",
"particle",
"pandas",
@@ -72,6 +77,7 @@ test = [
[project.scripts]
reboost-optical = "reboost.optical.cli:optical_cli"
+reboost-hpge = "reboost.hpge.cli:hpge_cli"
[tool.setuptools]
include-package-data = true
diff --git a/src/reboost/__init__.py b/src/reboost/__init__.py
index 8e49755..c0d8185 100644
--- a/src/reboost/__init__.py
+++ b/src/reboost/__init__.py
@@ -1,6 +1,6 @@
from __future__ import annotations
-from reboost import optical
+from reboost import hpge, optical
from reboost._version import version as __version__
-__all__ = ["__version__", "optical"]
+__all__ = ["__version__", "optical", "hpge"]
diff --git a/src/reboost/hpge/__init__.py b/src/reboost/hpge/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/reboost/hpge/cli.py b/src/reboost/hpge/cli.py
new file mode 100644
index 0000000..03ba0c0
--- /dev/null
+++ b/src/reboost/hpge/cli.py
@@ -0,0 +1,148 @@
+from __future__ import annotations
+
+import argparse
+import logging
+
+import colorlog
+
+from . import utils
+
+
+def hpge_cli() -> None:
+ parser = argparse.ArgumentParser(
+ prog="reboost-hpge",
+ description="%(prog)s command line interface",
+ )
+
+ parser.add_argument(
+ "--verbose",
+ "-v",
+ action="store_true",
+ help="""Increase the program verbosity""",
+ )
+
+ parser.add_argument(
+ "--bufsize",
+ action="store",
+ type=int,
+ default=int(5e6),
+ help="""Row count for input table buffering (only used if applicable). default: %(default)e""",
+ )
+
+ # step 1: hit tier
+ subparsers = parser.add_subparsers(dest="command", required=True)
+ hit_parser = subparsers.add_parser("hit", help="build hit file from remage raw file")
+
+ hit_parser.add_argument(
+ "--num",
+ "-n",
+ help="Number of events to process, if not set process all",
+ default=None,
+ type=int,
+ required=False,
+ )
+
+ hit_parser.add_argument(
+ "--start",
+ "-s",
+ help="First event to process (default 0)",
+ default=0,
+ type=int,
+ required=False,
+ )
+
+ hit_parser.add_argument(
+ "--proc_chain",
+ help="JSON or YAML file that contains the processing chain",
+ required=True,
+ )
+ hit_parser.add_argument(
+ "--pars",
+ help="JSON or YAML file that contains the pars",
+ required=True,
+ )
+ hit_parser.add_argument(
+ "--gdml",
+ help="GDML file used for Geant4",
+ default=None,
+ required=False,
+ )
+
+ hit_parser.add_argument(
+ "--meta_path",
+ help="Path to metadata (diodes folder)",
+ default=None,
+ required=False,
+ )
+ hit_parser.add_argument("--infield", help="input LH5 field", required=False, default="hit")
+ hit_parser.add_argument(
+ "--outfield", help="output LH5 field name", required=False, default="hit"
+ )
+    hit_parser.add_argument(
+        "--merge_input",
+        "-m",
+        action=argparse.BooleanOptionalAction,
+        default=True,
+        help="""Merge input lh5 files into a single output""",
+    )
+ hit_parser.add_argument(
+ "input", help="input hit LH5 files (can include wildcars) ", nargs="+", metavar="INPUT_HIT"
+ )
+ hit_parser.add_argument("output", help="output evt LH5 file", metavar="OUTPUT_EVT")
+
+ args = parser.parse_args()
+
+ handler = colorlog.StreamHandler()
+ handler.setFormatter(
+ colorlog.ColoredFormatter("%(log_color)s%(name)s [%(levelname)s] %(message)s")
+ )
+ logger = logging.getLogger("reboost.hpge")
+ logger.addHandler(handler)
+
+ if args.verbose:
+ logger.setLevel(logging.DEBUG)
+ else:
+ logger.setLevel(logging.INFO)
+
+ if args.command == "hit":
+ # is the import here a good idea?
+ logger.info("...running raw->hit tier")
+ from reboost.hpge.hit import build_hit
+
+ pars = utils.load_dict(args.pars, None)
+ proc_config = utils.load_dict(args.proc_config, None)
+
+ # check the processing chain
+ for req_field in ["channels", "outputs", "step_group", "operations"]:
+ if req_field not in proc_config:
+ msg = f"error proc chain config must contain the field {req_field}"
+ raise ValueError(msg)
+
+ msg = f"""
+ Running build_hit with:
+ - output file :{args.output}
+ - input_file(s) :{args.input}
+ - output field :{args.outfield}
+ - input field :{args.infield}
+ - proc_config :{args.proc_config}
+ - pars :{args.pars}
+ - buffer :{args.bufsize}
+ - gdml file :{args.gdml}
+ - metadata_path :{args.meta_path}
+ - merge input :{args.merge_input}
+ """
+
+ logger.info(msg)
+
+ build_hit(
+ args.output,
+ args.input,
+ out_field=args.outfield,
+ in_field=args.infield,
+ proc_config=proc_config,
+ pars=pars,
+ buffer=args.bufsize,
+ gdml=args.gdml,
+ metadata_path=args.meta_path,
+            merge_input_files=args.merge_input,
+ )
diff --git a/src/reboost/hpge/evt.py b/src/reboost/hpge/evt.py
new file mode 100644
index 0000000..17ebbc4
--- /dev/null
+++ b/src/reboost/hpge/evt.py
@@ -0,0 +1,167 @@
+from __future__ import annotations
+
+import logging
+
+import awkward as ak
+import numpy as np
+from lgdo import Table
+from lgdo.lh5 import LH5Iterator, write
+from pygama.evt.build_evt import evaluate_expression
+from pygama.evt.utils import TCMData
+
+from . import utils
+
+log = logging.getLogger(__name__)
+
+
+def build_evt(
+ hit_file: str, tcm_file: str, evt_file: str | None, config: dict, buffer: int = int(5e6)
+) -> ak.Array | None:
+ """Generates the event tier from the hit and tcm.
+
+
+ Parameters
+ ----------
+ hit_file
+ path to the hit tier file
+ tcm_file
+ path to the tcm tier file
+ evt_file
+ path to the evt tier (output) file, if `None` the :class:`Table` is returned in memory
+ config
+ dictionary of the configuration. For example:
+
+ .. code-block:: json
+
+ {
+ "channels": {
+ "geds_on":[ "det001", "det002"],
+ "geds_ac":["det003"]
+ },
+ "outputs": [
+ "energy",
+ "multiplicity"
+ ],
+ "operations": {
+ "energy_id": {
+ "channels": "geds_on",
+ "aggregation_mode": "gather",
+ "query": "hit.energy > 25",
+ "expression": "tcm.channel_id"
+ },
+ "energy": {
+ "aggregation_mode": "keep_at_ch:evt.energy_id",
+ "expression": "hit.energy > 25",
+ "channels": "geds_on"
+ },
+ "multiplicity": {
+ "channels": "geds_on",
+ "aggregation_mode": "sum",
+ "expression": "hit.energy > 25",
+ "initial": 0
+ }
+ }
+ }
+
+ Must contain:
+ - "channels": dictionary of channel groupings
+ - "outputs": fields for the output file
+ - "operations": operations to perform see :func:`pygama.evt.build_evt.evaluate_expression` for more details.
+
+ buffer
+ number of events to process simultaneously
+
+ Returns
+ -------
+ ak.Array of the evt tier data (if the data is not saved to disk)
+ """
+
+ msg = "... beginning the evt tier processing"
+ log.info(msg)
+
+ # create the objects needed for evaluate expression
+
+ file_info = {
+ "hit": (hit_file, "hit", "det{:03}"),
+ "evt": (evt_file, "evt"),
+ }
+
+ # iterate through the TCM
+
+ out_ak = ak.Array([])
+ mode = "of"
+
+ # get channel groupings
+ channels = {}
+ for group, info in config["channels"].items():
+ if isinstance(info, str):
+ channels[group] = [info]
+
+ elif isinstance(info, list):
+ channels[group] = info
+
+ for tcm_lh5, _, n_rows_read in LH5Iterator(tcm_file, "tcm", buffer_len=buffer):
+ tcm_lh5_sel = tcm_lh5
+ tcm_ak = tcm_lh5_sel.view_as("ak")[:n_rows_read]
+
+ tcm = TCMData(
+ id=np.array(ak.flatten(tcm_ak.array_id)),
+ idx=np.array(ak.flatten(tcm_ak.array_idx)),
+ cumulative_length=np.array(np.cumsum(ak.num(tcm_ak.array_id, axis=-1))),
+ )
+
+ n_rows = len(tcm.cumulative_length)
+ out_tab = Table(size=n_rows)
+
+ for name, info in config["operations"].items():
+ msg = f"computing field {name}"
+ log.debug(msg)
+
+ defaultv = info.get("initial", np.nan)
+ if isinstance(defaultv, str) and (defaultv in ["np.nan", "np.inf", "-np.inf"]):
+ defaultv = eval(defaultv)
+
+ channels_use = utils.get_channels_from_groups(info.get("channels", []), channels)
+ channels_exclude = utils.get_channels_from_groups(
+ info.get("exclude_channels", []), channels
+ )
+
+ if "aggregation_mode" not in info:
+ field = out_tab.eval(
+ info["expression"].replace("evt.", ""), info.get("parameters", {})
+ )
+ else:
+ field = evaluate_expression(
+ file_info,
+ tcm,
+ channels_use,
+ table=out_tab,
+ mode=info["aggregation_mode"],
+ expr=info["expression"],
+ query=info.get("query", None),
+ sorter=info.get("sort", None),
+ channels_skip=channels_exclude,
+ default_value=defaultv,
+ n_rows=n_rows,
+ )
+
+ msg = f"field {field}"
+ log.debug(msg)
+ out_tab.add_field(name, field)
+
+ # remove fields if necessary
+ existing_cols = list(out_tab.keys())
+ for col in existing_cols:
+ if col not in config["outputs"]:
+ out_tab.remove_column(col, delete=True)
+
+ # write
+ if evt_file is not None:
+ write(out_tab, "evt", evt_file, wo_mode=mode)
+ mode = "append"
+ else:
+ out_ak = ak.concatenate((out_ak, out_tab.view_as("ak")))
+
+ if evt_file is None:
+ return out_ak
+ return None
diff --git a/src/reboost/hpge/hit.py b/src/reboost/hpge/hit.py
new file mode 100644
index 0000000..b17104b
--- /dev/null
+++ b/src/reboost/hpge/hit.py
@@ -0,0 +1,397 @@
+from __future__ import annotations
+
+import logging
+
+import awkward as ak
+import pyg4ometry
+from lgdo import Array, ArrayOfEqualSizedArrays, Table, VectorOfVectors, lh5
+
+from . import utils
+
+log = logging.getLogger(__name__)
+
+
+def step_group(data: Table, group_config: dict) -> Table:
+ """Performs a grouping of geant4 steps to build the `hit` table.
+
+ Parameters
+ ----------
+ data
+ `stp` table from remage.
+ group_config
+ dict with the configuration describing the step grouping.
+ For example
+
+ .. code-block:: json
+
+ {
+ "description": "group steps by time and evtid.",
+ "expression": "reboost.hpge.processors.group_by_time(stp,window=10)"
+ }
+
+ this will then evaluate the function chosen by `expression` resulting in a new Table.
+
+ """
+
+ # group to create hit table
+
+ group_func, globs = utils.get_function_string(
+ group_config["expression"],
+ )
+ locs = {"stp": data}
+
+ msg = f"running step grouping with {group_func} and globals {globs.keys()} and locals {locs.keys()}"
+ log.debug(msg)
+ return eval(group_func, globs, locs)
+
+
+def get_locals(
+ local_info: dict,
+ pars_dict: dict | None,
+ detector: str | None = None,
+ meta_path: str | None = None,
+ reg: pyg4ometry.geant4.Registry | None = None,
+) -> dict:
+ """Compute any local variables needed for processing.
+
+ Parameters
+ ----------
+ local_info
+ config file block of the local objects to compute. For example:
+
+ ..code-block::
+
+ {"hpge": "reboost.hpge.utils.get_hpge(meta_path=meta,pars=pars,detector=detector)"}
+
+ pars_dict
+ dictionary of parameters
+ detector
+        remage name for the detector
+    meta_path
+        path to the diodes folder of the metadata
+ reg
+ geant4 registry of the experiment
+
+ Returns
+ -------
+ dictionary of local variables to pass to eval
+ """
+    # get parameters named tuple (None if no parameters were supplied)
+    pars_tuple = utils.dict2tuple(pars_dict) if pars_dict is not None else None
+
+    # the "pars" named tuple is always exposed to the expressions
+    local_dict = {"pars": pars_tuple} if pars_tuple is not None else {}
+
+    # objects that the expressions in the "locals" block can refer to
+ objs = {"pars": pars_tuple, "detector": detector, "meta": meta_path, "reg": reg}
+
+ for name, eval_str in local_info.items():
+ proc_func, globs = utils.get_function_string(eval_str)
+
+ msg = f"extracting local object with {eval_str} "
+ log.debug(msg)
+ obj = eval(proc_func, globs, objs)
+ local_dict = local_dict | {name: obj}
+ return local_dict
+
+
+def eval_expression(
+ table: Table, info: dict, local_dict: dict | None
+) -> Array | ArrayOfEqualSizedArrays | VectorOfVectors:
+ """Evaluate an expression returning an LGDO object.
+
+ Parameters
+ ----------
+ table
+ hit table, with columns possibly used in the operations.
+ info
+        `dict` containing the information on the expression. Must contain `mode` and `expression` keys. For example:
+
+ .. code-block:: json
+
+ {
+ "mode": "eval",
+ "expression":"ak.sum(hit.edep,axis=-1)"
+ }
+
+        Variables preceded by `hit` will be taken from the supplied table. Mode can be either `eval`,
+        in which case a simple expression is passed (only involving numpy, awkward or inbuilt python functions),
+        or `function`, in which case an arbitrary function call is passed (for example one defined in processors).
+
+ There are several objects passed to the evaluation as 'locals' determined by the `local_dict`
+
+ local_dict
+ mapping of local variables for the evaluation
+
+ Returns
+ -------
+ a new column for the hit table either :class:`Array`, :class:`ArrayOfEqualSizedArrays` or :class:`VectorOfVectors`.
+
+ Note
+ ----
+    In future the passing of local variables (pars,hpge,reg) to the evaluation should be made more generic.
+ """
+ if local_dict is None:
+ local_dict = {}
+
+ if info["mode"] == "eval":
+ # replace hit.
+ expr = info["expression"].replace("hit.", "")
+
+ msg = f"evaluating table with command {expr} and local_dict {local_dict.keys()}"
+ log.debug(msg)
+
+ col = table.eval(expr, local_dict)
+
+ elif info["mode"] == "function":
+ proc_func, globs = utils.get_function_string(info["expression"])
+
+ # add hit table to locals
+ local_dict = {"hit": table} | local_dict
+
+ msg = f"evaluating table with command {info['expression']} and local_dict {local_dict.keys()} and global dict {globs.keys()}"
+ log.debug(msg)
+ col = eval(proc_func, globs, local_dict)
+
+ else:
+ msg = "mode is not recognised."
+ raise ValueError(msg)
+ return col
+
+
+def build_hit(
+ file_out: str,
+ list_file_in: list,
+ out_field: str,
+ in_field: str,
+ proc_config: dict,
+ pars: dict,
+ n_evtid: int | None = None,
+ start_evtid: int = 0,
+ buffer: int = 1000000,
+ gdml: str | None = None,
+ metadata_path: str | None = None,
+ merge_input_files: bool = True,
+) -> None:
+ """
+    Read the input files incrementally, build the hit tier and write it to the output file.
+
+ Parameters
+ ----------
+ file_out
+ output file path
+ list_file_in
+ list of input files
+ out_field
+ lh5 group name for output
+ in_field
+ lh5 group name for input
+ proc_config
+        the configuration file for the processing. Must contain the fields `channels`, `outputs`, `step_group` and `operations`.
+ Optionally can also contain the `locals` field to extract other non-JSON serializable objects used by the processors.
+ For example:
+
+ .. code-block:: json
+
+ {
+ "channels": [
+ "det000",
+ "det001"
+ ],
+ "outputs": [
+ "t0",
+ "truth_energy_sum",
+ "smeared_energy_sum",
+ "evtid"
+ ],
+ "step_group": {
+ "description": "group steps by time and evtid.",
+ "expression": "reboost.hpge.processors.group_by_time(stp,window=10)"
+ },
+ "locals": {
+ "hpge": "reboost.hpge.utils.get_hpge(meta_path=meta,pars=pars,detector=detector)"
+ },
+ "operations": {
+ "t0": {
+ "description": "first time in the hit.",
+ "mode": "eval",
+ "expression": "ak.fill_none(ak.firsts(hit.time,axis=-1),np.nan)"
+ },
+ "truth_energy_sum": {
+ "description": "truth summed energy in the hit.",
+ "mode": "eval",
+ "expression": "ak.sum(hit.edep,axis=-1)"
+ },
+ "smeared_energy_sum": {
+ "description": "summed energy after convolution with energy response.",
+ "mode": "function",
+ "expression": "reboost.hpge.processors.smear_energies(hit.truth_energy_sum,reso=pars.reso)"
+ }
+
+ }
+ }
+
+ pars
+ a dictionary of parameters, must have a field per channel consisting of a `dict` of parameters. For example:
+
+ .. code-block:: json
+
+ {
+ "det000": {
+ "reso": 1,
+ "fccd": 0.1,
+ "phy_vol_name":"det_phy",
+ "meta_name": "icpc.json"
+ }
+ }
+
+ this should also contain the channel mappings needed by reboost. These are:
+ - `phy_vol_name`: is the name of the physical volume,
+ - `meta_name` : is the name of the JSON file with the metadata.
+
+ If these keys are not present both will be set to the remage output table name.
+
+ start_evtid
+ first `evtid` to read, defaults to 0.
+ n_evtid
+ number of `evtid` to read, if `None` all steps are read (the default).
+ buffer
+ length of buffer
+ gdml
+ path to the input gdml file.
+ metadata_path
+ path to the folder with the metadata (i.e. the `hardware.detectors.germanium.diodes` folder of `legend-metadata`)
+ merge_input_files
+ boolean flag to merge all input files into a single output.
+
+ Note
+ ----
+ - The operations can depend on the outputs of previous steps, so operations order is important.
+ - It would be better to have a cleaner way to supply metadata and detector maps.
+ """
+
+ # get the gdml file
+ with utils.debug_logging(logging.CRITICAL):
+ reg = pyg4ometry.gdml.Reader(gdml).getRegistry() if gdml is not None else None
+
+ # get info on the files to read in a nice named tuple
+ finfo = utils.get_selected_files(
+ table=in_field,
+ file_list=list_file_in,
+ n_evtid=n_evtid,
+ start_evtid=start_evtid,
+ )
+
+ # loop over input files
+ for first_evtid, file_idx, file_in in zip(
+ finfo.file_start_global_evtids, finfo.file_indices, finfo.file_list
+ ):
+ # get the evtids in the file: may use a lot of memory
+ vertices = lh5.read_as(f"{in_field}/vertices/evtid", file_in, "np")
+
+ for ch_idx, d in enumerate(proc_config["channels"]):
+ msg = f"...running hit tier for {file_in} and {d}"
+ log.info(msg)
+
+ # get local variables
+
+ local_dict = get_locals(
+ proc_config.get("locals", {}),
+ pars_dict=pars.get(d, {}),
+ meta_path=metadata_path,
+ detector=d,
+ reg=reg,
+ )
+
+ is_first_chan = bool(ch_idx == 0)
+ is_first_file = bool(file_idx == 0)
+
+ # flag to overwrite input file
+ delete_input = (is_first_chan & (merge_input_files is False)) | (
+ is_first_chan & is_first_file
+ )
+
+ msg = (
+ f"...begin processing with {file_in} to {file_out} and delete input {delete_input}"
+ )
+ log.debug(msg)
+
+ # number of iterations (used to handle last iteration)
+ it, entries, max_idx = utils.get_iterator(
+ file=file_in, lh5_table=f"{in_field}/{d}", buffer=buffer
+ )
+ buffer_rows = None
+
+ # iterate over the LH5 file
+ for idx, (lh5_obj, _, n_rows) in enumerate(it):
+ msg = f"... processed {idx} chunks out of {max_idx}"
+ log.debug(msg)
+
+ # convert to awkward
+ ak_obj = lh5_obj.view_as("ak")
+
+ # fix for a bug in lh5 iterator
+ if idx == max_idx:
+ ak_obj = ak_obj[:n_rows]
+
+ # handle the buffers
+ obj, buffer_rows, mode = utils._merge_arrays(
+ ak_obj, buffer_rows, idx=idx, max_idx=max_idx, delete_input=delete_input
+ )
+
+ # add global evtid
+ obj = utils.get_global_evtid(first_evtid, obj, vertices)
+
+ # check if the chunk can be skipped, does lack of sorting break this?
+
+ if not utils.get_include_chunk(
+ obj._global_evtid,
+ start_glob_evtid=finfo.first_global_evtid,
+ end_glob_evtid=finfo.last_global_evtid,
+ ):
+ continue
+
+ # select just the correct global evtid objects
+ obj = obj[
+ (obj._global_evtid >= finfo.first_global_evtid)
+ & (obj._global_evtid <= finfo.last_global_evtid)
+ ]
+
+ # rename the fields
+ obj = ak.with_field(
+ obj,
+ obj["evtid"],
+ "_evtid",
+ )
+ obj = ak.without_field(obj, "evtid")
+
+ # convert back to a table, should work
+ data = Table(obj)
+
+ # group steps into hits
+ grouped = step_group(data, proc_config["step_group"])
+
+ # processors
+ for name, info in proc_config["operations"].items():
+ msg = f"... adding column {name}"
+ log.debug(msg)
+
+ col = eval_expression(grouped, info, local_dict=local_dict)
+ grouped.add_field(name, col)
+
+ # remove unwanted columns
+ log.debug("... removing unwanted columns")
+
+ existing_cols = list(grouped.keys())
+ for col in existing_cols:
+ if col not in proc_config["outputs"]:
+ grouped.remove_column(col, delete=True)
+
+ # write lh5 file
+
+ file_out_tmp = (
+ f"{file_out.split('.')[0]}_{file_idx}.lh5"
+ if (merge_input_files is False)
+ else file_out
+ )
+ lh5.write(grouped, f"{d}/{out_field}", file_out_tmp, wo_mode=mode)
diff --git a/src/reboost/hpge/processors.py b/src/reboost/hpge/processors.py
new file mode 100644
index 0000000..d390364
--- /dev/null
+++ b/src/reboost/hpge/processors.py
@@ -0,0 +1,223 @@
+from __future__ import annotations
+
+import awkward as ak
+import legendhpges
+import lgdo
+import numpy as np
+from lgdo import Array, Table, VectorOfVectors
+from numpy.typing import ArrayLike
+
+
+def sort_data(obj: ak.Array, *, time_name: str = "time", evtid_name: str = "_evtid") -> ak.Array:
+ """Sort the data by evtid then time.
+
+ Parameters
+ ----------
+ obj
+ array of records containing fields `time` and `evtid`
+
+ Returns
+ -------
+ sorted awkward array
+ """
+ indices = np.lexsort((obj[time_name], obj[evtid_name]))
+ return obj[indices]
+
+
+def group_by_evtid(data: Table) -> Table:
+ """Simple grouping by evtid.
+
+    Takes the input `stp` :class:`LGDO.Table` from remage and defines groupings of steps (i.e. the
+ `cumulative_length` for a vector of vectors). This then defines the output table (also :class:`LGDO.Table`),
+ on which processors can add fields.
+
+ Parameters
+ ----------
+ data
+ LGDO Table which must contain the `evtid` field.
+
+ Returns
+ -------
+    LGDO table of :class:`VectorOfVectors` for each field.
+
+ Note
+ ----
+ The input table must be sorted (by `evtid`).
+ """
+
+ # convert to awkward
+ obj_ak = data.view_as("ak")
+
+ # sort input
+ obj_ak = sort_data(obj_ak)
+
+ # extract cumulative lengths
+ counts = ak.run_lengths(obj_ak._evtid)
+ cumulative_length = np.cumsum(counts)
+
+ # build output table
+ out_tbl = Table(size=len(cumulative_length))
+
+ for f in obj_ak.fields:
+ out_tbl.add_field(
+ f, VectorOfVectors(cumulative_length=cumulative_length, flattened_data=obj_ak[f])
+ )
+ return out_tbl
+
+
+def group_by_time(
+ data: Table | ak.Array,
+ window: float = 10,
+ time_name: str = "time",
+ evtid_name: str = "_evtid",
+ fields: list | None = None,
+) -> lgdo.Table:
+ """Grouping of steps by `evtid` and `time`.
+
+    Takes the input `stp` :class:`LGDO.Table` from remage and defines groupings of steps (i.e. the
+ `cumulative_length` for a vector of vectors). This then defines the output table (also :class:`LGDO.Table`),
+ on which processors can add fields.
+
+ The windowing is based on defining a new group when the `evtid` changes or when the time increases by `> window`,
+ which is in units of us.
+
+
+ Parameters
+ ----------
+ data
+ LGDO Table or ak.Array which must contain the time_name and evtid_name fields
+ window
+ time window in us used to search for coincident hits
+ time_name
+ name of the timing field
+ evtid_name
+ name of the evtid field
+ fields
+ names of fields to include in the output table, if None includes all
+
+ Returns
+ -------
+    LGDO table of :class:`VectorOfVectors` for each field.
+
+ Note
+ ----
+ The input table must be sorted (first by `evtid` then `time`).
+ """
+
+ obj = data.view_as("ak") if isinstance(data, Table) else data
+ obj = sort_data(obj, time_name=time_name, evtid_name=evtid_name)
+
+ # get difference
+ time_diffs = np.diff(obj[time_name])
+ index_diffs = np.diff(obj[evtid_name])
+
+    # index of the last element in each run
+ time_change = (time_diffs > window * 1000) & (index_diffs == 0)
+ index_change = index_diffs > 0
+
+ # cumulative length is just the index of changes plus 1
+ cumulative_length = np.array(np.where(time_change | index_change))[0] + 1
+
+    # add the last grouping
+ cumulative_length = np.append(cumulative_length, len(obj[time_name]))
+
+ # build output table
+ out_tbl = Table(size=len(cumulative_length))
+
+ fields = obj.fields if fields is None else fields
+ for f in fields:
+ out_tbl.add_field(
+ f, VectorOfVectors(cumulative_length=cumulative_length, flattened_data=obj[f])
+ )
+
+ return out_tbl
+
+
+def smear_energies(truth_energy: Array, reso: float = 2) -> Array:
+ """Smearing of energies.
+
+ Parameters
+ ----------
+ truth_energy
+ Array of energies to be smeared.
+ reso
+ energy resolution (sigma).
+
+ Returns
+ -------
+    New array after sampling from a Gaussian with mean :math:`energy_i` and sigma `reso` for every element of `truth_energy`.
+
+ """
+
+ flat_energy = truth_energy
+ rng = np.random.default_rng()
+
+ return Array(rng.normal(loc=flat_energy, scale=np.ones_like(flat_energy) * reso))
+
+
+def distance_to_surface(
+ positions_x: VectorOfVectors,
+ positions_y: VectorOfVectors,
+ positions_z: VectorOfVectors,
+ hpge: legendhpges.base.HPGe,
+ det_pos: ArrayLike,
+ surface_type: str | None = None,
+ unit: str = "mm",
+) -> Array:
+ """Computes the distance from each step to the detector surface.
+
+ Parameters
+ ----------
+ positions_x
+ Global x positions for each step.
+ positions_y
+ Global y positions for each step.
+ positions_z
+ Global z positions for each step.
+ hpge
+ HPGe object.
+ det_pos
+ position of the detector origin, must be a 3 component array corresponding to `(x,y,z)`.
+ surface_type
+        string of which surface to use, can be `nplus`, `pplus`, `passive` or None (in which case the distance to any surface is calculated).
+ unit
+ unit for the hit tier positions table.
+
+ Returns
+ -------
+ VectorOfVectors with the same shape as `positions_x/y/z` of the distance to the surface.
+
+ Note
+ ----
+ `positions_x/positions_y/positions_z` must all have the same shape.
+
+ """
+    factor = np.array([1, 10, 1000])[unit == np.array(["mm", "cm", "m"])][0]  # conversion to mm
+
+ # compute local positions
+ pos = []
+ sizes = []
+ for idx, pos_tmp in enumerate([positions_x, positions_y, positions_z]):
+ local_pos_tmp = ak.Array(pos_tmp) * factor - det_pos[idx]
+ local_pos_flat_tmp = ak.flatten(local_pos_tmp).to_numpy()
+ pos.append(local_pos_flat_tmp)
+ sizes.append(ak.num(local_pos_tmp, axis=1))
+
+ if not ak.all(sizes[0] == sizes[1]) or not ak.all(sizes[0] == sizes[2]):
+ msg = "all position vector of vector must have the same shape"
+ raise ValueError(msg)
+
+ size = sizes[0]
+ # restructure the positions
+ local_positions = np.vstack(pos).T
+
+ # get indices
+
+ surface_indices = (
+ np.where(np.array(hpge.surfaces) == surface_type) if surface_type is not None else None
+ )
+
+ # distance calc itself
+ distances = hpge.distance_to_surface(local_positions, surface_indices=surface_indices)
+
+ return VectorOfVectors(ak.unflatten(distances, size))
diff --git a/src/reboost/hpge/tcm.py b/src/reboost/hpge/tcm.py
new file mode 100644
index 0000000..99b951c
--- /dev/null
+++ b/src/reboost/hpge/tcm.py
@@ -0,0 +1,113 @@
+from __future__ import annotations
+
+import logging
+import re
+
+import awkward as ak
+from lgdo import Table, lh5
+
+from . import processors
+
+log = logging.getLogger(__name__)
+
+
+def build_tcm(
+ hit_file: str,
+ out_file: str,
+ channels: list[str],
+ time_name: str = "t0",
+ idx_name: str = "global_evtid",
+ time_window_in_us: float = 10,
+) -> None:
+ """Build the (Time Coincidence Map) TCM from the hit tier.
+
+ Parameters
+ ----------
+ hit_file
+ path to hit tier file.
+ out_file
+ output path for tcm.
+ channels
+ list of channel names to include.
+ time_name
+ name of the hit tier field used for time grouping.
+ idx_name
+ name of the hit tier field used for index grouping.
+ time_window_in_us
+ time window used to define the grouping.
+
+ """
+
+ hash_func = r"\d+"
+
+ msg = "start building time-coincidence map"
+ log.info(msg)
+
+ chan_ids = [re.search(hash_func, channel).group() for channel in channels]
+
+ hit_data = []
+ for channel in channels:
+ hit_data.append(
+ lh5.read(f"{channel}/hit", hit_file, field_mask=[idx_name, time_name]).view_as("ak")
+ )
+ tcm = get_tcm_from_ak(
+ hit_data, chan_ids, window=time_window_in_us, time_name=time_name, idx_name=idx_name
+ )
+
+ if tcm is not None:
+ lh5.write(tcm, "tcm", out_file, wo_mode="of")
+
+
+def get_tcm_from_ak(
+ hit_data: list[ak.Array],
+ channels: list[int],
+ *,
+ window: float = 10,
+ time_name: str = "t0",
+ idx_name: str = "global_evtid",
+) -> Table:
+ """Builds a time-coincidence map from a hit of hit data Tables.
+
+ - build an ak.Array of the data merging channels with fields base on "time_name", and "idx_name" and adding a field `rawid` from the channel idx, also add the row (`hit_idx`)
+ - sorts this array by "idx_name" then "time_name" fields
+ - group by "idx_name" and "time_name" based on the window parameter
+
+ Parameters
+ ----------
+ hit_data
+ list of hit tier data for each channel
+ channels
+ list of channel indices
+ window
+ time window for selecting coincidences (in us)
+ time_name
+ name of the field for time information
+ idx_name
+ name of the decay index field
+
+ Returns
+ -------
+    an LGDO Table (of VectorOfVectors) containing the time-coincidence map
+ """
+ # build ak_obj for sorting
+ sort_objs = []
+
+ for ch_idx, data_tmp in zip(channels, hit_data):
+ obj_tmp = ak.copy(data_tmp)
+ obj_tmp = obj_tmp[[time_name, idx_name]]
+ hit_idx = ak.local_index(obj_tmp)
+
+ obj_tmp = ak.with_field(obj_tmp, hit_idx, "array_idx")
+
+ obj_tmp["array_id"] = int(ch_idx)
+ sort_objs.append(obj_tmp)
+
+ obj_tot = ak.concatenate(sort_objs)
+
+ return processors.group_by_time(
+ obj_tot,
+ time_name=time_name,
+ evtid_name=idx_name,
+ window=window,
+ fields=["array_id", "array_idx"],
+ )
diff --git a/src/reboost/hpge/utils.py b/src/reboost/hpge/utils.py
new file mode 100644
index 0000000..4cb4265
--- /dev/null
+++ b/src/reboost/hpge/utils.py
@@ -0,0 +1,477 @@
+from __future__ import annotations
+
+import copy
+import importlib
+import itertools
+import json
+import logging
+import re
+from collections import namedtuple
+from contextlib import contextmanager
+from pathlib import Path
+from typing import NamedTuple
+
+import awkward as ak
+import legendhpges
+import numpy as np
+import pyg4ometry
+import yaml
+from lgdo import lh5
+from lgdo.lh5 import LH5Iterator
+from numpy.typing import ArrayLike, NDArray
+
+log = logging.getLogger(__name__)
+
+reg = pyg4ometry.geant4.Registry()
+
+
+class FileInfo(NamedTuple):
+ """NamedTuple storing the information on the input files"""
+
+ file_list: list[str]
+ """list of strings of the selected files."""
+
+ file_indices: list[int]
+ """list of integers of the indices of the files."""
+
+ file_start_global_evtids: list[int]
+ """list of integers of the first global evtid for each file."""
+
+ first_global_evtid: int
+ """first global evtid to process."""
+
+ last_global_evtid: int
+ """Last global evtid to process."""
+
+
+def get_channels_from_groups(names: list | str | None, groupings: dict | None = None) -> list:
+ """Get a list of channels from a list of groups"""
+
+ if names is None:
+ channels_e = []
+ elif isinstance(names, str):
+ channels_e = groupings[names]
+ elif isinstance(names, list):
+ channels_e = list(itertools.chain.from_iterable([groupings[e] for e in names]))
+ else:
+ msg = f"names {names} must be list or str or `None`"
+ raise ValueError(msg)
+
+ return channels_e
+
+
+def get_selected_files(
+ file_list: list[str], table: str, n_evtid: int | None, start_evtid: int
+) -> FileInfo:
+ """Get the files to read based on removing those with global evtid out of the selected range.
+
+ - expands wildcards,
+ - extracts number of evtid per file,
+ - removes files outside of the range `low_evtid` to `high_evtid`.
+
+ Parameters
+ ----------
+ file_list
+ list of files to process (can include wildcards)
+ table
+ lh5 input field
+ n_evtid
+ number of simulation events to process.
+    start_evtid
+        first (global) simulation event to process.
+
+ Returns
+ -------
+ `FileInfo` object with information on the files.
+ """
+ # expand wildcards
+ expanded_list_file_in = get_file_list(path=file_list)
+
+ n_sim = get_num_simulated(expanded_list_file_in, table=table)
+
+ # first global index of each file
+ cum_nsim = np.concatenate([[0], np.cumsum(n_sim)])
+
+ low_global_evtid, high_global_evtid = get_global_evtid_range(
+ start_evtid=start_evtid, n_evtid=n_evtid, n_tot_sim=cum_nsim[-1]
+ )
+ file_indices = np.array(
+ get_files_to_read(
+ cum_nsim, start_glob_evtid=low_global_evtid, end_glob_evtid=high_global_evtid
+ )
+ )
+
+ # select just the necessary files
+ file_list_sel = np.array(expanded_list_file_in)[file_indices]
+ start_evtid_sel = cum_nsim[file_indices]
+
+ return FileInfo(
+ file_list_sel, file_indices, start_evtid_sel, low_global_evtid, high_global_evtid
+ )
+
+
+def get_num_simulated(file_list: list, table: str = "hit") -> list[int]:
+ """Loop over a list of files and extract the number of simulated events.
+
+ Based on the size of the `vertices` tables.
+
+ Parameters
+ ----------
+ file_list
+ list of input files (each must contain the vertices table)
+ table
+ name of the lh5 input table.
+ """
+ n = []
+ for file in file_list:
+ it = lh5.LH5Iterator(file, f"{table}/vertices", buffer_len=int(5e6))
+ n.append(it._get_file_cumlen(0))
+
+ msg = f"files contain {n} events"
+ log.info(msg)
+ return n
+
+
+def get_num_evtid_hit_tier(hit_file: str, channels: list[str], idx_name: str) -> int:
+ """Get the number of evtid in the hit file.
+
+ Parameters
+ ----------
+ hit_file
+ path to the hit file
+ channels
+ list of channel names
+ idx_name
+ name of the index field
+
+ Returns
+ -------
+ largest evtid in the files.
+
+ Note
+ ----
+ To avoid reading the full file into memory we assume the hit tier files are sorted by "idx_name". This should always be the case.
+ """
+
+ n_max = 0
+ for channel in channels:
+ # use the iterator to get the file size
+ it = lh5.LH5Iterator(hit_file, f"{channel}/hit/", buffer_len=int(5e6))
+ entries = it._get_file_cumentries(0)
+
+ ob_ak = lh5.read(f"{channel}/hit/", hit_file, idx=[entries - 1]).view_as("ak")
+
+ max_evtid = ob_ak[idx_name][0]
+
+ n_max = max_evtid if (max_evtid > n_max) else n_max
+
+ return n_max
+
+
+def get_file_list(path: str | list[str]) -> NDArray:
+ """Get list of files to read.
+
+ Parameters
+ ----------
+ path
+ either a string or a list of strings containing files paths to process. May contain wildcards which are expanded.
+
+ Returns
+ -------
+ sorted array of files, after expanding wildcards, removing duplicates and sorting.
+
+ """
+
+ if isinstance(path, str):
+ path = [path]
+
+ path_out_list = []
+
+ for ptmp in path:
+ ptmp_path = Path(ptmp)
+ dir_tmp = ptmp_path.parent
+ pattern_tmp = ptmp_path.name
+
+ path_out_list.extend(dir_tmp.glob(pattern_tmp))
+ path_out_list_str = [str(ptmp) for ptmp in path_out_list]
+ return np.array(np.sort(np.unique(path_out_list_str)))
+
+
+def get_global_evtid_range(
+ start_evtid: int, n_evtid: int | None, n_tot_sim: int
+) -> tuple[int, int]:
+ """Get the global evtid range"""
+
+ # some checks
+ if (n_evtid is not None) and (start_evtid + n_evtid > n_tot_sim):
+ msg = "Index are out of the range of the simulation."
+ raise ValueError(msg)
+
+ start_glob_index = start_evtid
+ end_glob_index = start_evtid + n_evtid - 1 if (n_evtid is not None) else n_tot_sim - 1
+ return start_glob_index, end_glob_index
+
+
+def get_files_to_read(cum_n_sim: ArrayLike, start_glob_evtid: int, end_glob_evtid: int) -> NDArray:
+ """Get the index of files to read based on the number of evtid to read and the start evtid.
+
+ Parameters
+ ----------
+ cum_n_sim
+ cumulative list of the number of evtid per file.
+ start_glob_evtid
+ first global evtid to include.
+ end_glob_evtid
+ last global evtid to process.
+
+ Returns
+ -------
+ array of the indices of files to process.
+ """
+ # find which files to read
+
+ file_indices = []
+ cum_n_sim = np.array(cum_n_sim)
+
+ for i, (low, high) in enumerate(zip(cum_n_sim, cum_n_sim[1:] - 1)):
+ if (high >= start_glob_evtid) & (low <= end_glob_evtid):
+ file_indices.append(i)
+ return np.array(file_indices)
+
+
+def get_global_evtid(first_evtid: int, obj: ak.Array, vertices: ArrayLike) -> ak.Array:
+ """Adds a global evtid field to the array.
+
+ The global evtid is the index of the decay in the order the files are read in. This is obtained
+ from the first_evtid, defined as the number of decays in the previous files, plus the row of the
+ vertices array corresponding to this evtid. In this way we obtain an index both sorted and unique
+    over all the input files; this enables easy selection of some chunks.
+
+ Parameters
+ ----------
+ first_evtid
+ number of decays in the previous files
+ obj
+ awkward array of the input data
+ vertices
+ array of the vertex evtids
+
+ Returns
+ -------
+ the obj with the global_evtid field added
+ """
+ vertices = np.array(vertices)
+ indices = np.searchsorted(vertices, np.array(obj.evtid))
+
+ if np.any(vertices[indices] != np.array(obj.evtid)):
+ msg = "Some of the evtids in the obj do not correspond to rows in the input"
+ raise ValueError(msg)
+
+ return ak.with_field(obj, first_evtid + indices, "_global_evtid")
+
+
+def get_include_chunk(
+ global_evtid: ak.Array,
+ start_glob_evtid: int,
+ end_glob_evtid: int,
+) -> bool:
+ """Check if a chunk can be skipped based on evtid range.
+
+ Parameters
+ ----------
+ global_evtid
+ awkward array of the (local) evtid in the chunk
+ start_glob_evtid
+ first global evtid to include.
+ end_glob_evtid
+ last global evtid to process.
+ Returns
+ -------
+ boolean flag of whether to include in the chunk.
+
+ """
+ low = np.min(np.array(global_evtid))
+ high = np.max(np.array(global_evtid))
+ return (high >= start_glob_evtid) & (low <= end_glob_evtid)
+
+
+def get_hpge(meta_path: str | None, pars: NamedTuple, detector: str) -> legendhpges.HPGe:
+ """Extract the :class:`legendhpges.HPGe` object from metadata.
+
+ Parameters
+ ----------
+ meta_path
+ path to the folder with the `diodes` metadata.
+ pars
+ named tuple of parameters.
+ detector
+ remage output name for the detector
+
+ Returns
+ -------
+ hpge
+ the `legendhpges` object for the detector.
+ """
+ with debug_logging(logging.CRITICAL):
+ reg = pyg4ometry.geant4.Registry()
+ if meta_path is not None:
+ meta_name = pars.meta_name if ("meta_name" in pars._fields) else f"{detector}.json"
+ meta_dict = Path(meta_path) / Path(meta_name)
+ return legendhpges.make_hpge(meta_dict, registry=reg)
+ return None
+
+
+def get_phy_vol(
+ reg: pyg4ometry.geant4.Registry | None, pars: NamedTuple, detector: str
+) -> pyg4ometry.geant4.PhysicalVolume:
+ """Extract the :class:`pyg4ometry.geant4.PhysicalVolume` object from GDML
+
+ Parameters
+ ----------
+ reg
+ Geant4 registry from GDML
+ pars
+ named tuple of parameters.
+ detector
+ remage output name for the detector.
+
+ Returns
+ -------
+ phy_vol
+ the `pyg4ometry.geant4.PhysicalVolume` object for the detector
+ """
+
+ with debug_logging(logging.CRITICAL):
+ if reg is not None:
+ phy_name = pars.phy_vol_name if ("phy_vol_name" in pars._fields) else f"{detector}"
+ return reg.physicalVolumeDict[phy_name]
+
+ return None
+
+
+@contextmanager
+def debug_logging(level):
+ logger = logging.getLogger("root")
+ old_level = logger.getEffectiveLevel()
+ logger.setLevel(level)
+ try:
+ yield
+ finally:
+ logger.setLevel(old_level)
+
+
+def dict2tuple(dictionary: dict) -> namedtuple:
+ return namedtuple("parameters", dictionary.keys())(**dictionary)
+
+
+def get_function_string(expr: str) -> tuple[str, dict]:
+ """Get a function call to evaluate."""
+
+ args_str = re.search(r"\((.*)\)$", expr.strip()).group(1)
+
+ # get module and function names
+ func_call = expr.strip().split("(")[0]
+ subpackage, func = func_call.rsplit(".", 1)
+ package = subpackage.split(".")[0]
+ importlib.import_module(subpackage, package=__package__)
+
+ # declare imported package as globals (see eval() call later)
+ globs = {
+ package: importlib.import_module(package),
+ }
+
+ call_str = f"{func_call}({args_str})"
+
+ return call_str, globs
+
+
+def _merge_arrays(
+ ak_obj: ak.Array,
+ buffer_rows: ak.Array | None,
+ idx: int,
+ max_idx: int,
+ delete_input: bool = False,
+) -> tuple[ak.Array, ak.Array, str]:
+ """Merge awkward arrays with a buffer and simultaneously remove the last rows.
+
+ This function is used since when iterating through the rows of an Array it will
+ sometimes happen to split some events. This concatenates rows in the buffer onto the start of the data.
+ This also removes the last rows of each chunk and saves them into a buffer.
+
+ Parameters
+ ----------
+    ak_obj
+ array of data
+ buffer_rows
+ buffer to concatenate with.
+ idx
+        integer index used to control the behaviour for the first and last chunk.
+ max_idx
+ largest index.
+ delete_input
+ flag to delete the input files.
+
+ Returns
+ -------
+ (obj,buffer,mode) tuple of the data, the buffer for the next chunk and the IO mode for file writing.
+ """
+ counts = ak.run_lengths(ak_obj.evtid)
+ rows = ak.num(ak_obj, axis=-1)
+ end_rows = counts[-1]
+
+ if max_idx == 0:
+ mode = "of" if (delete_input) else "append"
+ obj = ak_obj
+ elif idx == 0:
+ mode = "of" if (delete_input) else "append"
+ obj = ak_obj[0 : rows - end_rows]
+ buffer_rows = copy.deepcopy(ak_obj[rows - end_rows :])
+ elif idx != max_idx:
+ mode = "append"
+ obj = ak.concatenate((buffer_rows, ak_obj[0 : rows - end_rows]))
+ buffer_rows = copy.deepcopy(ak_obj[rows - end_rows :])
+ else:
+ mode = "append"
+ obj = ak.concatenate((buffer_rows, ak_obj))
+ buffer_rows = None
+
+ return obj, buffer_rows, mode
+
+
+def get_iterator(file: str, buffer: int, lh5_table: str, **kwargs) -> tuple[LH5Iterator, int, int]:
+ """Get information on the iterator (number of index and entries)."""
+
+ it = LH5Iterator(file, lh5_table, buffer_len=buffer, **kwargs)
+ entries = it._get_file_cumentries(0)
+
+ # number of blocks is ceil of entries/buffer,
+ # shift by 1 since idx starts at 0
+ max_idx = int(np.ceil(entries / buffer)) - 1
+
+ return it, entries, max_idx
+
+
+__file_extensions__ = {"json": [".json"], "yaml": [".yaml", ".yml"]}
+
+
+def load_dict(fname: str, ftype: str | None = None) -> dict:
+ """Load a text file as a Python dict."""
+ fname = Path(fname)
+
+ # determine file type from extension
+ if ftype is None:
+ for _ftype, exts in __file_extensions__.items():
+ if fname.suffix in exts:
+ ftype = _ftype
+
+ msg = f"loading {ftype} dict from: {fname}"
+ log.debug(msg)
+
+ with fname.open() as f:
+ if ftype == "json":
+ return json.load(f)
+ if ftype == "yaml":
+ return yaml.safe_load(f)
+
+ msg = f"unsupported file format {ftype}"
+ raise NotImplementedError(msg)
diff --git a/tests/configs/geom.gdml b/tests/configs/geom.gdml
new file mode 100644
index 0000000..62ebc8d
--- /dev/null
+++ b/tests/configs/geom.gdml
@@ -0,0 +1,132 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/tests/test_hpge_distance_to_surface.py b/tests/test_hpge_distance_to_surface.py
new file mode 100644
index 0000000..1114903
--- /dev/null
+++ b/tests/test_hpge_distance_to_surface.py
@@ -0,0 +1,41 @@
+from __future__ import annotations
+
+import awkward as ak
+import pytest
+from legendhpges import make_hpge
+from legendtestdata import LegendTestData
+from lgdo import types
+
+from reboost.hpge.processors import distance_to_surface
+
+
+@pytest.fixture(scope="session")
+def test_data_configs():
+ ldata = LegendTestData()
+ ldata.checkout("5f9b368")
+ return ldata.get_path("legend/metadata/hardware/detectors/germanium/diodes")
+
+
+def test_distance_to_surface(test_data_configs):
+ gedet = make_hpge(test_data_configs + "/V99000A.json")
+ dist = [100, 0, 0]
+
+ pos = ak.Array(
+ {
+ "xloc": [[0, 100, 200], [100], [700, 500, 200]],
+ "yloc": [[100, 0, 0], [200], [100, 300, 200]],
+ "zloc": [[700, 10, 20], [100], [300, 100, 0]],
+ }
+ )
+
+ # check just the shape
+ assert ak.all(
+ ak.num(distance_to_surface(pos.xloc, pos.yloc, pos.zloc, gedet, dist, None), axis=1)
+ == [3, 1, 3]
+ )
+
+ # check it can be written
+
+ assert isinstance(
+ distance_to_surface(pos.xloc, pos.yloc, pos.zloc, gedet, dist, None), types.LGDO
+ )
diff --git a/tests/test_hpge_hit.py b/tests/test_hpge_hit.py
new file mode 100644
index 0000000..806c6da
--- /dev/null
+++ b/tests/test_hpge_hit.py
@@ -0,0 +1,396 @@
+from __future__ import annotations
+
+import pathlib
+
+import awkward as ak
+import numpy as np
+import pyg4ometry
+import pytest
+from legendhpges.base import HPGe
+from legendtestdata import LegendTestData
+from lgdo import Table, lh5
+
+from reboost.hpge import hit
+
+configs = pathlib.Path(__file__).parent.resolve() / pathlib.Path("configs")
+
+
+@pytest.fixture(scope="session")
+def test_data_configs():
+ ldata = LegendTestData()
+ ldata.checkout("5f9b368")
+ return ldata.get_path("legend/metadata/hardware/detectors/germanium/diodes")
+
+
+def test_get_locals(test_data_configs):
+ local_info = {"hpge": "reboost.hpge.utils.get_hpge(meta_path=meta,pars=pars,detector=detector)"}
+
+ local_dict = hit.get_locals(
+ local_info,
+ pars_dict={"meta_name": "V99000A.json"},
+ detector="det001",
+ meta_path=test_data_configs,
+ )
+
+ assert isinstance(local_dict["hpge"], HPGe)
+
+
+def test_eval():
+ in_arr = ak.Array(
+ {
+ "evtid": [[1, 1, 1], [2, 2], [10, 10], [11], [12, 12, 12]],
+ "time": [[0, 0, 0], [0, 0], [0, 0], [0], [0, 0, 0]],
+ "edep": [[100, 150, 300], [0, 2000], [10, 20], [19], [100, 200, 300]],
+ }
+ )
+ tab = Table(in_arr)
+ basic_eval = {"mode": "eval", "expression": "ak.sum(hit.edep,axis=-1)"}
+
+ assert ak.all(
+ hit.eval_expression(tab, basic_eval, {}).view_as("ak") == [550, 2000, 30, 19, 600]
+ )
+
+ tab.add_field("e_sum", hit.eval_expression(tab, basic_eval, {}))
+ bad_eval = {"mode": "sum", "expression": "ak.sum(hit.edep,axis=-1)"}
+
+ with pytest.raises(ValueError):
+ hit.eval_expression(tab, bad_eval, {})
+
+ func_eval = {
+ "mode": "function",
+ "expression": "reboost.hpge.processors.smear_energies(hit.e_sum,reso=pars.reso)",
+ }
+ pars = {"reso": 2}
+ # test get locals
+ local_dict = hit.get_locals({}, pars_dict=pars)
+ assert np.size(hit.eval_expression(tab, func_eval, local_dict).view_as("np")) == 5
+
+
+def test_eval_with_hpge(test_data_configs):
+ local_info = {"hpge": "reboost.hpge.utils.get_hpge(meta_path=meta,pars=pars,detector=detector)"}
+ local_dict = hit.get_locals(
+ local_info,
+ pars_dict={"meta_name": "V99000A.json"},
+ detector="det001",
+ meta_path=test_data_configs,
+ )
+
+ pos = ak.Array(
+ {
+ "xloc": [[0, 100, 200], [100], [700, 500, 200]],
+ "yloc": [[100, 0, 0], [200], [100, 300, 200]],
+ "zloc": [[700, 10, 20], [100], [300, 100, 0]],
+ }
+ )
+ tab = Table(pos)
+
+ func_eval = {
+ "mode": "function",
+ "expression": "reboost.hpge.processors.distance_to_surface(hit.xloc, hit.yloc, hit.zloc, hpge, [0,0,0], None)",
+ }
+
+ assert ak.all(ak.num(hit.eval_expression(tab, func_eval, local_dict), axis=1) == [3, 1, 3])
+
+
+def test_eval_with_hpge_and_phy_vol(test_data_configs):
+ gdml_path = configs / pathlib.Path("geom.gdml")
+
+ gdml = pyg4ometry.gdml.Reader(gdml_path).getRegistry()
+
+ local_info = {
+ "hpge": "reboost.hpge.utils.get_hpge(meta_path=meta,pars=pars,detector=detector)",
+ "phy_vol": "reboost.hpge.utils.get_phy_vol(reg=reg,pars=pars,detector=detector)",
+ }
+ local_dict = hit.get_locals(
+ local_info,
+ pars_dict={"meta_name": "V99000A.json", "phy_vol_name": "det_phy_1"},
+ detector="det001",
+ meta_path=test_data_configs,
+ reg=gdml,
+ )
+
+ pos = ak.Array(
+ {
+ "xloc": [[0, 100, 200], [100], [700, 500, 200]],
+ "yloc": [[100, 0, 0], [200], [100, 300, 200]],
+ "zloc": [[700, 10, 20], [100], [300, 100, 0]],
+ }
+ )
+ tab = Table(pos)
+
+ func_eval = {
+ "mode": "function",
+ "expression": "reboost.hpge.processors.distance_to_surface(hit.xloc, hit.yloc, hit.zloc, hpge, phy_vol.position.eval(), None)",
+ }
+
+ assert ak.all(ak.num(hit.eval_expression(tab, func_eval, local_dict), axis=1) == [3, 1, 3])
+
+
+# test the full processing chain
+@pytest.fixture
+def test_reboost_input_file(tmp_path):
+ # make it large enough to be multiple groups
+ rng = np.random.default_rng()
+ evtid_1 = np.sort(rng.integers(int(1e4), size=(int(1e5))))
+ time_1 = rng.uniform(low=0, high=1, size=(int(1e5)))
+ edep_1 = rng.uniform(low=0, high=1000, size=(int(1e5)))
+ pos_x_1 = rng.uniform(low=-50, high=50, size=(int(1e5)))
+ pos_y_1 = rng.uniform(low=-50, high=50, size=(int(1e5)))
+ pos_z_1 = rng.uniform(low=-50, high=50, size=(int(1e5)))
+
+ # use a length that does not divide evenly by the buffer length
+ evtid_2 = np.sort(rng.integers(int(1e4), size=(3004)))
+ time_2 = rng.uniform(low=0, high=1, size=(3004))
+ edep_2 = rng.uniform(low=0, high=1000, size=(3004))
+ pos_x_2 = rng.uniform(low=-50, high=50, size=(3004))
+ pos_y_2 = rng.uniform(low=-50, high=50, size=(3004))
+ pos_z_2 = rng.uniform(low=-50, high=50, size=(3004))
+
+ vertices_1 = ak.Array({"evtid": np.arange(int(1e4))})
+ vertices_2 = ak.Array({"evtid": np.arange(int(1e4))})
+
+ arr_1 = ak.Array(
+ {
+ "evtid": evtid_1,
+ "time": time_1,
+ "edep": edep_1,
+ "xloc": pos_x_1,
+ "yloc": pos_y_1,
+ "zloc": pos_z_1,
+ }
+ )
+ arr_2 = ak.Array(
+ {
+ "evtid": evtid_2,
+ "time": time_2,
+ "edep": edep_2,
+ "xloc": pos_x_2,
+ "yloc": pos_y_2,
+ "zloc": pos_z_2,
+ }
+ )
+
+ lh5.write(Table(vertices_1), "hit/vertices", tmp_path / "file1.lh5", wo_mode="of")
+ lh5.write(Table(vertices_2), "hit/vertices", tmp_path / "file2.lh5", wo_mode="of")
+
+ lh5.write(Table(arr_1), "hit/det001", tmp_path / "file1.lh5", wo_mode="append")
+ lh5.write(Table(arr_2), "hit/det001", tmp_path / "file2.lh5", wo_mode="append")
+
+ return tmp_path
+
+
+def test_build_hit(test_reboost_input_file):
+ # first just one file no pars
+
+ proc_config = {
+ "channels": [
+ "det001",
+ ],
+ "outputs": ["t0", "_evtid"],
+ "step_group": {
+ "description": "group steps by time and evtid.",
+ "expression": "reboost.hpge.processors.group_by_time(stp,window=10)",
+ },
+ "operations": {
+ "t0": {
+ "description": "first time in the hit.",
+ "mode": "eval",
+ "expression": "ak.fill_none(ak.firsts(hit.time,axis=-1),np.nan)",
+ },
+ "truth_energy_sum": {
+ "description": "truth summed energy in the hit.",
+ "mode": "eval",
+ "expression": "ak.sum(hit.edep,axis=-1)",
+ },
+ },
+ }
+
+ # build hit for file 1 and 2 separately and then also merging them
+ for output_file, input_file in zip(
+ ["out_1.lh5", "out_2.lh5", "out_merge.lh5"], ["file1.lh5", "file2.lh5", "file*.lh5"]
+ ):
+ hit.build_hit(
+ str(test_reboost_input_file / output_file),
+ [str(test_reboost_input_file / input_file)],
+ in_field="hit",
+ out_field="hit",
+ proc_config=proc_config,
+ pars={},
+ buffer=10000,
+ )
+
+ tab_1 = lh5.read("det001/hit", str(test_reboost_input_file / "out_1.lh5")).view_as("ak")
+ tab_2 = lh5.read("det001/hit", str(test_reboost_input_file / "out_2.lh5")).view_as("ak")
+ tab_merge = lh5.read("det001/hit", str(test_reboost_input_file / "out_merge.lh5")).view_as("ak")
+
+ # check lengths
+ assert len(ak.flatten(tab_1._evtid, axis=-1)) == int(1e5)
+ assert len(ak.flatten(tab_2._evtid, axis=-1)) == 3004
+ assert len(ak.flatten(tab_merge._evtid, axis=-1)) == 3004 + int(1e5)
+
+ # one call but write both files
+ hit.build_hit(
+ str(test_reboost_input_file / "out.lh5"),
+ [str(test_reboost_input_file / "file*.lh5")],
+ in_field="hit",
+ out_field="hit",
+ proc_config=proc_config,
+ pars={},
+ buffer=10000,
+ merge_input_files=False,
+ )
+
+ tab_1 = lh5.read("det001/hit", str(test_reboost_input_file / "out_0.lh5")).view_as("ak")
+ tab_2 = lh5.read("det001/hit", str(test_reboost_input_file / "out_1.lh5")).view_as("ak")
+
+ # check lengths
+ assert len(ak.flatten(tab_1._evtid, axis=-1)) == int(1e5)
+ assert len(ak.flatten(tab_2._evtid, axis=-1)) == 3004
+
+ # test with a smaller buffer
+ hit.build_hit(
+ str(test_reboost_input_file / "out_small_buffer.lh5"),
+ [str(test_reboost_input_file / "file*.lh5")],
+ in_field="hit",
+ out_field="hit",
+ proc_config=proc_config,
+ pars={},
+ buffer=1000,
+ )
+
+ tab_small = lh5.read(
+ "det001/hit", str(test_reboost_input_file / "out_small_buffer.lh5")
+ ).view_as("ak")
+
+ # buffer does not affect results
+ assert ak.all(ak.flatten(tab_merge._evtid) == ak.flatten(tab_small._evtid))
+
+
+def test_build_hit_some_row(test_reboost_input_file):
+ proc_config = {
+ "channels": [
+ "det001",
+ ],
+ "outputs": ["t0", "_evtid"],
+ "step_group": {
+ "description": "group steps by time and evtid.",
+ "expression": "reboost.hpge.processors.group_by_time(stp,window=10)",
+ },
+ "operations": {
+ "t0": {
+ "description": "first time in the hit.",
+ "mode": "eval",
+ "expression": "ak.fill_none(ak.firsts(hit.time,axis=-1),np.nan)",
+ },
+ "truth_energy_sum": {
+ "description": "truth summed energy in the hit.",
+ "mode": "eval",
+ "expression": "ak.sum(hit.edep,axis=-1)",
+ },
+ },
+ }
+
+ # test asking to read too many rows
+ with pytest.raises(ValueError):
+ hit.build_hit(
+ str(test_reboost_input_file / "out.lh5"),
+ [str(test_reboost_input_file / "file1.lh5")],
+ n_evtid=int(1e7),
+ in_field="hit",
+ out_field="hit",
+ proc_config=proc_config,
+ pars={},
+ buffer=10000,
+ )
+
+ # test reading the data in two goes gives the same result
+ for n_ev, s_ev, out in zip(
+ [int(1e3), int(1e4 - 1e3), int(1e4), int(1e4)],
+ [0, int(1e3), 0, 100],
+ ["out_some_rows.lh5", "out_rest_rows.lh5", "out_all_file_one.lh5", "out_mix.lh5"],
+ ):
+ # test read only some events
+ hit.build_hit(
+ str(test_reboost_input_file / out),
+ [
+ str(test_reboost_input_file / "file1.lh5"),
+ str(test_reboost_input_file / "file2.lh5"),
+ ],
+ n_evtid=n_ev,
+ start_evtid=s_ev,
+ in_field="hit",
+ out_field="hit",
+ proc_config=proc_config,
+ pars={},
+ buffer=100000,
+ )
+
+ tab_some = lh5.read("det001/hit", str(test_reboost_input_file / "out_some_rows.lh5")).view_as(
+ "ak"
+ )
+ tab_rest = lh5.read("det001/hit", str(test_reboost_input_file / "out_rest_rows.lh5")).view_as(
+ "ak"
+ )
+
+ tab_1 = lh5.read("det001/hit", str(test_reboost_input_file / "out_all_file_one.lh5")).view_as(
+ "ak"
+ )
+
+ tab_merge = ak.concatenate((tab_some, tab_rest))
+ assert ak.all(ak.all(tab_merge._evtid == tab_1._evtid, axis=-1))
+
+
+def test_build_hit_with_locals(test_reboost_input_file, test_data_configs):
+ proc_config = {
+ "channels": [
+ "det001",
+ ],
+ "outputs": ["t0", "_evtid", "distance_to_surface"],
+ "step_group": {
+ "description": "group steps by time and evtid.",
+ "expression": "reboost.hpge.processors.group_by_time(stp,window=10)",
+ },
+ "locals": {
+ "hpge": "reboost.hpge.utils.get_hpge(meta_path=meta,pars=pars,detector=detector)",
+ "phy_vol": "reboost.hpge.utils.get_phy_vol(reg=reg,pars=pars,detector=detector)",
+ },
+ "operations": {
+ "t0": {
+ "description": "first time in the hit.",
+ "mode": "eval",
+ "expression": "ak.fill_none(ak.firsts(hit.time,axis=-1),np.nan)",
+ },
+ "truth_energy_sum": {
+ "description": "truth summed energy in the hit.",
+ "mode": "eval",
+ "expression": "ak.sum(hit.edep,axis=-1)",
+ },
+ "smear_energy_sum": {
+ "description": "summed energy after convolution with energy response.",
+ "mode": "function",
+ "expression": "reboost.hpge.processors.smear_energies(hit.truth_energy_sum,reso=pars.reso)",
+ },
+ "distance_to_surface": {
+ "description": "distance to the nplus surface",
+ "mode": "function",
+ "expression": "reboost.hpge.processors.distance_to_surface(hit.xloc, hit.yloc, hit.zloc, hpge, phy_vol.position.eval(), None)",
+ },
+ },
+ }
+ gdml_path = configs / pathlib.Path("geom.gdml")
+ meta_path = test_data_configs
+
+ # complete check on the processing chain including parameters / local variables
+
+ hit.build_hit(
+ str(test_reboost_input_file / "out.lh5"),
+ [str(test_reboost_input_file / "file*.lh5")],
+ in_field="hit",
+ out_field="hit",
+ proc_config=proc_config,
+ pars={"det001": {"reso": 1, "meta_name": "V99000A.json", "phy_vol_name": "det_phy_1"}},
+ buffer=100000,
+ merge_input_files=False,
+ metadata_path=meta_path,
+ gdml=gdml_path,
+ )
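
Outside of the test suite the same chain can be driven from files on disk: the
processing configuration and the per-detector parameters are plain dicts, so
they can be stored as YAML (or JSON) and read back with ``load_dict``. A
sketch, with purely illustrative file names:

.. code:: python

    from reboost.hpge import hit
    from reboost.hpge.utils import load_dict

    # same structure as the proc_config dict used in the tests above
    proc_config = load_dict("hit_config.yaml")

    # per-detector parameters, e.g. {"det001": {"reso": 1, "meta_name": ..., "phy_vol_name": ...}}
    pars = load_dict("pars.yaml")

    hit.build_hit(
        "output/hit/output.lh5",
        ["remage_output_*.lh5"],
        in_field="hit",  # LH5 group holding the per-detector step tables in the input files
        out_field="hit",
        proc_config=proc_config,
        pars=pars,
        buffer=int(1e5),
        gdml="geometry.gdml",
        metadata_path="metadata/diodes/",
    )
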
diff --git a/tests/test_hpge_processors.py b/tests/test_hpge_processors.py
new file mode 100644
index 0000000..167912f
--- /dev/null
+++ b/tests/test_hpge_processors.py
@@ -0,0 +1,12 @@
+from __future__ import annotations
+
+import numpy as np
+
+from reboost.hpge import processors
+
+
+def test_smear():
+ truth = np.array([1, 2, 3, 4, 5])
+ smeared = processors.smear_energies(truth, reso=0.01)
+
+ assert np.size(smeared) == 5
diff --git a/tests/test_hpge_step_group.py b/tests/test_hpge_step_group.py
new file mode 100644
index 0000000..69141cd
--- /dev/null
+++ b/tests/test_hpge_step_group.py
@@ -0,0 +1,89 @@
+from __future__ import annotations
+
+import awkward as ak
+import numpy as np
+from lgdo import Table
+
+from reboost.hpge import hit, processors
+
+
+def test_evtid_group():
+ in_arr_evtid = ak.Array(
+ {"_evtid": [1, 1, 1, 2, 2, 10, 10, 11, 12, 12, 12], "time": np.zeros(11)}
+ )
+
+ in_tab = Table(in_arr_evtid)
+
+ out = processors.group_by_evtid(in_tab)
+ out_ak = out.view_as("ak")
+ assert ak.all(out_ak._evtid == [[1, 1, 1], [2, 2], [10, 10], [11], [12, 12, 12]])
+ assert ak.all(out_ak.time == [[0, 0, 0], [0, 0], [0, 0], [0], [0, 0, 0]])
+
+ # test the eval in build hit also
+
+ out_eval = hit.step_group(
+ in_tab,
+ {
+ "description": "group steps by time and evtid.",
+ "expression": "reboost.hpge.processors.group_by_evtid(stp)",
+ },
+ )
+
+ out_eval_ak = out_eval.view_as("ak")
+
+ assert ak.all(out_ak._evtid == out_eval_ak._evtid)
+ assert ak.all(out_ak.time == out_eval_ak.time)
+
+
+def test_time_group():
+ # time units are ns
+ in_arr_evtid = ak.Array(
+ {
+ "_evtid": [1, 1, 1, 2, 2, 2, 2, 2, 11, 12, 12, 12, 15, 15, 15, 15, 15],
+ "time": [
+ 0,
+ -2000,
+ 3000,
+ 0,
+ 100,
+ 1200,
+ 17000,
+ 17010,
+ 0,
+ 0,
+ 0,
+ -5000,
+ 150,
+ 151,
+ 152,
+ 3000,
+ 3100,
+ ],
+ }
+ )
+
+ in_tab = Table(in_arr_evtid)
+
+ # the window is in us: 1 us = 1000 ns
+ out = processors.group_by_time(in_tab, window=1)
+ out_ak = out.view_as("ak")
+ assert ak.all(
+ out_ak._evtid
+ == [[1], [1], [1], [2, 2], [2], [2, 2], [11], [12], [12, 12], [15, 15, 15], [15, 15]]
+ )
+ assert ak.all(
+ out_ak.time
+ == [
+ [-2000],
+ [0],
+ [3000],
+ [0, 100],
+ [1200],
+ [17000, 17010],
+ [0],
+ [-5000],
+ [0, 0],
+ [150, 151, 152],
+ [3000, 3100],
+ ]
+ )
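
The grouping processors take an LGDO ``Table`` with ``_evtid`` and ``time``
fields, as in the tests above, so they can also be called on their own. A
small sketch (``stp_table`` stands for such a table):

.. code:: python

    from reboost.hpge import processors

    # group steps of the same _evtid whose times fall within a 10 us window
    hits = processors.group_by_time(stp_table, window=10)
    hits_ak = hits.view_as("ak")
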
diff --git a/tests/test_hpge_utils.py b/tests/test_hpge_utils.py
new file mode 100644
index 0000000..cd1f2fd
--- /dev/null
+++ b/tests/test_hpge_utils.py
@@ -0,0 +1,297 @@
+from __future__ import annotations
+
+import json
+import pathlib
+
+import awkward as ak
+import numpy as np
+import pyg4ometry
+import pytest
+import yaml
+from legendhpges.base import HPGe
+from legendtestdata import LegendTestData
+from lgdo import Array, Table, lh5
+
+from reboost.hpge.utils import (
+ _merge_arrays,
+ dict2tuple,
+ get_file_list,
+ get_files_to_read,
+ get_global_evtid,
+ get_global_evtid_range,
+ get_hpge,
+ get_include_chunk,
+ get_num_evtid_hit_tier,
+ get_num_simulated,
+ get_phy_vol,
+ load_dict,
+)
+
+configs = pathlib.Path(__file__).parent.resolve() / pathlib.Path("configs")
+
+
+def test_merge():
+ ak_obj = ak.Array({"evtid": [1, 1, 1, 1, 2, 2, 3], "edep": [100, 50, 1000, 20, 100, 200, 10]})
+ buffer_rows = ak.Array({"evtid": [1, 1], "edep": [60, 50]})
+
+ # should only remove the last element
+ merged_idx_0, buffer_0, mode = _merge_arrays(ak_obj, None, 0, 100, True)
+
+ assert ak.all(merged_idx_0.evtid == [1, 1, 1, 1, 2, 2])
+ assert ak.all(merged_idx_0.edep == [100, 50, 1000, 20, 100, 200])
+
+ assert ak.all(buffer_0.evtid == [3])
+ assert ak.all(buffer_0.edep == [10])
+
+ # delete_input is True, so the first write should overwrite the output ("of")
+ assert mode == "of"
+
+ # if delete_input is False the mode should be "append"
+ _, _, mode = _merge_arrays(ak_obj, None, 0, 100, False)
+ assert mode == "append"
+
+ # if idx is neither 0 nor max_idx, the buffer should be prepended and the trailing rows removed
+
+ merged_idx, buffer, mode = _merge_arrays(ak_obj, buffer_rows, 2, 100, True)
+
+ assert ak.all(merged_idx.evtid == [1, 1, 1, 1, 1, 1, 2, 2])
+ assert ak.all(merged_idx.edep == [60, 50, 100, 50, 1000, 20, 100, 200])
+
+ assert ak.all(buffer.evtid == [3])
+ assert ak.all(buffer.edep == [10])
+
+ assert mode == "append"
+
+ # for the final index the buffer is prepended and nothing is held back
+
+ merged_idx_end, buffer_end, mode = _merge_arrays(ak_obj, buffer_rows, 100, 100, True)
+
+ assert ak.all(merged_idx_end.evtid == [1, 1, 1, 1, 1, 1, 2, 2, 3])
+ assert ak.all(merged_idx_end.edep == [60, 50, 100, 50, 1000, 20, 100, 200, 10])
+
+ assert buffer_end is None
+
+
+@pytest.fixture
+def file_fixture(tmp_path):
+ # Create a simple YAML file
+ data = {"det": 1}
+ yaml_file = tmp_path / "data.yaml"
+ with pathlib.Path.open(yaml_file, "w") as yf:
+ yaml.dump(data, yf)
+
+ json_file = tmp_path / "data.json"
+ with pathlib.Path.open(json_file, "w") as jf:
+ json.dump(data, jf)
+
+ # Create a simple TXT file
+ txt_file = tmp_path / "data.txt"
+ with pathlib.Path.open(txt_file, "w") as tf:
+ tf.write("Some text.\n")
+
+ # Return paths for the test functions
+ return {"yaml_file": yaml_file, "json_file": json_file, "txt_file": txt_file}
+
+
+def test_read(file_fixture):
+ json_dict = load_dict(file_fixture["json_file"], None)
+ assert json_dict["det"] == 1
+
+ yaml_dict = load_dict(file_fixture["yaml_file"], None)
+ assert yaml_dict["det"] == 1
+
+ with pytest.raises(NotImplementedError):
+ load_dict(file_fixture["txt_file"], None)
+
+
+@pytest.fixture
+def file_list(tmp_path):
+ # make a list of files
+ for i in range(5):
+ data = {"det": i}
+
+ # make a json file
+ json_file = tmp_path / f"data_{i}.json"
+ with pathlib.Path.open(json_file, "w") as jf:
+ json.dump(data, jf)
+
+ # and a text file
+ txt_file = tmp_path / f"data_{i}.txt"
+ with pathlib.Path.open(txt_file, "w") as tf:
+ tf.write("Some text.\n")
+ return pathlib.Path(tmp_path)
+
+
+def test_get_file_list(file_list):
+ first_file_list = get_file_list(str(pathlib.Path(file_list) / "data_0.json"))
+ assert len(first_file_list) == 1
+
+ json_file_list = get_file_list(str(pathlib.Path(file_list) / "data*.json"))
+ assert len(json_file_list) == 5
+ json_file_list_repeat = get_file_list(
+ [str(pathlib.Path(file_list) / "data*.json"), str(pathlib.Path(file_list) / "data*.json")]
+ )
+ assert len(json_file_list_repeat) == 5
+ all_file_list = get_file_list([str(pathlib.Path(file_list) / "data*")])
+ assert len(all_file_list) == 10
+
+
+@pytest.fixture(scope="session")
+def test_data_configs():
+ ldata = LegendTestData()
+ ldata.checkout("5f9b368")
+ return ldata.get_path("legend/metadata/hardware/detectors/germanium/diodes")
+
+
+def test_get_hpge(test_data_configs):
+ # specify name in pars
+ hpge = get_hpge(str(test_data_configs), dict2tuple({"meta_name": "C99000A.json"}), "det001")
+ assert isinstance(hpge, HPGe)
+
+ # now read without metaname
+ hpge_ic = get_hpge(str(test_data_configs), dict2tuple({}), "V99000A")
+ assert isinstance(hpge_ic, HPGe)
+
+
+def test_get_phy_vol():
+ gdml_path = configs / pathlib.Path("geom.gdml")
+
+ gdml = pyg4ometry.gdml.Reader(gdml_path).getRegistry()
+
+ # read with the det_phy_vol_name
+ phy = get_phy_vol(gdml, dict2tuple({"phy_vol_name": "det_phy_1"}), "det001")
+
+ assert isinstance(phy, pyg4ometry.geant4.PhysicalVolume)
+
+ # read without
+ phy = get_phy_vol(gdml, dict2tuple({}), "det_phy_0")
+ assert isinstance(phy, pyg4ometry.geant4.PhysicalVolume)
+
+
+@pytest.fixture
+def test_lh5_files(tmp_path):
+ n1 = 14002
+ tab1 = Table(size=n1)
+ tab1.add_field("a", Array(np.ones(n1)))
+ lh5.write(tab1, "hit/vertices", tmp_path / "file1.lh5", wo_mode="of")
+
+ n2 = 25156
+ tab2 = Table(size=n2)
+ tab2.add_field("a", Array(np.ones(n2)))
+
+ lh5.write(tab2, "hit/vertices", tmp_path / "file2.lh5", wo_mode="of")
+
+ n3 = int(1e7)
+ tab3 = Table(size=n3)
+ tab3.add_field("a", Array(np.ones(n3)))
+
+ lh5.write(tab3, "hit/vertices", tmp_path / "file3.lh5", wo_mode="of")
+
+ return tmp_path
+
+
+def test_get_n_sim(test_lh5_files):
+ # single file
+ n1 = get_num_simulated([str(test_lh5_files / "file1.lh5")])
+ assert n1 == [14002]
+
+ # two files
+ n12 = get_num_simulated([str(test_lh5_files / "file1.lh5"), str(test_lh5_files / "file2.lh5")])
+ assert n12 == [14002, 25156]
+
+ # length > buffer
+ n123 = get_num_simulated(
+ [
+ str(test_lh5_files / "file1.lh5"),
+ str(test_lh5_files / "file2.lh5"),
+ str(test_lh5_files / "file3.lh5"),
+ ]
+ )
+ assert n123 == [14002, 25156, int(1e7)]
+
+
+@pytest.fixture
+def test_hit_file(tmp_path):
+ n1 = 10
+ tab_ch1 = Table(size=n1)
+ tab_ch1.add_field("global_evtid", Array(np.array([1, 4, 5, 6, 7, 9, 11, 12, 15, 17])))
+ lh5.write(tab_ch1, "det001/hit", tmp_path / "file1.lh5", wo_mode="of")
+
+ n2 = 5
+ tab_ch2 = Table(size=n2)
+ tab_ch2.add_field("global_evtid", Array(np.array([7, 8, 12, 13, 14])))
+ lh5.write(tab_ch2, "det002/hit", tmp_path / "file1.lh5", wo_mode="append")
+ return tmp_path / "file1.lh5"
+
+
+def test_num_evtid_hit_tier(test_hit_file):
+ # both channels
+ assert get_num_evtid_hit_tier(str(test_hit_file), ["det001", "det002"], "global_evtid") == 17
+
+ # one at a time
+ assert get_num_evtid_hit_tier(str(test_hit_file), ["det001"], "global_evtid") == 17
+ assert get_num_evtid_hit_tier(str(test_hit_file), ["det002"], "global_evtid") == 14
+
+
+def test_read_some_index(test_hit_file):
+ pass
+
+
+def test_global_evtid_range():
+ # raise exception if n_evtid is too large
+ with pytest.raises(ValueError):
+ get_global_evtid_range(100, 1000, 200)
+
+ # test that we get the right ranges
+ assert get_global_evtid_range(200, 5, 2000) == (200, 204)
+ assert get_global_evtid_range(200, None, 2000) == (200, 1999)
+
+
+def test_get_global_evtid():
+ # single file
+ first_evtid = 0
+ vertices = [0, 1, 2, 3, 4, 5]
+ input_evtid = [2, 3, 4, 5, 5, 5]
+ obj = ak.Array({"evtid": input_evtid})
+ assert np.all(get_global_evtid(first_evtid, obj, vertices)._global_evtid == input_evtid)
+
+ # now if we only have some vertices
+ vertices = [0, 2, 4, 6, 8, 10]
+ input_evtid = [4, 6, 8, 10, 10, 10]
+ obj = ak.Array({"evtid": input_evtid})
+ assert np.all(
+ get_global_evtid(first_evtid, obj, vertices)._global_evtid == np.array(input_evtid) / 2.0
+ )
+
+
+def test_get_files_to_read():
+ n_sim = [1000, 1200, 200, 5000]
+ n_sim = np.concatenate([[0], np.cumsum(n_sim)])
+
+ # read all evtid and thus all files
+ assert np.all(get_files_to_read(n_sim, start_glob_evtid=0, end_glob_evtid=7399) == [0, 1, 2, 3])
+
+ # all of file 0
+ assert np.all(get_files_to_read(n_sim, start_glob_evtid=0, end_glob_evtid=999) == [0])
+
+ # and some of file 1
+ assert np.all(get_files_to_read(n_sim, start_glob_evtid=0, end_glob_evtid=1000) == [0, 1])
+
+ # some of file 0, 1 and 2
+ assert np.all(get_files_to_read(n_sim, start_glob_evtid=0, end_glob_evtid=2200) == [0, 1, 2])
+
+ # only file 1 and 2
+ assert np.all(get_files_to_read(n_sim, start_glob_evtid=1000, end_glob_evtid=2300) == [1, 2])
+
+ # only file 3
+ assert np.all(get_files_to_read(n_sim, start_glob_evtid=2400, end_glob_evtid=5000) == [3])
+
+
+def test_skip_chunk():
+ evtid = ak.Array([4, 5, 10, 30])
+
+ assert get_include_chunk(evtid, start_glob_evtid=0, end_glob_evtid=35)
+ assert get_include_chunk(evtid, start_glob_evtid=0, end_glob_evtid=4)
+ assert get_include_chunk(evtid, start_glob_evtid=30, end_glob_evtid=33)
+ assert not get_include_chunk(evtid, start_glob_evtid=0, end_glob_evtid=3)
+ assert not get_include_chunk(evtid, start_glob_evtid=31, end_glob_evtid=100)
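
Putting the event-id bookkeeping together: ``get_global_evtid_range`` turns a
requested number of events into an inclusive range of global event ids, and
``get_files_to_read`` selects the input files that overlap this range. A
sketch with arbitrary numbers (argument order as in the tests above):

.. code:: python

    import numpy as np

    from reboost.hpge.utils import get_files_to_read, get_global_evtid_range

    # four input files with 1000, 1200, 200 and 5000 simulated vertices
    n_sim = np.concatenate([[0], np.cumsum([1000, 1200, 200, 5000])])

    # process 500 events starting from global event id 900, i.e. ids 900..1399
    start, end = get_global_evtid_range(900, 500, 7400)

    # only the first two files contain these ids
    assert np.all(get_files_to_read(n_sim, start_glob_evtid=start, end_glob_evtid=end) == [0, 1])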