from functools import partial
from pathlib import Path
from typing import Union
import numpy as np
from astropy.io import fits
from prose.console_utils import info
from prose.core import Block, FITSImage, Image, Sources
from prose.fluxes import Fluxes
from prose.utils import easy_median
__all__ = [
"LimitSources",
"Apply",
"SortSources",
"Get",
"Calibration",
"CleanBadPixels",
"Del",
"GetFluxes",
"WriteTo",
"SelectiveStack",
]
# TODO: document and test
[docs]class SortSources(Block):
def __init__(self, verbose=False, key="cutout_sum", name=None):
"""Sort sources given a function
TODO
Parameters
----------
verbose : bool, optional
_description_, by default False
key : str, optional
_description_, by default "cutout_sum"
name : _type_, optional
_description_, by default None
Returns
-------
_type_
_description_
"""
super().__init__(name, verbose, read=["sources"])
if isinstance(key, str):
if key == "cutout_sum":
def key(cutout):
return np.nansum(cutout.data)
assert callable(key)
self.key = key
def run(self, image: Image):
keys = np.array([self.key(cutout) for cutout in image.cutouts])
idxs = np.argsort(keys)[::-1]
sources = image._sources[idxs]
for i, s in enumerate(sources):
s.i = i
image._sources = Sources(sources)
[docs]class Apply(Block):
"""Apply a function to an image.
Parameters
----------
function : callable
function to apply of the form callable(image) -> None
"""
def __init__(self, function: callable, name: str = None):
super().__init__(name=name)
self.function = function
def run(self, image):
self.function(image)
[docs]class Get(Block):
def __init__(self, *attributes, name: str = "get", arrays: bool = True, **getters):
"""Retrieve and store properties from an :py:class:`~prose.Image`.
If a list of paths is provided to a :py:class:`~prose.Sequence`, each image is
created at the beginning of the sequence, and deleted at the end, so that
computed data stored as :py:class:`prose.Image` properties are deleted at each iteration.
This :code:`Get` block provides a way to retain any data stored in images before
they are deleted.
When a sequence is finished, this block has a `values` property, a dictionary
where all retained properties are accessible by name, and consist of a list with
a length corresponding to the number of images processed. The parameters of this
dictionary are the args and kwargs provided to the block (see Example).
Parameters
----------
*attributes: str
names of properties to retain
name : str, optional
name of the block, by default "get"
arrays : bool, optional
whether to convert each array of data as a numpy array , by default True
**getters: function
name and functions
Example
-------
.. code-block:: python
from prose import example_image
from prose import blocks
# example image
image = example_image()
# running the block
block = blocks.Get(image_shape = lambda im: im.shape)
block.run(image)
# getting the output
block.image_shape
.. code-block::
[array([600, 600])]
"""
super().__init__(name=name)
new_getters = {}
def get_from_header(image, key=None):
return image.header[key]
def get(image, key=None):
return getattr(image, key)
for attr in attributes:
if "keyword:" in attr:
attr = attr.split("keyword:")[-1]
new_getters[attr.lower()] = partial(get_from_header, key=attr)
else:
new_getters[attr.lower()] = partial(get, key=attr)
getters.update(new_getters)
self.getters = getters
self.values = {name: [] for name in getters.keys()}
self.arrays = arrays
self._parallel_friendly = True
def run(self, image: Image):
for name, get in self.getters.items():
value = get(image)
self.values[name].append(value)
def terminate(self):
if self.arrays:
for key, value in self.values.items():
self.values[key] = np.array(value)
def __getitem__(self, key):
return self.values[key]
def __getattr__(self, key):
if key in self.getters.keys():
return self.values[key]
else:
raise AttributeError()
[docs]class Calibration(Block):
def __init__(
self,
darks: Union[list, np.ndarray] = None,
flats: Union[list, np.ndarray] = None,
bias: Union[list, np.ndarray] = None,
loader=FITSImage,
easy_ram: bool = True,
verbose: bool = True,
shared: bool = False,
**kwargs,
):
"""Flat, Bias and Dark calibration.
|modify|
The provided calibration images can be either:
- a list of paths to FITS files
- a list of :py:class:`~prose.Image` objects
- an array of np.ndarray images
- a single :py:class:`~prose.Image` object
- a single np.ndarray image
- an empty list, in which case the calibration is skipped
- None, in which case the calibration is skipped
Parameters
----------
darks : list or np.ndarray, optional
list of darks, by default None
flats : list or np.ndarray, optional
list of flats, by default None
bias : list or np.ndarray, optional
list of bias, by default None
loader : object, optional
loader used to load str path to :py:class:`~prose.Image`, by default :py:class:`~prose.FITSImage`
easy_ram : bool, optional
whether to compute the master median per chunks, going easy on the RAM, by default True
verbose : bool, optional
whether to log information about master calibration images building, by default True
shared : bool, optional
whether to allow the master calibration images to be shared, useful for multi-processing, by default False
"""
super().__init__(**kwargs)
self.loader = loader
self.easy_ram = easy_ram
self.shapes = {}
def check_input(value):
if value is None:
value = []
elif isinstance(value, np.ndarray):
if len(value) == 0:
value = []
elif value.ndim == 2:
value = [value]
# ndim 1 or 3
else:
value = value.tolist()
if not isinstance(value, (list, np.ndarray)):
value = [value]
return value
self.master_bias = self._produce_master(check_input(bias), "bias")
self.master_dark = self._produce_master(check_input(darks), "dark")
self.master_flat = self._produce_master(check_input(flats), "flat")
if shared:
self._share()
self.verbose = verbose
self.calibration = self._calibration_shared if shared else self._calibration
self._parallel_friendly = shared
def _produce_master(self, images, image_type):
if images is not None:
if not isinstance(images, list):
images = [images]
if len(images) == 0:
images = None
def _median(im):
if self.easy_ram:
return easy_median(im)
else:
return np.median(im, 0)
def _get_data_exposure(image):
if isinstance(image, (str, Path)):
image = self.loader(image)
if isinstance(image, Image):
image_data = image.data
image_exposure = (
image.exposure.value if image.exposure is not None else 1.0
)
elif isinstance(image, np.ndarray):
image_data = image
image_exposure = 1.0
else:
raise ValueError(
f"Unsupported image type, must be a path, Image or np.ndarray (provided {type(image)}"
)
return image_data, image_exposure
_master = []
if images is None:
if self.verbose:
info(f"No {image_type} images set")
if image_type == "bias":
master = np.array([0.0])
elif image_type == "dark":
master = np.array([0.0])
elif image_type == "flat":
master = np.array([1.0])
else:
if self.verbose:
info(f"Building master {image_type}")
for image in images:
image_data, image_exposure = _get_data_exposure(image)
if image_type == "bias":
_master.append(image_data)
elif image_type == "dark":
_dark = (image_data - self.master_bias) / image_exposure
_master.append(_dark)
elif image_type == "flat":
_flat = (
image_data
- self.master_bias
- self.master_dark * image_exposure
)
_flat /= np.mean(_flat)
_master.append(_flat)
del image_data
if len(_master) > 0:
master = _median(_master)
else:
master = None
self.shapes[image_type] = master.shape
return master
def _calibration_shared(self, image, exp_time):
bias = np.memmap(
"__bias.array", dtype="float32", mode="r", shape=self.shapes["bias"]
)
dark = np.memmap(
"__dark.array", dtype="float32", mode="r", shape=self.shapes["dark"]
)
flat = np.memmap(
"__flat.array", dtype="float32", mode="r", shape=self.shapes["flat"]
)
with np.errstate(divide="ignore", invalid="ignore"):
return (image - (dark * exp_time + bias)) / flat
def _calibration(self, image, exp_time):
with np.errstate(divide="ignore", invalid="ignore"):
return (
image - (self.master_dark * exp_time + self.master_bias)
) / self.master_flat
def run(self, image):
data = image.data
exposure = image.exposure.value if image.exposure is not None else 1.0
calibrated_image = self.calibration(data, exposure)
calibrated_image[calibrated_image < 0] = np.nan
calibrated_image[~np.isfinite(calibrated_image)] = -1
image.data = calibrated_image
def _share(self):
for imtype in ["bias", "dark", "flat"]:
data = self.__dict__[f"master_{imtype}"]
m = np.memmap(
f"__{imtype}.array", dtype="float32", mode="w+", shape=data.shape
)
if data.ndim == 2:
m[:, :] = data[:, :]
else:
m[:] = data[:]
del self.__dict__[f"master_{imtype}"]
@property
def citations(self):
return "astropy", "numpy"
[docs]class CleanBadPixels(Block):
def __init__(
self,
bad_pixels_map=None,
darks=None,
flats=None,
min_flat=0.6,
loader=Image,
**kwargs,
):
super().__init__(**kwargs)
self.loader = loader
assert (
darks is not None or bad_pixels_map is not None
), "bad_pixels_map or darks must be specified"
if darks is not None:
info("buidling bad pixels map")
if darks is not None:
max_dark = self.loader(darks[0]).data
min_dark = self.loader(darks[0]).data
for im in darks:
data = self.loader(im).data
max_dark = np.max([max_dark, data], axis=0)
min_dark = np.min([min_dark, data], axis=0)
master_max_dark = self.loader(data=max_dark).data
master_min_dark = self.loader(data=min_dark).data
theshold = 3 * np.std(master_max_dark)
median = np.median(master_max_dark)
hots = np.abs(master_max_dark) - median > theshold
deads = master_min_dark < median / 2
self.bad_pixels = np.where(hots | deads)
self.bad_pixels_map = np.zeros_like(master_min_dark)
if flats is not None:
_flats = []
for flat in flats:
data = self.loader(flat).data
_flats.append(data / np.mean(data))
master_flat = easy_median(_flats)
master_flat = self.clean(master_flat)
bad_flats = np.where(master_flat < min_flat)
if len(bad_flats) == 2:
self.bad_pixels = (
np.hstack([self.bad_pixels[0], bad_flats[0]]),
np.hstack([self.bad_pixels[1], bad_flats[1]]),
)
self.bad_pixels_map[self.bad_pixels] = 1
elif bad_pixels_map is not None:
if isinstance(bad_pixels_map, (str, Path)):
bad_pixels_map = Image(bad_pixels_map).data
elif isinstance(bad_pixels_map, Image):
bad_pixels_map = bad_pixels_map.data
else:
bad_pixels_map = bad_pixels_map
self.bad_pixels_map = bad_pixels_map
self.bad_pixels = np.where(bad_pixels_map == 1)
def clean(self, data):
data[self.bad_pixels] = np.nan
data[data < 0] = np.nan
nans = np.array(np.where(np.isnan(data))).T
padded_data = np.pad(data.copy(), (1, 1), constant_values=np.nan)
for i, j in nans + 1:
mean = np.nanmean(
[
padded_data[i, j - 1],
padded_data[i, j + 1],
padded_data[i - 1, j],
padded_data[i + 1, j],
]
)
padded_data[i, j] = mean
data[i - 1, j - 1] = mean
return data
def run(self, image):
image.data = self.clean(image.data.copy())
[docs]class Del(Block):
def __init__(self, *names, name="del"):
"""Remove a property from an Image
If the property is in `self.computed`, remove it from there.
In general this is use in multi-processing sequences to avoid large image properties to be copied in-between processes
Parameters
----------
*names: str
properties to be deleted from image
name : str, optional
name of the block, by default "del"
"""
super().__init__(name=name)
self.names = names
def run(self, image):
for name in self.names:
if name in image.computed:
del image.computed[name]
else:
setattr(image, name, None)
[docs]class LimitSources(Block):
def __init__(self, min: int = 4, max: int = 10000, name=None):
"""Limit number of sources. If not in between min and max sources, image is discarded
Parameters
----------
min : int, optional
minimum number of sources, by default 4
max : int, optional
maximum number of sources, by default 10000
"""
super().__init__(name=name)
self.min = min
self.max = max
self._parallel_friendly = True
def run(self, image):
n = len(image.sources)
if n < self.min or n > self.max:
image.discard = True
[docs]class GetFluxes(Get):
def __init__(self, *args, time: str = "jd", name: str = None, **kwargs):
"""A conveniant class to get fluxes and background from aperture and annulus blocks
|read| :code:`Image.aperture`, :code:`Image.annulus` and :code:`Image.{time}`
Parameters
----------
time : str, optional
The image property corresponding to time, by default 'jd'
name: str, optional
Name of the block
*args, **kwargs:
args and kwargs of :py:class:`prose.blocks.Get`
"""
self._time_key = time
get_fluxes = lambda im: im.aperture["fluxes"]
def get_bkg(im):
if "annulus" in im.computed.keys():
return im.annulus["median"]
else:
return np.zeros(len(im.sources))
def get_time(im):
if self._time_key in im.computed.keys():
return getattr(im, self._time_key)
elif im.jd is not None:
return im.jd
else:
return im.i
def get_aperture(im):
return im.aperture["radii"]
def get_error(im):
_area = np.pi * im.aperture["radii"] ** 2
_signal = im.aperture["fluxes"] - (
im.annulus["median"][:, None] * _area[None, :]
)
# TODO : figure out the correct CCD equation for error computation
_squarred_error = _signal + _area[None, :] * (
im.read_noise**2 + (im.gain / 2) ** 2 + im.annulus["median"][:, None]
)
return np.sqrt(_squarred_error)
super().__init__(
*args,
_time=get_time,
_bkg=get_bkg,
_fluxes=get_fluxes,
_apertures=get_aperture,
_errors=get_error,
name=name,
**kwargs,
)
self.fluxes = None
self._parallel_friendly = True
def terminate(self):
super().terminate()
area = np.pi * (self._apertures**2)
if len(self._fluxes) == 0:
raise ValueError(
"block has empty fluxes (check if stars are present in image or if image has been discarded)"
)
raw_fluxes = (self._fluxes - self._bkg[:, :, None] * area[:, None, :]).T
time = self._time
data = {"bkg": np.mean(self._bkg, -1)}
data.update({key: value for key, value in self.values.items() if key[0] != "_"})
self.fluxes = Fluxes(
time=time,
fluxes=raw_fluxes,
data=data,
apertures=self._apertures,
errors=self._errors.T,
)
[docs]class WriteTo(Block):
def __init__(
self, destination, label="processed", imtype=True, overwrite=False, name=None
):
"""Write image to FITS file
Parameters
----------
destination : str
destination folder (folder and parents created if not existing)
label : str, optional
added at the end of filename as {original_path}_{label}.fits, by default "processed"
imtype : bool, optional
If bool, whether to set image imtype as label (`image.header["IMTYPE"] = label`). If a `str`, label to set for imtype (`image.header["IMTYPE"] = imtype`) , by default True
overwrite : bool, optional
whether to overwrite existing file, by default False
name : str, optional
name of the block, by default None
"""
super().__init__(name=name)
self.destination = Path(destination)
self.label = label
self.overwrite = overwrite
if isinstance(imtype, bool):
if imtype:
self.imtype = self.label
else:
self.imtype = None
else:
assert isinstance(imtype, str), "imtype must be a bool or a str"
self.imtype = imtype
self.files = []
def run(self, image):
self.destination.mkdir(exist_ok=True, parents=True)
new_hdu = fits.PrimaryHDU(image.data)
new_hdu.header = image.header
if self.imtype is not None:
image.header[image.telescope.keyword_image_type] = self.imtype
fits_new_path = self.destination / (
Path(image.metadata["path"]).stem + f"_{self.label}.fits"
)
new_hdu.writeto(fits_new_path, overwrite=self.overwrite)
self.files.append(fits_new_path)
[docs]class SelectiveStack(Block):
def __init__(self, n=5, name=None):
"""Build a median stack image from the `n` best-FWHM images
|read| :code:`Image.fwhm`
Parameters
----------
n : int, optional
number of images to use, by default 5
name : str, optional
name of the blocks, by default None
"""
super().__init__(name=name)
self.n = n
self._images = []
self._sigmas = []
def run(self, image: Image):
sigma = image.fwhm
if len(self._images) < self.n:
self._images.append(image)
self._sigmas.append(sigma)
else:
i = np.argmax(self._sigmas)
if self._sigmas[i] > sigma:
self._sigmas[i] = sigma
self._images[i] = image
def terminate(self):
self.stack = Image(easy_median([im.data for im in self._images]))