Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added STEREO empirical model-based straylight #104

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 61 additions & 43 deletions simpunch/level0.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@
from regularizepsf import ArrayPSFTransform

from simpunch.spike import generate_spike_image
from simpunch.util import update_spacecraft_location, write_array_to_fits
from simpunch.util import (generate_stray_light, update_spacecraft_location,
write_array_to_fits)


def perform_photometric_uncalibration(input_data: NDCube, coefficient_array: np.ndarray) -> NDCube:
Expand All @@ -32,14 +33,16 @@
input_data.data[...] = new_data[...]
return input_data

def add_spikes(input_data: NDCube) -> (NDCube, np.ndarray):

def add_spikes(input_data: NDCube) -> tuple[NDCube, np.ndarray]:
"""Add spikes to images."""
spike_image = generate_spike_image(input_data.data.shape)
input_data.data[...] += spike_image
return input_data, spike_image


def create_streak_matrix(
n: int, exposure_time: float, readout_line_time: float, reset_line_time: float,
n: int, exposure_time: float, readout_line_time: float, reset_line_time: float,
) -> np.ndarray:
"""Construct the matrix that streaks an image."""
lower = np.tril(np.ones((n, n)) * readout_line_time, -1)
Expand All @@ -51,8 +54,8 @@

def apply_streaks(input_data: NDCube,
exposure_time: float = 49 * 1000,
readout_line_time: float = 163/2148,
reset_line_time: float = 163/2148) -> NDCube:
readout_line_time: float = 163 / 2148,
reset_line_time: float = 163 / 2148) -> NDCube:
"""Apply the streak matrix to the image."""
streak_matrix = create_streak_matrix(input_data.data.shape[0],
exposure_time, readout_line_time, reset_line_time)
Expand All @@ -65,8 +68,15 @@
return input_data


def add_stray_light(input_data: NDCube) -> NDCube:
def add_stray_light(input_data: NDCube,
inst: str = "WFI",
polar: str = "mzp") -> NDCube:
"""Add stray light to the image."""
straydata = generate_stray_light(input_data.data.shape, instrument=inst)
if polar == "mzp":
input_data.data[:, :] = input_data.data[:, :] + straydata[1]

Check warning on line 77 in simpunch/level0.py

View check run for this annotation

Codecov / codecov/patch

simpunch/level0.py#L77

Added line #L77 was not covered by tests
else:
input_data.data[:, :] = input_data.data[:, :] + straydata[0]
return input_data


Expand All @@ -75,20 +85,21 @@
input_data.data[...] = psf_model.apply(input_data.data)[...]
return input_data


def add_transients(input_data: NDCube,
transient_area: int = 600**2,
transient_probability:float = 0.03,
transient_brightness_range: (float, float) = (0.6, 0.8)) -> NDCube:
transient_area: int = 600 ** 2,
transient_probability: float = 0.03,
transient_brightness_range: tuple[float, float] = (0.6, 0.8)) -> NDCube:
"""Add a block of brighter transient data to simulate aurora."""
transient_image = np.zeros_like(input_data.data)
if random() < transient_probability:
width = int(np.sqrt(transient_area) * random())
height = int(transient_area / width)
i, j = int(random() * input_data.data.shape[0]), int(random() * input_data.data.shape[1])
transient_brightness = np.random.uniform(transient_brightness_range[0], transient_brightness_range[1])
transient_value = np.mean(input_data.data[i:i+width, j:j+height]) * transient_brightness
input_data.data[i:i+width, j:j+height] += transient_value
transient_image[i:i+width, j:j+height] = transient_value
transient_value = np.mean(input_data.data[i:i + width, j:j + height]) * transient_brightness
input_data.data[i:i + width, j:j + height] += transient_value
transient_image[i:i + width, j:j + height] = transient_value

Check warning on line 102 in simpunch/level0.py

View check run for this annotation

Codecov / codecov/patch

simpunch/level0.py#L100-L102

Added lines #L100 - L102 were not covered by tests
return input_data, transient_image


Expand All @@ -111,11 +122,11 @@
@task
def generate_l0_pmzp(input_file: NDCube,
path_output: str,
psf_model_path: str, # ArrayPSFTransform,
wfi_quartic_coeffs_path: str, # np.ndarray,
nfi_quartic_coeffs_path: str, # np.ndarray,
transient_probability: float=0.03,
shift_pointing: bool=False) -> None:
psf_model_path: str, # ArrayPSFTransform,
wfi_quartic_coeffs_path: str, # np.ndarray,
nfi_quartic_coeffs_path: str, # np.ndarray,
transient_probability: float = 0.03,
shift_pointing: bool = False) -> None:
"""Generate level 0 polarized synthetic data."""
input_data = load_ndcube_from_fits(input_file)
psf_model = ArrayPSFTransform.load(Path(psf_model_path))
Expand Down Expand Up @@ -146,22 +157,25 @@
output_data, transient = add_transients(output_data, transient_probability=transient_probability)
output_data = uncorrect_psf(output_data, psf_model)

inst = "WFI" \

Check warning on line 160 in simpunch/level0.py

View check run for this annotation

Codecov / codecov/patch

simpunch/level0.py#L160

Added line #L160 was not covered by tests
if input_data.meta["OBSCODE"].value != "4" else "NFI"

# TODO - look for stray light model from WFI folks? Or just use some kind of gradient with poisson noise.
output_data = add_stray_light(output_data)
output_data = add_stray_light(output_data, inst = inst, polar="mzp")

Check warning on line 164 in simpunch/level0.py

View check run for this annotation

Codecov / codecov/patch

simpunch/level0.py#L164

Added line #L164 was not covered by tests
output_data = add_deficient_pixels(output_data)
output_data = apply_streaks(output_data)
output_data = perform_photometric_uncalibration(output_data, quartic_coefficients)

if input_data.meta["OBSCODE"].value == "4":
scaling = {"gain": 4.9 * u.photon / u.DN,
"wavelength": 530. * u.nm,
"exposure": 49 * u.s,
"aperture": 49.57 * u.mm**2}
"wavelength": 530. * u.nm,
"exposure": 49 * u.s,
"aperture": 49.57 * u.mm ** 2}
else:
scaling = {"gain": 4.9 * u.photon / u.DN,
"wavelength": 530. * u.nm,
"exposure": 49 * u.s,
"aperture": 34 * u.mm ** 2}
"wavelength": 530. * u.nm,
"exposure": 49 * u.s,
"aperture": 34 * u.mm ** 2}
output_data.data[:, :] = msb_to_dn(output_data.data[:, :], output_data.wcs, **scaling)

noise = compute_noise(output_data.data)
Expand All @@ -175,7 +189,7 @@

# Set output dtype
# TODO - also check this in the output data w/r/t BITPIX
output_data.data[output_data.data > 2**10-1] = 2**10-1
output_data.data[output_data.data > 2 ** 10 - 1] = 2 ** 10 - 1

Check warning on line 192 in simpunch/level0.py

View check run for this annotation

Codecov / codecov/patch

simpunch/level0.py#L192

Added line #L192 was not covered by tests
output_data.meta["DESCRPTN"] = "Simulated " + output_data.meta["DESCRPTN"].value
output_data.meta["TITLE"] = "Simulated " + output_data.meta["TITLE"].value

Expand All @@ -192,13 +206,14 @@
write_array_to_fits(path_output + get_base_file_name(output_data) + "_transient.fits", transient)
original_wcs.to_header().tofile(path_output + get_base_file_name(output_data) + "_original_wcs.txt")


@task
def generate_l0_cr(input_file: NDCube, path_output: str,
psf_model_path: str, # ArrayPSFTransform,
wfi_quartic_coeffs_path: str, # np.ndarray,
nfi_quartic_coeffs_path: str, # np.ndarray,
psf_model_path: str, # ArrayPSFTransform,
wfi_quartic_coeffs_path: str, # np.ndarray,
nfi_quartic_coeffs_path: str, # np.ndarray,
transient_probability: float = 0.03,
shift_pointing: bool=False) -> None:
shift_pointing: bool = False) -> None:
"""Generate level 0 clear synthetic data."""
input_data = load_ndcube_from_fits(input_file)
psf_model = ArrayPSFTransform.load(Path(psf_model_path))
Expand Down Expand Up @@ -226,23 +241,25 @@
else:
output_data = input_data
original_wcs = input_data.wcs.copy()
inst = "WFI" \

Check warning on line 244 in simpunch/level0.py

View check run for this annotation

Codecov / codecov/patch

simpunch/level0.py#L244

Added line #L244 was not covered by tests
if input_data.meta["OBSCODE"].value != "4" else "NFI"
output_data, transient = add_transients(output_data, transient_probability=transient_probability)
output_data = uncorrect_psf(output_data, psf_model)
output_data = add_stray_light(output_data)
output_data = add_stray_light(output_data, inst=inst, polar="clear")

Check warning on line 248 in simpunch/level0.py

View check run for this annotation

Codecov / codecov/patch

simpunch/level0.py#L248

Added line #L248 was not covered by tests
output_data = add_deficient_pixels(output_data)
output_data = apply_streaks(output_data)
output_data = perform_photometric_uncalibration(output_data, quartic_coefficients)

if input_data.meta["OBSCODE"].value == "4":
scaling = {"gain": 4.9 * u.photon / u.DN,
"wavelength": 530. * u.nm,
"exposure": 49 * u.s,
"aperture": 49.57 * u.mm**2}
"wavelength": 530. * u.nm,
"exposure": 49 * u.s,
"aperture": 49.57 * u.mm ** 2}
else:
scaling = {"gain": 4.9 * u.photon / u.DN,
"wavelength": 530. * u.nm,
"exposure": 49 * u.s,
"aperture": 34 * u.mm ** 2}
"wavelength": 530. * u.nm,
"exposure": 49 * u.s,
"aperture": 34 * u.mm ** 2}
output_data.data[:, :] = msb_to_dn(output_data.data[:, :], output_data.wcs, **scaling)

noise = compute_noise(output_data.data)
Expand All @@ -252,7 +269,7 @@

output_data.data[:, :] = encode_sqrt(output_data.data[:, :], to_bits=10)

output_data.data[output_data.data > 2**10-1] = 2**10-1
output_data.data[output_data.data > 2 ** 10 - 1] = 2 ** 10 - 1

Check warning on line 272 in simpunch/level0.py

View check run for this annotation

Codecov / codecov/patch

simpunch/level0.py#L272

Added line #L272 was not covered by tests
output_data.meta["DESCRPTN"] = "Simulated " + output_data.meta["DESCRPTN"].value
output_data.meta["TITLE"] = "Simulated " + output_data.meta["TITLE"].value

Expand All @@ -269,9 +286,10 @@
write_array_to_fits(path_output + get_base_file_name(output_data) + "_transient.fits", transient)
original_wcs.to_header().tofile(path_output + get_base_file_name(output_data) + "_original_wcs.txt")


@flow(log_prints=True,
task_runner=DaskTaskRunner(cluster_kwargs={"n_workers": 64, "threads_per_worker": 2},
))
))
def generate_l0_all(datadir: str,
outputdir: str,
psf_model_path: str,
Expand All @@ -287,19 +305,19 @@
# Parse list of level 1 model data
files_l1 = glob.glob(datadir + "/synthetic_l1/*L1_P*_v1.fits")
files_cr = glob.glob(datadir + "/synthetic_l1/*CR*_v1.fits")
print(f"Generating based on {len(files_l1)+len(files_cr)} files.")
print(f"Generating based on {len(files_l1) + len(files_cr)} files.")

Check warning on line 308 in simpunch/level0.py

View check run for this annotation

Codecov / codecov/patch

simpunch/level0.py#L308

Added line #L308 was not covered by tests
files_l1.sort()
files_cr.sort()

futures = []
for file_l1 in files_l1:
futures.append(generate_l0_pmzp.submit(file_l1, outdir, psf_model_path, # noqa: PERF401
wfi_quartic_coeffs_path, nfi_quartic_coeffs_path,
transient_probability, shift_pointing))
wfi_quartic_coeffs_path, nfi_quartic_coeffs_path,
transient_probability, shift_pointing))

for file_cr in files_cr:
futures.append(generate_l0_cr.submit(file_cr, outdir, psf_model_path, # noqa: PERF401
wfi_quartic_coeffs_path, nfi_quartic_coeffs_path,
transient_probability, shift_pointing))
wfi_quartic_coeffs_path, nfi_quartic_coeffs_path,
transient_probability, shift_pointing))

wait(futures)
63 changes: 63 additions & 0 deletions simpunch/tests/test_stray_light.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
from datetime import datetime

import numpy as np
import pytest
from astropy.nddata import StdDevUncertainty
from astropy.wcs import WCS
from ndcube import NDCube
from punchbowl.data import NormalizedMetadata

from simpunch.level0 import add_stray_light
from simpunch.util import generate_stray_light


@pytest.fixture()
def sample_ndcube() -> NDCube:
def _sample_ndcube(shape: tuple, code:str = "CR1", level:str = "0") -> NDCube:
data = np.random.random(shape).astype(np.float32) * 1e-12
sqrt_abs_data = np.sqrt(np.abs(data))
uncertainty = StdDevUncertainty(np.interp(sqrt_abs_data, (sqrt_abs_data.min(), sqrt_abs_data.max()),
(0,1)).astype(np.float32))
wcs = WCS(naxis=2)
wcs.wcs.ctype = "HPLN-ARC", "HPLT-ARC"
wcs.wcs.cunit = "deg", "deg"
wcs.wcs.cdelt = 0.1, 0.1
wcs.wcs.crpix = 0, 0
wcs.wcs.crval = 1, 1
wcs.wcs.cname = "HPC lon", "HPC lat"

meta = NormalizedMetadata.load_template(code, level)
meta["DATE-OBS"] = str(datetime(2024, 2, 22, 16, 0, 1))
meta["FILEVRSN"] = "1"
return NDCube(data=data, uncertainty=uncertainty, wcs=wcs, meta=meta)
return _sample_ndcube


def test_generate_stray_light() -> None:
shape = (2048,2048)

stray_light_wfi = generate_stray_light(shape=shape, instrument="WFI")
stray_light_nfi = generate_stray_light(shape=shape, instrument="NFI")

assert np.sum(stray_light_wfi[0]) != 0
assert np.sum(stray_light_wfi[1]) != 0
assert np.sum(stray_light_nfi[0]) != 0
assert np.sum(stray_light_nfi[1]) != 0

assert stray_light_wfi[0].shape == shape
assert stray_light_wfi[1].shape == shape
assert stray_light_nfi[0].shape == shape
assert stray_light_nfi[1].shape == shape


def test_stray_light(sample_ndcube: NDCube) -> None:
"""Test stray light addition."""
input_data = sample_ndcube((2048,2048))

input_data_array = input_data.data.copy()

stray_light_data = add_stray_light(input_data, polar="clear")

assert isinstance(stray_light_data, NDCube)
assert np.sum(stray_light_data.data - input_data_array) != 0
assert (stray_light_data.data != input_data_array).all()
53 changes: 53 additions & 0 deletions simpunch/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,56 @@
hdul = fits.HDUList([fits.PrimaryHDU(), hdu_data])
hdul.writeto(path, overwrite=overwrite, checksum=True)
hdul.close()


def generate_stray_light(shape: tuple, instrument:str="WFI")->tuple[np.ndarray, np.ndarray]:
"""
Generate stray light arrays for B and pB channels for WFI and NFI instruments.

Parameters
----------
- shape: tuple, the shape of the output array (height, width).
- instrument: str, either 'WFI' or 'NFI' specifying the formula to use.

Returns
-------
- strayarray_B: 2D numpy array, intensity for B channel.
- strayarray_pB: 2D numpy array, intensity for pB channel.
"""
strayarray_b = np.zeros(shape)
strayarray_pb = np.zeros(shape)

y, x = np.indices(shape)

if instrument == "WFI":
center = (1024, 0)
a, b, c = [-11.22425753, -0.03322594, 0.78295454] # from empirical model using STEREO data
x -= center[1]
y -= center[0]
r = np.sqrt(x ** 2 + y ** 2)
r = (r * (160 / shape[0])) + 20
def intensity_func(r:float, a:float, b:float, c:float)->float:
return a + b * r ** c
elif instrument == "NFI":
center = (1024, 1024)
a, b, c = [3796.09, -1399.18, 0.0003] # from empirical model using STEREO data
x -= center[1]
y -= center[0]
r = np.sqrt(x ** 2 + y ** 2)
r_threshold = 5.6
r = (r * (32 / center[0])) * (r > r_threshold)
def intensity_func(r:float, a:float, b:float, c:float)->float:
return a + b * np.exp(r ** c)
else:
msg = "Instrument must be 'WFI' or 'NFI'"
raise ValueError(msg)

Check warning on line 100 in simpunch/util.py

View check run for this annotation

Codecov / codecov/patch

simpunch/util.py#L99-L100

Added lines #L99 - L100 were not covered by tests

# Calculate intensity for B channel
intensity_b = intensity_func(r, a, b, c)
strayarray_b[:, :] = 10 ** intensity_b

# Calculate intensity for pB channel (2 orders of magnitude less than B)
intensity_pb = intensity_func(r, a - 2, b, c)
strayarray_pb[:, :] = 10 ** intensity_pb

return strayarray_b, strayarray_pb
Loading