diff --git a/src/graphnet/data/extractors/icecube/i3calorimetry.py b/src/graphnet/data/extractors/icecube/i3calorimetry.py index 09f98e73b..f0af73aab 100644 --- a/src/graphnet/data/extractors/icecube/i3calorimetry.py +++ b/src/graphnet/data/extractors/icecube/i3calorimetry.py @@ -1,6 +1,6 @@ """Extract all the visible particles entering the volume.""" -from typing import Dict, Any, TYPE_CHECKING, Tuple, List +from typing import Dict, Any, TYPE_CHECKING, Tuple, Union, List from .utilities.gcd_hull import GCD_hull from .i3extractor import I3Extractor @@ -8,14 +8,19 @@ import numpy as np from graphnet.utilities.imports import has_icecube_package +from copy import deepcopy +from collections import deque if has_icecube_package() or TYPE_CHECKING: from icecube import ( icetray, dataclasses, MuonGun, + simclasses, ) # pyright: reportMissingImports=false + DARK = dataclasses.I3Particle.ParticleShape.Dark + class I3Calorimetry(I3Extractor): """Event level energy labeling for IceCube data. @@ -32,7 +37,6 @@ def __init__( extractor_name: str = "I3Calorimetry", daughters: bool = False, highest_energy_primary: bool = False, - cascade_deposited_only: bool = True, **kwargs: Any, ) -> None: """Create a ConvexHull object from the GCD file. @@ -42,29 +46,10 @@ def __init__( mctree: Name of the I3MCTree in the frame. mmctracklist: Name of the MMCTrackList in the frame. extractor_name: Name of the extractor. - daughters: If True, only calculate energies for particles - that are daughters of the primary. - highest_energy_primary: If True, takes into account only the - primary with the highest energy. - NOTE: Only makes a difference if daughters is False - and the event is not a Corsika event. - cascade_deposited_only: If True, consider only energies from - cascades that are marked as visible. If False the total - energy of a cascade is counted. - - Variable explanation: - - e_entrance_track: Total energy of tracks entering the hull. - - e_deposited_track: Total energy deposited by tracks in the hull. - - e_cascade: Total energy of cascade particles in the hull. - - e_visible: Total energy of particles entering the hull. - NOTE: if daughters is True, this is the total visible energy - of daughter particles of the primary particles. If this is 0 - that means that all the light in the detector comes from - particles that are daughters of coincident primaries. - - fraction_primary: Fraction of `e_visible` compared to - the primary energy. - - fraction_cascade: Fraction of the total energy that is - deposited by cascade particles compared to the total energy. + daughters: If True, only consider particles that are + daughters of the primary. + highest_energy_primary: If True, only consider particles that are + daughters of the highest energy primary. """ # Member variable(s) self.hull = hull @@ -72,59 +57,38 @@ def __init__( self.mmctracklist = mmctracklist self.daughters = daughters self.highest_energy_primary = highest_energy_primary - self.cascade_deposited_only = cascade_deposited_only # Base class constructor super().__init__(extractor_name=extractor_name, **kwargs) def __call__(self, frame: "icetray.I3Frame") -> Dict[str, Any]: """Extract all the visible particles entering the volume.""" output = {} + # copy the original mctree because we will be modifying it + tree_copy = deepcopy(frame[self.mctree]) if self.frame_contains_info(frame): - primaries = self.get_primaries( - frame, - self.daughters, - self.highest_energy_primary, + primary_energy = sum( + [ + p.energy + for p in self.check_primary_energy( + frame, + self.get_primaries( + frame, + self.daughters, + highest_energy_primary=self.highest_energy_primary, + ), + ) + ] ) - if not len(primaries) == 0: - - MMCTrackList = frame[self.mmctracklist] - # Filter tracks that are not daughters of the desired - if self.daughters: - temp_MMCTrackList = [] - for track in MMCTrackList: - for p in primaries: - if frame[self.mctree].is_in_subtree( - p.id, track.GetI3Particle().id - ): - temp_MMCTrackList.append(track) - break - MMCTrackList = temp_MMCTrackList - - # Create a lookup dict for the tracks - track_lookup = {} - for track in MuonGun.Track.harvest( - frame[self.mctree], MMCTrackList - ): - track_lookup[track.id] = track - - e_cascade, e_dep_track, e_ent_track = self.get_energies( - frame, primaries, track_lookup - ) + e_entrance_track, e_deposited_track = self.total_track_energy( + frame + ) - primary_energy = sum([p.energy for p in primaries]) - else: - e_ent_track = np.nan - e_dep_track = np.nan - e_cascade = np.nan - primary_energy = np.nan + e_deposited_cascade = self.total_cascade_energy(frame) - e_total = e_ent_track + e_cascade + e_total = e_entrance_track + e_deposited_cascade - # In case all particles are considered and - # there is no energy deposited in the hull, - # we warn the user. if all( ( not self.daughters, @@ -137,53 +101,42 @@ def __call__(self, frame: "icetray.I3Frame") -> Dict[str, Any]: "Think about increasing the padding." f"\nCurrent padding: {self.hull.padding}" f"\nTotal energy: {e_total}" - f"\nTrack energy: {e_ent_track}" - f"\nCascade energy: {e_cascade}" + f"\nTrack energy: {e_entrance_track}" + f"\nCascade energy: {e_deposited_cascade}" f"\nEvent header: {frame['I3EventHeader']}" ) - # Check only in the case that there were primaries - if not len(primaries) == 0 and (not np.isnan(e_total)): - - # total energy should always be less than the primary energy - assert e_total <= ( - primary_energy * (1 + 1e-6) + if self.daughters: + assert e_total <= (primary_energy * (1 + 1e-6)) or ( + e_total - primary_energy < 0.5 ), "Total energy on entrance is greater than primary energy\ \nTotal energy: {}\ \nPrimary energy: {}\ \nTrack energy: {}\ \nCascade energy: {}\ - {}".format( + {}".format( # allow for differences due to mass -> kinetic energy conversion and numerical precision e_total, primary_energy, - e_ent_track, - e_cascade, + e_entrance_track, + e_deposited_cascade, frame["I3EventHeader"], ) - assert ( - primary_energy > 0 - ), "Primary energy is 0, this should not happen.\ - \nTotal energy: {}\ - \nTrack energy: {}\ - \nCascade energy: {}\ - {}".format( - e_total, - e_ent_track, - e_cascade, - frame["I3EventHeader"], - ) - fraction_primary = e_total / primary_energy - cascade_fraction = None if e_total > 0: - cascade_fraction = e_cascade / e_total + cascade_fraction = e_deposited_cascade / e_total + if primary_energy > 0: + fraction_primary = e_total / primary_energy + else: + fraction_primary = None output.update( { - "e_entrance_track_" + self._extractor_name: e_ent_track, - "e_deposited_track_" + self._extractor_name: e_dep_track, - "e_cascade_" + self._extractor_name: e_cascade, + "e_entrance_track_" + + self._extractor_name: e_entrance_track, + "e_deposited_track_" + + self._extractor_name: e_deposited_track, + "e_cascade_" + self._extractor_name: e_deposited_cascade, "e_visible_" + self._extractor_name: e_total, "fraction_primary_" + self._extractor_name: fraction_primary, @@ -193,146 +146,159 @@ def __call__(self, frame: "icetray.I3Frame") -> Dict[str, Any]: ) output = {k: v for k, v in output.items() if k not in self._exclude} + # restore original mctree + frame.Delete(self.mctree) + frame[self.mctree] = tree_copy return output - def get_energies( - self, - frame: "icetray.I3Frame", - particles: List["dataclasses.I3Particle"], - track_lookup: Dict["icetray.I3ParticleID", "icetray.I3Particle"], - ) -> Tuple[float, float, float]: - """Get the total energy of cascade particles on entrance.""" - e_cascade = 0 - e_dep_track = 0 - e_ent_track = 0 + def frame_contains_info(self, frame: "icetray.I3Frame") -> bool: + """Check if the frame contains the necessary information.""" + return self.mctree in frame and self.mmctracklist in frame - if len(particles) == 0: - return e_cascade, e_dep_track, e_ent_track - - for particle in particles: - length = particle.length - if length != length: - length = 0 - # If the particle is a track in the MMCTrackList take the - # energy at the entrance and exit of the hull. - # NOTE: We do not consider daughters of tracks, - # because they are already included in the track energy. - if particle.is_track & (particle.id in track_lookup): - track = track_lookup[particle.id] - - # Find distance to entrance and exit from sampling volume - intersections = self.hull.surface.intersection( - track.pos, track.dir - ) - # Get the corresponding energies + def total_track_energy( + self, frame: "icetray.I3Frame" + ) -> Tuple[float, float]: + """Get the total energy of track particles on entrance.""" + e_entrance = 0 + e_deposited = 0 + primaries = self.get_primaries( + frame, self.daughters, self.highest_energy_primary + ) + primaries = self.check_primary_energy(frame, primaries) + + MMCTrackList = frame[self.mmctracklist] + if self.daughters: + MMCTrackList_filtered = [] + for track in MMCTrackList: try: - e0 = track.get_energy(intersections.first) - e1 = track.get_energy(intersections.second) - - # Catch MuonGun errors - except RuntimeError as e: - if ( - "sum of losses is smaller than " - "energy at last checkpoint" in str(e) - ): - hdr = frame["I3EventHeader"] - e.add_note(f"Error in MuonGun track in event {hdr}") - self.warning(f"Skipping bad event {hdr}: {e}") - e0 = np.nan - e1 = np.nan - e_cascade = np.nan - continue # skip this frame - else: - raise # re-raise unexpected errors - - e_dep_track += e0 - e1 - e_ent_track += e0 - # if the particle is not in the hull, but has daughters, - # we add the energies of the daughters. - elif not self.is_in_hull(particle): - daughters = dataclasses.I3MCTree.get_daughters( - frame[self.mctree], particle - ) - if len(daughters) == 0: - continue - ( - e_cascade, - e_dep_track, - e_ent_track, - ) = tuple( - np.add( - (e_cascade, e_dep_track, e_ent_track), - self.get_energies( - frame, - daughters, - track_lookup, - ), - ) - ) - # If the particle is a cascade in the hull, we add its energy. - elif particle.is_cascade: - if self.cascade_deposited_only: - # Check wether the cascade is made up of smaller segments - # in this case the shape is dark and we want to count - # the energy of its daughters. if ( - particle.shape - != dataclasses.I3Particle.ParticleShape.Dark + frame[self.mctree].get_primary(track.GetI3Particle()) + in primaries ): - e_cascade += particle.energy - else: - ( - e_cascade, - e_dep_track, - e_ent_track, - ) = tuple( - np.add( - (e_cascade, e_dep_track, e_ent_track), - self.get_energies( - frame, - dataclasses.I3MCTree.get_daughters( - frame[self.mctree], particle - ), - track_lookup, - ), - ) + MMCTrackList_filtered.append(track) + except RuntimeError as e: + if "particle not found" in str(e): + # log warning with event header + self.warning( + f"Could not find primary for track {track.GetI3Particle()}" + f" in event {frame['I3EventHeader']}: {e}" ) - else: - # In this case we consider the total cascade - # energy and therefore do not look at the daughters - e_cascade += particle.energy - # The particle is in the hull and not a track in the MMCTrackList, - # or a cascade, so we look at its daughters. - # Could be a NuMu interacting within the hull. - else: - ( - e_cascade, - e_dep_track, - e_ent_track, - ) = tuple( - np.add( - (e_cascade, e_dep_track, e_ent_track), - self.get_energies( - frame, - dataclasses.I3MCTree.get_daughters( - frame[self.mctree], particle - ), - track_lookup, - ), - ) - ) + # continue to next track + else: + raise e + + MMCTrackList = simclasses.I3MMCTrackList(MMCTrackList_filtered) + + track_list = np.array( + MuonGun.Track.harvest(frame[self.mctree], MMCTrackList) + ) + + while len(track_list) > 0: + track = track_list[0] + track_list = track_list[1:] + try: + particle = frame[self.mctree].get_particle(track.id) + except RuntimeError: + # If the particle does not exist in the mctree, that means a partcle further up was processed and therefore it should not be counted + continue + + # Find distance to entrance and exit from sampling volume + intersections = self.hull.surface.intersection( + track.pos, track.dir + ) - return e_cascade, e_dep_track, e_ent_track + # Check if the track actually enters the volume + if not ( + np.isfinite(intersections.first) + and (intersections.first < particle.length) + ): + continue - def frame_contains_info(self, frame: "icetray.I3Frame") -> bool: - """Check if the frame contains the necessary information.""" - return self.mctree in frame and self.mmctracklist in frame + # Get the corresponding energies + try: + e0 = track.get_energy(intersections.first) + e1 = track.get_energy(intersections.second) - def is_in_hull(self, particle: "dataclasses.I3Particle") -> bool: - """Check if a particle is in the hull.""" - pos = np.array(particle.pos) - direc = np.array([particle.dir.x, particle.dir.y, particle.dir.z]) - length = particle.length if particle.length is not None else 0 - pos = pos + direc * length + except RuntimeError as e: + if ( + "sum of losses is smaller than " + "energy at last checkpoint" in str(e) + ): + hdr = frame["I3EventHeader"] + e.add_note(f"Error in MuonGun track in event {hdr}") + self.warning( + f"Skipping bad track {hdr}: {e}" + f"\nTotal energy of offending particle: {particle.energy}" + ) + continue + else: + raise + + # Accumulate + e_deposited += e0 - e1 + e_entrance += e0 + # get descendant ids + # erase particle and children from mctree + frame[self.mctree].erase(track.id) + + # Sanity check ensuring no double counting + if self.daughters: + assert e_entrance <= sum( + [p.energy for p in primaries] + ), "Energy on entrance is greater than primary energy" + assert e_deposited <= sum( + [p.energy for p in primaries] + ), "Energy deposited is greater than primary energy" + return e_entrance, e_deposited + + def total_cascade_energy( + self, + frame: "icetray.I3Frame", + ) -> float: + """Get the total energy of cascade particles on entrance.""" + particles = deque( + self.get_primaries( + frame, self.daughters, self.highest_energy_primary + ) + ) - return self.hull.point_in_hull(pos) + if len(particles) == 0: + return 0.0 + + pos_list, direc_list, length_list, cascade_bool, energies = ( + [], + [], + [], + [], + [], + ) + + mctree = frame[self.mctree] + while len(particles) > 0: + p = particles.popleft() + p_children = mctree.get_daughters(p) + if len(p_children) > 0: + particles.extend(p_children) + continue + if p.is_track or p.shape == DARK: + continue + + pos_list.append([p.pos.x, p.pos.y, p.pos.z]) + direc_list.append([p.dir.x, p.dir.y, p.dir.z]) + length_list.append(p.length) + cascade_bool.append(p.is_cascade) + energies.append(p.energy) + + if len(energies) == 0: + return 0.0 + + length = np.array(length_list).astype(float) + length[np.isnan(length)] = 0 + pos = np.asarray(pos_list) + direc = np.asarray(direc_list) + cascade_bool = np.array(cascade_bool) + energies = np.array(energies) + pos = (pos.T + direc.T * length).T + in_hull = self.hull.point_in_hull(pos) + + return np.sum(energies[cascade_bool & in_hull])