Skip to content

Commit

Permalink
Merge branch 'cositools:develop' into fast_ts_map_galactic
Browse files Browse the repository at this point in the history
  • Loading branch information
Yong2Sheng authored Jul 31, 2024
2 parents a540003 + b3f85ab commit 3a560da
Show file tree
Hide file tree
Showing 66 changed files with 17,957 additions and 10,392 deletions.
8 changes: 5 additions & 3 deletions .github/workflows/unit_tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ jobs:
pip install pytest pytest-cov
pytest tests --junitxml=junit/test-results.xml --cov=cosipy --cov-report=xml --cov-report=html
- name: Codecov
uses: codecov/[email protected]
env:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
fail_ci_if_error: true

298 changes: 190 additions & 108 deletions cosipy/image_deconvolution/RichardsonLucy.py

Large diffs are not rendered by default.

135 changes: 135 additions & 0 deletions cosipy/image_deconvolution/RichardsonLucySimple.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import os
import copy
import numpy as np
import logging
logger = logging.getLogger(__name__)

from histpy import Histogram

from .deconvolution_algorithm_base import DeconvolutionAlgorithmBase

class RichardsonLucySimple(DeconvolutionAlgorithmBase):
"""
A class for the original RichardsonLucy algorithm.
Basically, this class can be used for testing codes.
An example of parameter is as follows.
iteration_max: 100
minimum_flux:
value: 0.0
unit: "cm-2 s-1 sr-1"
background_normalization_optimization: True
"""

def __init__(self, initial_model, dataset, mask, parameter):

DeconvolutionAlgorithmBase.__init__(self, initial_model, dataset, mask, parameter)

self.do_bkg_norm_optimization = parameter.get('background_normalization_optimization', False)

def initialization(self):
"""
initialization before running the image deconvolution
"""
# clear counter
self.iteration_count = 0

# clear results
self.results.clear()

# copy model
self.model = copy.deepcopy(self.initial_model)

# calculate exposure map
self.summed_exposure_map = self.calc_summed_exposure_map()

# mask setting
if self.mask is None and np.any(self.summed_exposure_map.contents == 0):
self.mask = Histogram(self.model.axes, contents = self.summed_exposure_map.contents > 0)
self.model = self.model.mask_pixels(self.mask)
logger.info("There are zero-exposure pixels. A mask to ignore them was set.")

# calculate summed background models for M-step
if self.do_bkg_norm_optimization:
self.dict_summed_bkg_model = {}
for key in self.dict_bkg_norm.keys():
self.dict_summed_bkg_model[key] = self.calc_summed_bkg_model(key)

def pre_processing(self):
"""
pre-processing for each iteration
"""
pass

def Estep(self):
"""
E-step in RL algoritm
"""

self.expectation_list = self.calc_expectation_list(self.model, dict_bkg_norm = self.dict_bkg_norm)
logger.debug("The expected count histograms were updated with the new model map.")

def Mstep(self):
"""
M-step in RL algorithm.
"""

ratio_list = [ data.event / expectation for data, expectation in zip(self.dataset, self.expectation_list) ]

# delta model
sum_T_product = self.calc_summed_T_product(ratio_list)
self.delta_model = self.model * (sum_T_product/self.summed_exposure_map - 1)

# background normalization optimization
if self.do_bkg_norm_optimization:
for key in self.dict_bkg_norm.keys():

sum_bkg_T_product = self.calc_summed_bkg_model_product(key, ratio_list)
sum_bkg_model = self.dict_summed_bkg_model[key]

self.dict_bkg_norm[key] = self.dict_bkg_norm[key] * (sum_bkg_T_product / sum_bkg_model)

def post_processing(self):
"""
Post-processing.
"""
self.model += self.delta_model
self.model[:] = np.where(self.model.contents < self.minimum_flux, self.minimum_flux, self.model.contents)

if self.mask is not None:
self.model = self.model.mask_pixels(self.mask)

def register_result(self):
"""
Register results at the end of each iteration.
"""

this_result = {"iteration": self.iteration_count,
"model": copy.deepcopy(self.model),
"delta_model": copy.deepcopy(self.delta_model),
"background_normalization": copy.deepcopy(self.dict_bkg_norm)}

# show intermediate results
logger.info(f' background_normalization: {this_result["background_normalization"]}')

# register this_result in self.results
self.results.append(this_result)

def check_stopping_criteria(self):
"""
If iteration_count is smaller than iteration_max, the iterative process will continue.
Returns
-------
bool
"""
if self.iteration_count < self.iteration_max:
return False
return True

def finalization(self):
"""
finalization after running the image deconvolution
"""
pass
13 changes: 11 additions & 2 deletions cosipy/image_deconvolution/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
from .data_loader import DataLoader
from .modelmap import ModelMap
from .image_deconvolution import ImageDeconvolution

from .image_deconvolution_data_interface_base import ImageDeconvolutionDataInterfaceBase
from .dataIF_COSI_DC2 import DataIF_COSI_DC2

from .model_base import ModelBase
from .allskyimage import AllSkyImageModel

from .deconvolution_algorithm_base import DeconvolutionAlgorithmBase
from .RichardsonLucy import RichardsonLucy
from .RichardsonLucySimple import RichardsonLucySimple

from .exposure_table import SpacecraftAttitudeExposureTable
from .coordsys_conversion_matrix import CoordsysConversionMatrix
201 changes: 201 additions & 0 deletions cosipy/image_deconvolution/allskyimage.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
import astropy.units as u
import numpy as np
import healpy as hp
import copy

import logging
logger = logging.getLogger(__name__)

from histpy import Histogram, Axes, Axis, HealpixAxis

from cosipy.threeml.custom_functions import get_integrated_spectral_model

from .model_base import ModelBase

class AllSkyImageModel(ModelBase):
"""
Photon flux maps in given energy bands. 2-dimensional histogram.
Attributes
----------
nside : int
Healpix NSIDE parameter.
energy_edges : :py:class:`np.array`
Bin edges for energies. We recommend to use a Quantity array with the unit of keV.
scheme : str, default 'ring'
Healpix scheme. Either 'ring', 'nested'.
coordsys : str or :py:class:`astropy.coordinates.BaseRepresentation`, default is 'galactic'
Instrinsic coordinates of the map. The default is 'galactic'.
label_image : str, default 'lb'
The label name of the healpix axis.
label_energy : str, default 'Ei'
The label name of the energy axis. The default is 'Ei'.
"""

def __init__(self,
nside,
energy_edges,
scheme = 'ring',
coordsys = 'galactic',
label_image = 'lb',
label_energy = 'Ei',
unit = '1/(s*cm*cm*sr)'
):

if energy_edges.unit != u.keV:

logger.warning(f"The unit of the given energy_edges is {energy_edges.unit}. It will be converted to keV.")
energy_edges = energy_edges.to('keV')

image_axis = HealpixAxis(nside = nside,
scheme = scheme,
coordsys = coordsys,
label = label_image)

energy_axis = Axis(edges = energy_edges, label = label_energy, scale = "log")

axes = Axes([image_axis, energy_axis])

super().__init__(axes, sparse = False, unit = unit)

@classmethod
def open(cls, filename, name = 'hist'):
"""
Open a file
Parameters
----------
filename: str
Returns
-------
py:class:`AllSkyImageModel`
"""

hist = Histogram.open(filename, name)

allskyimage = AllSkyImageModel(nside = hist.axes[0].nside,
energy_edges = hist.axes[1].edges,
scheme = hist.axes[0].scheme,
coordsys = hist.axes[0].coordsys.name,
label_image = hist.axes[0].label,
label_energy = hist.axes[1].label,
unit = hist.unit)

allskyimage[:] = hist.contents

del hist
return allskyimage

@classmethod
def instantiate_from_parameters(cls, parameter):
"""
Generate an instance of this class
Parameters
----------
parameter : py:class:`yayc.Configurator`
Returns
-------
py:class:`AllSkyImageModel`
Notes
-----
The parameters should be given like this:
nside: 8
energy_edges:
value: [100., 200., 500., 1000., 2000., 5000.]
unit: "keV"
scheme: "ring"
coordinate: "galactic"
unit: "cm-2 s-1 sr-1"
"""

new = cls(nside = parameter['nside'],
energy_edges = parameter['energy_edges']['value'] * u.Unit(parameter['energy_edges']['unit']),
scheme = parameter['scheme'],
coordsys = parameter['coordinate'],
unit = u.Unit(parameter['unit']))

return new

def set_values_from_parameters(self, parameter):
"""
Set the values accordinng to the specified algorithm.
Parameters
----------
parameter : py:class:`yayc.Configurator`
Parameters for the specified algorithm.
Notes
-----
Currently algorithm_name can be only 'flat'. All of the pixel values in each energy bins will set to the given value.
parameter should be {'values': [ flux value at 1st energy bin (without unit), flux value at 2nd energy bin, ...]}.
An example of contents in parameter is like this:
algorithm: "flat"
parameter:
value: [1.0e-2, 1.0e-2, 1.0e-2, 1.0e-2, 1.0e-2]
unit: "cm-2 s-1 sr-1"
"""

algorithm_name = parameter['algorithm']
algorithm_parameter = parameter['parameter']

if algorithm_name == "flat":
unit = u.Unit(algorithm_parameter['unit'])
for idx, value in enumerate(algorithm_parameter['value']):
self[:,idx] = value * unit
# elif algorithm_name == ...
# ...

def set_values_from_extendedmodel(self, extendedmodel):
"""
Set the values accordinng to the given astromodels.ExtendedSource.
Parameters
----------
extendedmodel : :py:class:`astromodels.ExtendedSource`
Extended source model
"""

integrated_flux = get_integrated_spectral_model(spectrum = extendedmodel.spectrum.main.shape,
eaxis = self.axes[1])

npix = self.axes[0].npix
coords = self.axes[0].pix2skycoord(np.arange(npix))

normalized_map = extendedmodel.spatial_shape(coords.l.deg, coords.b.deg) / u.sr

self[:] = np.tensordot(normalized_map, integrated_flux.contents, axes = 0)

def smoothing(self, fwhm = None, sigma = None):
"""
Smooth a map with a Gaussian filter
Parameters
----------
fwhm: :py:class:`astropy.units.quantity.Quantity`
The FWHM of the Gaussian (with a unit of deg or rad). Default: 0 deg
sigma: :py:class:`astropy.units.quantity.Quantity`
The sigma of the Gaussian (with a unit of deg or rad). Override fwhm.
"""

if fwhm is None:
fwhm = 0.0 * u.deg

if sigma is not None:
fwhm = 2.354820 * sigma

allskyimage_new = copy.deepcopy(self)

for i in range(self.axes['Ei'].nbins):
allskyimage_new[:,i] = hp.smoothing(self[:,i].value, fwhm = fwhm.to('rad').value) * self.unit

return allskyimage_new
4 changes: 1 addition & 3 deletions cosipy/image_deconvolution/coordsys_conversion_matrix.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,7 @@ def time_binning_ccm(cls, full_detector_response, orientation, time_intervals, n
quiet = True,
save = False)

time_diff = filtered_orientation.get_time_delta()

dwell_time_map = filtered_orientation.get_dwell_map(response = full_detector_response.filename,
dts = time_diff,
src_path = pixel_movement,
save = False)

Expand Down Expand Up @@ -222,4 +219,5 @@ def open(cls, filename, name = 'hist'):

return new

# TODO
# def calc_exposure_map(self, full_detector_response): #once the response file format is fixed, I will implement this function
Loading

0 comments on commit 3a560da

Please sign in to comment.