import numpy as np
from astropy.stats import sigma_clipped_stats
from photutils.detection import DAOStarFinder
from scipy.interpolate import interp1d
from skimage.measure import label, regionprops
from prose import Block, Image
from prose.console_utils import info
from prose.core.source import *
__all__ = [
"DAOFindStars",
"SEDetection",
"AutoSourceDetection",
"PointSourceDetection",
"Peaks",
]
class _SourceDetection(Block):
def __init__(
self,
threshold: float = 4,
n: int = None,
sort: bool = True,
min_separation: float = None,
min_area: float = 0,
minor_length: float = 0,
name: str = None,
):
"""Base class for sources detection.
|read| :code:`Image.data`
Parameters
----------
threshold : float, optional
detection threshold for sources, by default 4
n : int, optional
number of sources to detect, by default None
sort : bool, optional
whether to sort per ADU peak value (from the greatest), by default True
min_separation : float, optional
minimum separation in pixels from one source to the other. Between two sources, greater ADU is kept, by default None
min_area : float, optional
minimum area in pixels of the sources to detect, by default 0
minor_length : float, optional
minimum length of semi-major axis of sources to detect, by default 0
name : str, optional
name of the block, by default None
"""
super().__init__(name=name, read=["data"])
self.n = n
self.sort = sort
self.min_separation = min_separation
self.threshold = threshold
self.min_area = min_area
self.minor_length = minor_length
def clean(self, sources):
peaks = np.array([s.peak for s in sources])
_sources = sources.copy()
if len(sources) > 0:
if self.sort:
idxs = np.argsort(peaks)[::-1]
_sources = sources[idxs]
if self.min_separation:
final_sources = _sources.copy()
for s in final_sources:
s.keep = True
for i, s in enumerate(final_sources):
if final_sources[i].keep:
distances = np.linalg.norm(
s.coords - final_sources.coords, axis=1
)
distances[i] == np.nan
idxs = np.flatnonzero(distances < self.min_separation)
for j in idxs[idxs > i]:
final_sources[int(j)].keep = False
_sources = _sources[np.array([s.keep for s in final_sources])]
for i, s in enumerate(_sources):
s.i = i
if self.n is not None:
_sources = _sources[0 : self.n]
return _sources
else:
return []
# TODO: obsolete, redo
def auto_threshold(self, image):
if self.verbose:
info("SegmentedPeaks threshold optimisation ...")
thresholds = np.exp(np.linspace(np.log(1.5), np.log(500), 30))
detected = [len(self._regions(image, t)) for t in thresholds]
self.threshold = interp1d(detected, thresholds, fill_value="extrapolate")(
self.n_stars
)
if self.verbose:
info(f"threshold = {self.threshold:.2f}")
def regions(self, image: Image, threshold=None):
flat_data = image.data.flatten()
median = np.nanmedian(flat_data)
if threshold is None:
threshold = self.threshold
flat_data = flat_data[np.abs(flat_data - median) < np.nanstd(flat_data)]
threshold = threshold * np.nanstd(flat_data) + median
regions = regionprops(label(image.data > threshold), image.data)
regions = [
r
for r in regions
if r.area >= self.min_area and r.axis_major_length >= self.minor_length
]
return regions
@property
def citations(self):
return super().citations + ["scikit-image"]
[docs]class AutoSourceDetection(_SourceDetection):
def __init__(
self,
threshold=4,
n=None,
sort=True,
min_separation=None,
name=None,
min_area=0,
minor_length=0,
):
"""Detect all sources.
|read| :code:`Image.data`
|write| :code:`Image.sources`
Parameters
----------
threshold : float, optional
detection threshold for sources, by default 4
n : int, optional
number of sources to detect, by default None
sort : bool, optional
whether to sort per ADU peak value (from the greatest), by default True
min_separation : float, optional
minimum separation in pixels from one source to the other. Between two sources, greater ADU is kept, by default None
min_area : str, optional
minimum area in pixels of the sources to detect, by default 0
minor_length : str, optional
minimum length of semi-major axis of sources to detect, by default 0
name : str, optional
name of the block, by default None
"""
super().__init__(
threshold=threshold,
n=n,
sort=sort,
min_separation=min_separation,
name=name,
min_area=min_area,
minor_length=minor_length,
)
def run(self, image):
regions = self.regions(image)
sources = np.array([auto_source(region) for region in regions])
image.sources = Sources(self.clean(sources))
[docs]class PointSourceDetection(_SourceDetection):
def __init__(self, unit_euler=False, min_area=3, minor_length=2, **kwargs):
"""Detect point sources (as :py:class:`~prose.core.source.PointSource`).
|read| :code:`Image.data`
|write| :code:`Image.sources`
Parameters
----------
unit_euler : bool, optional
whether to consider sources with euler number == 1, by default False
min_area : str, optional
minimum area in pixels of the sources to detect, by default 0
minor_length : str, optional
minimum length of semi-major axis of sources to detect, by default 0
threshold : float, optional
detection threshold for sources, by default 4
n : int, optional
number of sources to detect, by default None
sort : bool, optional
whether to sort per ADU peak value (from the greatest), by default True
min_separation : float, optional
minimum separation in pixels from one source to the other. Between two sources, greater ADU is kept, by default None
name : str, optional
name of the block, by default None
"""
super().__init__(min_area=min_area, minor_length=minor_length, **kwargs)
self.unit_euler = unit_euler
self.min_area = min_area
self.minor_length = minor_length
def run(self, image):
regions = self.regions(image)
if self.unit_euler:
idxs = np.flatnonzero([r.euler_number == 1 for r in regions])
regions = [regions[i] for i in idxs]
sources = Sources(
np.array([PointSource.from_region(region) for region in regions])
)
image.sources = Sources(self.clean(sources), type="PointSource")
[docs]class TraceDetection(_SourceDetection):
def __init__(self, minor_length=5, **kwargs):
"""Detect trace sources (as :py:class:`~prose.core.source.TraceSource`)
Parameters
----------
minor_length : str, optional
minimum length of semi-major axis of sources to detect, by default 0
min_area : str, optional
minimum area in pixels of the sources to detect, by default 0
threshold : float, optional
detection threshold for sources, by default 4
n : int, optional
number of sources to detect, by default None
sort : bool, optional
whether to sort per ADU peak value (from the greatest), by default True
min_separation : float, optional
minimum separation in pixels from one source to the other. Between two sources, greater ADU is kept, by default None
name : str, optional
name of the block, by default None
"""
super().__init__(minor_length=minor_length, **kwargs)
def run(self, image):
regions = self.regions(image)
sources = np.array([TraceSource.from_region(region) for region in regions])
image.sources = Sources(sources)
# backward compatibility
[docs]class SegmentedPeaks(PointSourceDetection):
def __init__(
self, unit_euler=False, min_area=3, minor_length=2, n_stars=None, **kwargs
):
"""Detect point sources (backward compatibility)
Same as :py:class:`~prose.blocks.PointSourceDetection`
"""
super().__init__(
n=n_stars, min_area=min_area, minor_length=minor_length, **kwargs
)
self.unit_euler = unit_euler
self.min_area = min_area
self.minor_length = minor_length
# TODO: document
[docs]class Peaks(Block):
def __init__(self, shape=11, **kwargs):
"""Computation of peak values for the detected stars (in ADUs).
Parameters
----------
shape : int, optional
size of the cutout image within which the peak is calculated, by default 11
"""
super().__init__(**kwargs)
self.shape = shape
def run(self, image, **kwargs):
idxs = np.arange(len(image.sources))
peaks = np.ones(len(idxs)) * -1
for j in idxs:
cut = image.cutout(image.sources.coords[j], shape=self.shape)
if cut is not None:
peaks[j] = cut.data.max()
image.peaks = peaks
class _SimplePointSourceDetection(_SourceDetection):
def __init__(self, n_stars=None, min_separation=None, sort=True, name=None):
super().__init__(n=n_stars, sort=sort, min_separation=min_separation, name=name)
def run(self, image):
coordinates, peaks = self.detect(image)
sources = np.array(
[PointSource(coords=c, peak=p) for c, p in zip(coordinates, peaks)]
)
image.sources = Sources(self.clean(sources), type="PointSource")
[docs]class DAOFindStars(_SimplePointSourceDetection):
"""
DAOPHOT stars detection with :code:`photutils` implementation.
|write| ``Image.stars_coords`` and ``Image.peaks``
Parameters
----------
sigma_clip : float, optional
sigma clipping factor used to evaluate background, by default 2.5
lower_snr : int, optional
minimum snr (as source_flux/background), by default 5
fwhm : int, optional
typical fwhm of image psf, by default 5
n_stars : int, optional
maximum number of stars to consider, by default None
min_separation : float, optional
minimum separation between sources, by default 5.0. If less than that, close sources are merged
sort : bool, optional
whether to sort stars coordinates from the highest to the lowest intensity, by default True
"""
def __init__(
self,
sigma_clip=2.5,
lower_snr=5,
fwhm=5,
n_stars=None,
min_separation=None,
sort=True,
name=None,
):
super().__init__(
n_stars=n_stars, sort=sort, min_separation=min_separation, name=name
)
self.sigma_clip = sigma_clip
self.lower_snr = lower_snr
self.fwhm = fwhm
def detect(self, image):
_, median, std = sigma_clipped_stats(image.data, sigma=self.sigma_clip)
finder = DAOStarFinder(fwhm=self.fwhm, threshold=self.lower_snr * std)
sources = finder(image.data - median)
coordinates = np.transpose(
np.array([sources["xcentroid"].data, sources["ycentroid"].data])
)
peaks = sources["peak"]
return coordinates, peaks
try:
from sep import extract
except:
raise AssertionError("sep not installed")
[docs]class SEDetection(_SimplePointSourceDetection):
def __init__(
self, threshold=2.5, n_stars=None, min_separation=None, sort=True, name=None
):
"""
Source Extractor detection.
|write| ``Image.sources`` and ``Image.peaks``
Parameters
----------
threshold : float, optional
threshold factor for which to consider pixel as potential sources, by default 1.5
n_stars : int, optional
maximum number of stars to consider, by default None
min_separation : float, optional
minimum separation between sources, by default 5.0. If less than that, close sources are merged
sort : bool, optional
whether to sort stars coordinates from the highest to the lowest intensity, by default True
"""
super().__init__(
n_stars=n_stars, sort=sort, min_separation=min_separation, name=name
)
self.threshold = threshold
def detect(self, image):
data = image.data.byteswap().newbyteorder()
sep_data = extract(image.data, self.threshold * np.median(data))
coordinates = np.array([sep_data["x"], sep_data["y"]]).T
fluxes = np.array(sep_data["flux"])
return coordinates, fluxes
@property
def citations(self):
return super().citations + ["sep"]
[docs]class AlignSources(Block):
pass # TODO