Source code for prose.fluxes

import pickle
import warnings
from copy import deepcopy
from dataclasses import asdict, dataclass
from functools import partial
from pathlib import Path
from typing import Union

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from prose import utils


def weights(
    fluxes: np.ndarray, tolerance: float = 1e-3, max_iteration: int = 200, bins: int = 5
):
    """Returns the weights computed using Broeg 2005

    Parameters
    ----------
    fluxes : np.ndarray
        fluxes matrix with dimensions (star, flux)
    tolerance : float, optional
        the minimum standard deviation of weights difference to attain (meaning weights are stable), by default 1e-3
    max_iteration : int, optional
        maximum number of iterations to compute weights, by default 200
    bins : int, optional
        binning size (in number of points) to compute the white noise, by default 5

    Returns
    -------
    np.ndarray
        Broeg weights
    """

    # normalize
    dfluxes = fluxes / np.expand_dims(np.nanmean(fluxes, -1), -1)

    def weight_function(fluxes):
        return 1 / np.std(fluxes, axis=-1)

    i = 0
    evolution = 1e25
    lcs = None
    weights = None
    last_weights = np.zeros(dfluxes.shape[0 : len(dfluxes.shape) - 1])

    # Broeg 2004 algorithm to find weights of comp stars
    # --------------------------------------------------
    while evolution > tolerance and i < max_iteration:
        if i == 0:
            weights = weight_function(dfluxes)
            mask = np.where(~np.isfinite(weights))
        else:
            # This metric is preferred from std to optimize over white noise and not red noise
            weights = weight_function(lcs)

        weights[~np.isfinite(weights)] = 0

        evolution = np.abs(
            np.nanmean(weights, axis=-1) - np.nanmean(last_weights, axis=-1)
        )

        last_weights = weights
        lcs, _ = diff(dfluxes, weights=weights)

        i += 1

    weights[0, mask] = 0

    return weights[0]


def diff(fluxes: np.ndarray, weights: np.ndarray = None, errors: np.ndarray = None):
    """Returns differential fluxes.

    If weights are specified, they are used to produce an artificial light curve by which all fluxes are differentiated (see Broeg 2005).
    `fluxes` must not contain NaNs.

    Parameters
    ----------
    fluxes : np.ndarray
        fluxes matrix with dimensions (star, flux) or (aperture, star, flux)
    weights :np.ndarray, optional
        weights matrix with dimensions (star) or (aperture, star), by default None which simply returns normalized fluxes
    errors: np.ndarray, optionnal
        errors matrix with the same dimensions as fluxes, (star, flux) or (aperture, star, flux)

    Returns
    -------
    np.ndarray
        Differential fluxes if weights is provided, else normalized fluxes
    """
    diff_fluxes = fluxes / np.expand_dims(np.nanmean(fluxes, -1), -1)
    if weights is not None:
        # not to divide flux by itself
        sub = np.expand_dims((~np.eye(fluxes.shape[-2]).astype(bool)).astype(int), 0)
        weighted_fluxes = diff_fluxes * np.expand_dims(weights, -1)
        # see broeg 2005
        artificial_light_curve = (sub @ weighted_fluxes) / np.expand_dims(
            weights @ sub[0], -1
        )
        diff_fluxes = diff_fluxes / artificial_light_curve
        if errors is not None:
            diff_errors = errors / np.expand_dims(np.nanmean(fluxes, -1), -1)
            weighted_errors = diff_errors**2 * np.expand_dims(weights, -1) ** 2
            squarred_art_error = (sub @ weighted_errors) / np.expand_dims(
                weights**2 @ sub[0], -1
            )
            diff_errors = np.sqrt(diff_errors**2 + squarred_art_error)
        else:
            diff_errors = None

    return diff_fluxes, diff_errors


def auto_diff_1d(fluxes, i=None, errors=None):
    w = weights(fluxes)

    # to retain minimal set of comparison stars
    if i is not None:
        idxs = np.argsort(w)[::-1]
        white_noise = utils.binned_nanstd(fluxes)
        last_white_noise = 1e10

        def best_weights(j):
            _w = w.copy()
            _w[idxs[j::]] = 0.0
            _w[i] = 0.0
            return _w

        for j in range(w.shape[-1]):
            _w = best_weights(j)
            _df, _ = diff(fluxes, _w)
            _white_noise = np.take(white_noise(_df), i, axis=-1)[0]
            if not np.isfinite(_white_noise):
                continue
            if _white_noise < last_white_noise:
                last_white_noise = _white_noise
            else:
                break

        w = best_weights(j - 1)

    df, derr = diff(fluxes, w, errors)
    df = df.reshape(fluxes.shape)

    if derr is not None:
        derr = derr.reshape(errors.shape)

    return df, w, derr


def auto_diff(fluxes: np.array, i: int = None, errors=None):
    if errors is not None:
        if fluxes.ndim == 3:
            auto_diffs = [auto_diff_1d(f, i, e) for f, e in zip(fluxes, errors)]
            w = [a[1] for a in auto_diffs]
            fluxes = np.array([a[0] for a in auto_diffs])
            errors = np.array([a[2] for a in auto_diffs])
            return fluxes, np.array(w), errors
        else:
            return auto_diff_1d(fluxes, i, errors)
    else:
        if fluxes.ndim == 3:
            auto_diffs = [auto_diff_1d(f, i) for f in fluxes]
            w = [a[1] for a in auto_diffs]
            fluxes = np.array([a[0] for a in auto_diffs])
            return fluxes, np.array(w), None
        else:
            return auto_diff_1d(fluxes, i)


def optimal_flux(diff_fluxes, method="stddiff", sigma=4):
    fluxes = diff_fluxes.copy()
    fluxes = fluxes[
        ...,
        np.all(
            (fluxes - np.median(fluxes, 1)[..., None])
            < sigma * np.std(fluxes, 1)[..., None],
            0,
        ),
    ]
    if method == "binned":
        white_noise = utils.binned_nanstd(fluxes)
        criterion = white_noise(fluxes)
    elif method == "stddiff":
        criterion = utils.std_diff_metric(fluxes)
    elif method == "stability":
        criterion = utils.stability_aperture(fluxes)
    else:
        raise ValueError("{} is not a valid method".format(method))

    i = np.argmin(criterion)
    return i


[docs]@dataclass class Fluxes: """Photometric fluxes, from single to multiple stars and apertures. Can hold other measurements time-series, errors and apertures properties. """ fluxes: np.ndarray """Fluxes either as 1, 2, or 3 dimensional arrays, with following dimensions - 1: (time) - 2: (star, time) - 3: (aperture, star, time) """ time: np.ndarray = None """Array of observed time""" errors: np.ndarray = None """Errors with same shape as :code:`fluxes`""" data: dict = None """A dict of data time-series, each with the same shape as :code:`time`""" apertures: np.ndarray = None """Apertures radii""" weights: np.ndarray = None """Fluxes weights (from differential photometry)""" target: int = None """Index of selected target""" aperture: int = None """Index of selected aperture""" metadata: dict = None """Metadata""" @property def _is_target_aperture_set(self): """Check if target and aperture are set depending on `fluxes` dimensions. Returns ------- bool whether target and aperture are set """ if self.ndim == 1: return True if self.ndim == 2: return self.target is not None else: return self.target is not None and self.aperture is not None def __post_init__(self): assert self.fluxes.ndim in [1, 2, 3], "fluxes must be 1, 2 or 3 dimensional" if self.data is None: self.data = {} if self.ndim == 1: self.target = 0 self.aperture = 0 self.fluxes = self.fluxes.copy()[None, None, :] if self.errors is not None: self.errors = self.errors.copy()[None, None, :] elif self.ndim == 2: self.aperture = 0 self.fluxes = self.fluxes.copy()[None, :] if self.errors is not None: self.errors = self.errors.copy()[None, :] if self.metadata is None: self.metadata = {} def _target_attr(self, name, full=False): assert self.__dict__[name] is not None, f"{name} not provided" assert self.target is not None, "target must be set" if full: return self.__dict__[name][:, self.target] else: assert self.aperture is not None, "aperture must be set" return self.__dict__[name][self.aperture, self.target] @property def flux(self) -> np.array: """Main flux Returns ------- np.array Main flux """ return self._target_attr("fluxes") @property def error(self) -> np.array: """Error of the target flux""" return self._target_attr("errors") @property def shape(self): """shape of fluxes""" return self.fluxes.shape @property def ndim(self): """Number of dimensions of fluxes""" return self.fluxes.ndim @property def comparisons(self): """Comparison stars indices ordered from most to less weighted""" if self.weights is None: return None else: if self.aperture is None: raise ValueError("aperture must be set") idxs = np.argsort(self.weights[self.aperture])[::-1] return idxs[np.flatnonzero(self.weights[self.aperture][idxs] > 0.0)]
[docs] def vander(consant=True, **kwargs): pass
[docs] def diff(self, comps: np.ndarray = None): """Differential photometry. `Fluxes.fluxes` must not contain NaNs. Parameters ---------- comps : np.ndarray, optional index of comparison stars, by default None Returns ------- differential :code:`Fluxes` """ if comps is not None: weights = np.zeros(self.fluxes[0:2]) weights[:, comps] = 1 else: weights = None _new = deepcopy(self) _new.weights = weights diff_fluxes, diff_errors = diff(self.fluxes, weights, self.errors) _new.fluxes = diff_fluxes _new.errors = diff_errors return _new
[docs] def autodiff(self): """Automatic differential photometry with Broeg et al. 2005. `Fluxes.fluxes` must not contain NaNs. Returns ------- differential :code:`Fluxes` """ with warnings.catch_warnings(): warnings.simplefilter("ignore", RuntimeWarning) diff_fluxes, weights, diff_errors = auto_diff( self.fluxes, self.target, self.errors ) _new = deepcopy(self) _new.fluxes = diff_fluxes _new.errors = diff_errors _new.weights = weights _new.aperture = _new.best_aperture_index() return _new
[docs] def best_aperture_index(self, method="stddiff", sigma=4): """Find index of best aperture Parameters ---------- method : str, optional Method to find best aperture, by default "stddiff" Returns ------- int index of best aperture """ i = optimal_flux(self._target_attr("fluxes", full=True), method, sigma=sigma) return i
[docs] def estimate_best_aperture( self, target: int = None, method: str = "stddiff", sigma=4 ): """Inplace setting of best aperture. Parameters ---------- target : int, optional Index of target, by default None method : str, optional Method to find best aperture, by default "stddiff" """ if target is None: target = self.target self.aperture = self.best_aperture_index(method=method, sigma=sigma)
def _estimate_error(self): pass
[docs] def plot(self, marker=".", color="0.8", ls="", ax=None, **kwargs): """Plot light curve Parameters ---------- marker : str, optional Marker style, by default "." color : str, optional Marker color, by default "0.8" ls : str, optional Line style, by default "" ax : _type_, optional Matplotlib axis, by default None which takes :code:`plt.gca()` """ if ax is None: ax = plt.gca() kwargs.update(dict(marker=marker, color=color, ls=ls)) if self.time is None: ax.plot(self.flux, **kwargs) else: ax.plot(self.time, self.flux, **kwargs)
[docs] def errorbar(self, color="k", fmt=".", **kwargs): """Error bar plot of the light curve Parameters ---------- color : str, optional Marker color, by default "k" fmt : str, optional Error bar plot style, by default "." """ kwargs.update(dict(color=color, fmt=fmt)) plt.errorbar(self.time, self.flux, self.error, **kwargs)
[docs] def bin(self, size: float, estimate_error: bool = False) -> "Fluxes": """Returns a :code:`Fluxes` instance with binned time series Parameters ---------- size : float bin size in same unit as :code:`self.time` estimate_error : bool, optional whether to estimate error as the standard deviation of flux in each bin, by default False Returns ------- _type_ _description_ """ if isinstance(size, float): assert ( self.time is not None ), "using a float bin size requires time to be set" time = self.time if self.time is not None else np.arange(self.fluxes.shape[-1]) idxs = utils.index_binning(time, size) _new = deepcopy(self) _new.fluxes = np.array([np.mean(self.fluxes.T[i], 0) for i in idxs]).T if self.time is not None: _new.time = np.array([np.mean(self.time[i], 0) for i in idxs]) if self.errors is not None: _new.errors = np.array( [np.mean(self.errors.T[i], 0) / np.sqrt(len(i)) for i in idxs] ).T if estimate_error: _new.errors = np.array( [np.std(self.fluxes.T[i], 0) / np.sqrt(len(i)) for i in idxs] ).T elif self.errors is not None: _new.errors = np.array( [np.sqrt(np.sum(np.power(self.errors[i], 2))) / len(i) for i in idxs] ).T return _new
[docs] def save(self, path: Union[str, Path]): """Save fluxes to file Parameters ---------- path : Union[str, Path] path of the file """ with open(path, "wb") as f: pickle.dump(asdict(self), f)
[docs] def load(path: Union[str, Path]): """Load fluxes from file""" with open(path, "rb") as f: return Fluxes(**pickle.load(f))
[docs] def copy(self): """Deep copy of the object""" return deepcopy(self)
@property def dataframe(self): """Pandas dataframe of the fluxes and associated measurements""" df_dict = self.data.copy() df_dict.update({"time": self.time}) if self._is_target_aperture_set: df_dict.update({"flux": self.flux}) return pd.DataFrame(df_dict) @property def df(self): """Pandas dataframe of the fluxes and associated measurements""" return self.dataframe
[docs] def mask(self, array): """Mask time-dependant fluxes attributes (time, fluxes, errors, data) Parameters ---------- m : np.array of bool mask Returns ------- Fluxes masked Fluxes """ _new = self.copy() _new.data = {key: value[array] for key, value in self.data.items()} if self.fluxes is not None: _new.fluxes = self.fluxes[..., array] if self.errors is not None: _new.errors = self.errors[..., array] if self.time is not None: _new.time = self.time[array] if self.apertures is not None: _new.apertures = self.apertures[array, ...] return _new
[docs] def sigma_clipping_data(self, iterations: int = 5, **kwargs): """Return a Fluxes instance masked using iteratively sigma clipped data. Parameters ---------- it : int, optional iterations, by default 5 **kwargs: dict dict where keys are the names of the data to sigma clip and value are the sigma Returns ------- Fluxes sigma clipped Fluxes """ mask = np.ones_like(self.time).astype(bool) for _ in range(iterations): for name, sigma in kwargs.items(): value = self.data[name].copy() value[~mask] = np.nan m = np.abs(value - np.nanmean(value)) < np.nanstd(value) * sigma mask = mask & m return self.mask(mask)
[docs] def sigma_clip_flux(self, iterations: int = 5, sigma: float = 4.0): """Return a Fluxes instance masked using iteratively sigma clipping. Parameters ---------- it : int, optional iterations, by default 5 sigma: float sigma, by default 4.0 Returns ------- Fluxes sigma clipped Fluxes """ flux = self.flux.copy() mask = np.ones_like(self.time).astype(bool) for _ in range(iterations): mask &= np.abs(flux - np.nanmean(flux)) < np.nanstd(flux[mask]) * sigma return self.mask(mask)
[docs] def mask_stars(self, mask: np.array, keep_indexing: bool = True): """Mask stars fluxes. In order to keep indexing, the fluxes are set to -1. Parameters ---------- mask : np.array A boolean array of the same length as the number of stars, indicating which fluxes should be masked remove : bool, optional whether to keep indexing (recommended) and only set pixels to 1, by default False Returns ------- Fluxes A new Fluxes instance with masked stars """ copy = self.copy() if not keep_indexing: copy.fluxes = self.fluxes[..., mask, :] if copy.errors is not None: copy.errors = self.errors[..., mask, :] if copy.weights is not None: copy.weights = self.weights[..., mask, :] else: copy.fluxes[:, ~mask] = -1 if copy.errors is not None: copy.errors[:, ~mask] = -1 if copy.weights is not None: copy.errors[:, ~mask] = 0 return copy