Skip to content

Commit

Permalink
Tests for flat option
Browse files Browse the repository at this point in the history
  • Loading branch information
melanieclarke committed Jan 9, 2025
1 parent 8db16c3 commit 084ee4a
Show file tree
Hide file tree
Showing 3 changed files with 168 additions and 2 deletions.
3 changes: 2 additions & 1 deletion jwst/clean_flicker_noise/clean_flicker_noise.py
Original file line number Diff line number Diff line change
Expand Up @@ -1043,13 +1043,14 @@ def _read_flat_file(input_model, flat_filename):
return None

# Open the provided flat as FlatModel
log.debug('Dividing by flat data prior to fitting')
flat = datamodels.FlatModel(flat_filename)

# Extract subarray from reference data, if necessary
if ref_matches_sci(input_model, flat):
flat_data = flat.data
else:
log.info("Extracting matching subarray from flat")
log.debug("Extracting matching subarray from flat")
sub_flat = get_subarray_model(input_model, flat)
flat_data = sub_flat.data
sub_flat.close()
Expand Down
122 changes: 122 additions & 0 deletions jwst/clean_flicker_noise/tests/test_clean_flicker_noise.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,31 @@ def make_small_rateints_model(shape=(3, 5, 10, 10)):
return ratemodel


def make_flat_model(model, shape=(10, 10), value=None):
# make a flat model with appropriate size and metadata
flat = datamodels.FlatModel()
if value is None:
flat.data = np.arange(shape[0] * shape[1], dtype=float).reshape(shape)
else:
flat.data = np.full(shape, value)

# add required metadata
flat.meta.description = 'test'
flat.meta.reftype = 'test'
flat.meta.author = 'test'
flat.meta.pedigree = 'test'
flat.meta.useafter = 'test'

# copy any other matching metadata
flat.update(model)

# make sure shape keys match input
flat.meta.subarray.xsize = shape[1]
flat.meta.subarray.ysize = shape[0]

return flat


def make_nirspec_ifu_model(shape=(2048, 2048)):
hdul = create_nirspec_ifu_file(grating='PRISM', filter='CLEAR',
gwa_xtil=0.35986012, gwa_ytil=0.13448857,
Expand Down Expand Up @@ -846,3 +871,100 @@ def test_do_correction_save_intermediate(save_type, input_type):
assert noise is None

model.close()


@pytest.mark.parametrize('input_type', ['rate', 'rateints', 'ramp'])
def test_do_correction_with_flat_unity(tmp_path, input_type, log_watcher):
# make input data
shape = (3, 5, 20, 20)
if input_type == 'rate':
model = make_small_rate_model(shape)
elif input_type == 'rateints':
model = make_small_rateints_model(shape)
else:
model = make_small_ramp_model(shape)

# make a flat image matching the input data
flat = make_flat_model(model, shape=shape[-2:], value=1.0)
flat_file = str(tmp_path / 'flat.fits')
flat.save(flat_file)

log_watcher.message = 'Dividing by flat'
cleaned, _, _, _, status = cfn.do_correction(model, flat_filename=flat_file, background_method=None)
log_watcher.assert_seen()
assert status == 'COMPLETE'

# output is flat with uniform flat, background is perfectly removed
assert np.all(cleaned.data == 0.0)

model.close()
flat.close()


@pytest.mark.parametrize('apply_flat', [True, False])
@pytest.mark.parametrize('input_type', ['rate', 'rateints', 'ramp'])
def test_do_correction_with_flat_structure(tmp_path, log_watcher, input_type, apply_flat):
# make input data
shape = (3, 5, 20, 20)
if input_type == 'rate':
model = make_small_rate_model(shape)
elif input_type == 'rateints':
model = make_small_rateints_model(shape)
else:
model = make_small_ramp_model(shape)

# make a flat image matching the input data
flat = make_flat_model(model, shape=shape[-2:])
if apply_flat:
flat_file = str(tmp_path / 'flat.fits')
flat.save(flat_file)
else:
flat_file = None

# multiply the data by the flat to mock real structure
model.data *= flat.data

log_watcher.message = 'Dividing by flat'
cleaned, _, _, _, status = cfn.do_correction(model, flat_filename=flat_file)
assert status == 'COMPLETE'

if apply_flat:
log_watcher.assert_seen()

# output is the same as input: flat structure is not removed
assert np.all(cleaned.data == model.data)
else:
log_watcher.assert_not_seen()

# output is not the same as input: flat structure is fit as background/noise
assert not np.all(cleaned.data == model.data)

model.close()
flat.close()


def test_do_correction_with_flat_subarray(tmp_path, log_watcher):
# make input data
shape = (3, 5, 20, 20)
model = make_small_rate_model(shape)

# make a flat image larger than the input data
flat_shape = (50, 50)
flat = make_flat_model(model, shape=flat_shape)
flat_file = str(tmp_path / 'flat.fits')
flat.save(flat_file)

# multiply the data by the flat to mock real structure
model.data *= flat.data[:20, :20]

log_watcher.message = 'Extracting matching subarray'
cleaned, _, _, _, status = cfn.do_correction(model, flat_filename=flat_file)
assert status == 'COMPLETE'
log_watcher.assert_seen()

# output is the same as input: flat structure is not removed by the
# cleaning process
assert np.all(cleaned.data == model.data)

model.close()
flat.close()
45 changes: 44 additions & 1 deletion jwst/clean_flicker_noise/tests/test_clean_flicker_noise_step.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,24 @@
import logging
import os

import pytest
from stdatamodels.jwst import datamodels

from jwst.clean_flicker_noise import CleanFlickerNoiseStep
from .test_clean_flicker_noise import make_small_ramp_model
from jwst.clean_flicker_noise.tests.test_clean_flicker_noise import (
make_small_ramp_model, make_nirspec_fs_model)
from jwst.tests.helpers import LogWatcher


@pytest.fixture
def log_watcher(monkeypatch):
# Set a log watcher to check for a log message at any level
# in CleanFlickerNoiseStep
watcher = LogWatcher('')
logger = logging.getLogger('stpipe.CleanFlickerNoiseStep')
for level in ['debug', 'info', 'warning', 'error']:
monkeypatch.setattr(logger, level, watcher)
return watcher


@pytest.mark.parametrize('skip', [True, False])
Expand Down Expand Up @@ -80,3 +94,32 @@ def test_save_noise(tmp_path):
input_model.close()
cleaned.close()
noise_model.close()


def test_apply_flat(log_watcher):
input_model = make_small_ramp_model()

log_watcher.message = 'Using FLAT'
cleaned = CleanFlickerNoiseStep.call(input_model, skip=False, apply_flat_field=True)
log_watcher.assert_seen()

# Flat file was used, but flat_field step was not applied
assert cleaned.meta.ref_file.flat.name is not None
assert cleaned.meta.cal_step.flat_field is None

input_model.close()
cleaned.close()


def test_apply_flat_not_available(log_watcher):
input_model = make_nirspec_fs_model()

log_watcher.message = 'Flat correction is not available'
cleaned = CleanFlickerNoiseStep.call(input_model, skip=False, apply_flat_field=True)
log_watcher.assert_seen()

# Flat file was not used but step proceeded
assert cleaned.meta.ref_file.flat.name == 'N/A'

input_model.close()
cleaned.close()

0 comments on commit 084ee4a

Please sign in to comment.