Source code for

import sqlite3
import sys
from functools import partial
from pathlib import Path

import numpy as np
import pandas as pd
from IPython.display import display

from prose.console_utils import info, progress

from .io import fits_to_df, get_files

# Convenience
# -----------
SQL_DAYS_BETWEEN = "date >= date('{date}', '-{past:.0f} days') AND date <= date('{date}', '+{future:.0f} days')"

QMARKS_UNIQUE = ",".join(["?"] * len(UNIQUE_FIELDS.split(",")))

PWD = Path(__file__).parent

def in_value(value):
    return f"'{value}'" if isinstance(value, str) else value

def exposure_constraint(exposure=0, tolerance=1000000):
    return f"exposure between {exposure-tolerance} and {exposure+tolerance}"

# ------------

[docs]class FitsManager: """ A class for managing FITS files. Parameters ---------- folders : str or list of str, optional The folder(s) to search for FITS files. If not provided, `files` must be provided. files : str or list of str, optional The file(s) to read. If not provided, `folders` must be provided. depth : int, optional The subfolder depth to search for files in the folder(s). Default is 0 (search only in the provided folder(s)). hdu : int, optional The HDU to read from the FITS file. Default is 0 (the primary HDU). extension : str, optional The extension of the FITS file(s) to search for. Default is ".f*t*" (search for all FITS file extensions). file : str, optional The name of the SQLite database file to use. Default is ":memory:" (create an in-memory database). batch_size : int or bool, optional The number of files to store in the databse at once. If `False`, read all files at once. Default is `False`. This is to allow a scanning of a large number of files that are still saved in the database if an error occurs. scan : callable, optional The function used to retrieve files from a folder. Signature is scan(folder) -> list of file paths. Default is `None`. verbose : bool, optional Whether to display progress information. Default is `True`. to_df : function, optional A function to use for converting FITS files to pandas DataFrames. Default is `None`. telescope : str, optional The name of the telescope used to take the FITS files. Default is `None`. Attributes ---------- con : sqlite3.Connection The SQLite database connection. cur : sqlite3.Cursor The SQLite database cursor. fits_to_df : function The function used for converting FITS files to pandas DataFrames. """
[docs] def __init__( self, folders=None, files=None, depth=0, hdu=0, extension=".f*t*", file=None, batch_size=False, scan=None, verbose=True, to_df=None, telescope=None, ): if file is None: file = ":memory:" self.con = sqlite3.connect(file) self.cur = self.con.cursor() # check if file Table exists tables = list( self.cur.execute("SELECT name FROM sqlite_master WHERE type='table';") ) if len(tables) == 0: db_creation = open(PWD / "create_fm_db.sql", "r").read() self.cur.executescript(db_creation) if folders is not None: assert files is None, "Only 'folders' or 'files' must be provided, not both" files = self.get_files(folders, extension, depth=depth, scan=scan) if to_df is None: if telescope is None: self.fits_to_df = fits_to_df else: self.fits_to_df = partial(fits_to_df, telescope=telescope) else: self.fits_to_df = to_df if files is not None: if len(files) > 0: self.scan_files( files, batch_size=batch_size, hdu=hdu, verbose=verbose, telescope=telescope, )
def _insert( self, path, date, telescope, type, target, filter, dimensions, _, jd, exposure, id=None, update_obs=True, ): """Insert FITS data to object database""" if isinstance(filter, float): filter = "" else: filter = filter or "" telescope = telescope or "" target = target or "" id = id or "NULL" width, height = dimensions filter = filter.replace("'", "p") # update observation if update_obs: obs = (date, telescope, filter, target, type, width, height, exposure) self.con.execute( f"INSERT or IGNORE INTO observations({UNIQUE_FIELDS}, files) VALUES ({QMARKS_UNIQUE}, 0)", obs, ) query = " AND ".join( [ f"{str(key)} = {in_value(value)}" for key, value in zip(UNIQUE_FIELDS_LIST, obs) ] ) id = self.con.execute( f"SELECT id FROM observations where {query}" ).fetchall()[0][0] self.con.execute( f"UPDATE observations SET files = files + 1 WHERE id = {id}" ) # or IGNORE to handle the unique constraint self.cur.execute( f"INSERT or IGNORE INTO files VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ( date, path, telescope, filter, type, target, width, height, jd, id, exposure, ), )
[docs] def get_files(self, folders, extension, scan=None, depth=0): """Return paths of files with specific extension in the specified folder(s) Parameters ---------- folders : str or list of str path or list of paths of folders to look into extension : str wildcard pattern for file extension scan : function, optional function to use, called on each folder path, by default None depth : int, optional number of sub-folders to look into, by default 0 """ def _get_files(folder): if scan is None: return get_files(extension, folder, depth=depth) else: return scan(folder) if isinstance(folders, (list, tuple)): files = [] for folder in folders: assert Path(folder).exists(), f"Folder {folder} does not exists" files += _get_files(folder) else: assert Path(folders).exists(), f"Folder {folders} does not exists" files = _get_files(folders) return files
def _path_in(self, path): return ( self.con.execute(f"SELECT * FROM files WHERE path='{path}'").fetchone() is not None )
[docs] def scan_files( self, files, batch_size=False, verbose=True, hdu=0, telescope=None, verbose_new=False, verbose_os=False, leave=False, ): """Scan files and add data to database Parameters ---------- files : list of str or Path paths of files batch_size : bool or int, optional - if False: update database after all FITS files are parsed - if int: update database every time ``batch_size`` FITS files are parse by default False verbose : bool, optional whether to show progress bar during parsing, by default True hdu: int FITS data unit extension where header will be parsed telescope: prose.Telescope telescope to be imposed for these files, by default None verbose_new : bool, optional whether to show how many files are new, by default False verbose_os : bool, optional whether to show OS commands, by default False leave : bool, optional whether to leave progress bar after completion, by default False """ if len(files) > 0: files_to_scan = [path for path in files if not self._path_in(path)] if verbose_new: info(f"{len(files_to_scan)} new files to scan") if len(files_to_scan) > 0: if batch_size is None: batches = [files_to_scan] elif batch_size is False: pass elif batch_size is True: batches = [files_to_scan] else: assert isinstance(batch_size, int), "batch_size must be an int" if len(files_to_scan) < batch_size: batches = [files_to_scan] else: batches = np.array_split( files_to_scan, len(files_to_scan) // batch_size ) _verbose = verbose and batch_size is not False _progress = progress( _verbose, desc="Reading fits", unit="files", leave=leave, ) # to improve: use same progress for batch and non-batch if batch_size is not False: for batch in _progress(batches): try: df = self.fits_to_df( batch, verbose=False, hdu=hdu, verbose_os=verbose_os ) for row in df.values: if telescope is not None: row[2] = self._insert(*row) self.con.commit() except: "ERROR, batch ignored" else: df = self.fits_to_df( files_to_scan, verbose=verbose, hdu=hdu, verbose_os=verbose_os ) for row in df.values: if telescope is not None: row[2] = self._insert(*row) self.con.commit() else: if not verbose_new: f"{len(files_to_scan)} new files to scan" else: raise AssertionError(f"No files provided")
[docs] def observations(self, hide_exposure=True, **kwargs): """return a pandas DataFrame of observations given some metadata constraints in the form of wildcards Parameters ---------- hide_exposure : bool, optional whether to include exposure in the pandas.DataFrame header, by default True **kwargs: wildcards value for telescope, target, filter, type or id, default is '*' Returns ------- pd.DataFrame pandas DataFrame filtered with kwargs wildcards """ columns = { c[1]: "%" for c in self.con.execute("PRAGMA table_info(observations)").fetchall()[ 1:-3 ] } inputs = kwargs.copy() for key, value in inputs.items(): inputs[key] = "%" if value is None else str(value).replace("*", "%") columns.update(inputs) where = " AND ".join( [f"{key} LIKE {in_value(value)}" for key, value in columns.items()] ) query = f"select * from observations where {where}" if hide_exposure: query = f"select *, SUM(files) from observations where {where} GROUP BY date, telescope, target, filter, type" df = self.to_pandas(query) df["files"] df = df.drop(columns=["files", "exposure"]).rename( columns={"SUM(files)": "files"} ) else: query = f"select * from observations where {where}" df = self.to_pandas(query) return df.set_index(["id"])
[docs] def calibrations(self, **kwargs): """return a pandas DataFrame of calibrations observations given some metadata constraints in the form of wildcards. Parameters ---------- **kwargs: wildcards value for telescope, target, filter or id, default is '*' """ darks = self.observations(type="dark", **kwargs) flats = self.observations(type="flat", **kwargs) bias = self.observations(type="bias", **kwargs) return pd.concat([darks, flats, bias], axis=0)
[docs] def files(self, id=None, path=False, exposure=0, tolerance=1000, **kwargs): """Return a pandas DataFrame of files given some metadata constraints in the form of wildcards. Parameters ---------- id : int, optional id of the observation for which files are retrieved, by default None, i.e. all files path : bool, optional whether to include files paths in the pandas.DataFrame header, by default False exposure : int, optional exposure constraint on the files to retrieve, by default 0 tolerance : int, optional tolerance on the exposure constraint, by default 1000. For example: if exposure is set to 10 and tolerance to 2, all files with exposure = 10 +- 2 will be retrieved """ columns = { c[1]: "%" for c in self.con.execute("PRAGMA table_info(files)").fetchall() } if not path: del columns["path"] columns["id"] = id or "%" inputs = kwargs.copy() for key, value in inputs.items(): inputs[key] = "%" if value is None else str(value).replace("*", "%") columns.update(inputs) where = " AND ".join( [f"{key} LIKE {in_value(value)}" for key, value in columns.items()] ) where += f" AND {exposure_constraint(exposure, tolerance)}" del columns["id"] df = self.to_pandas( f"select {','.join(columns.keys())} from files where {where} order by jd" ) return df
[docs] def paths(self, **kwargs): """Get the paths of all files matching the kwargs query (see prose.FitsImage.files) Returns ------- list list of files paths """ return self.files(**kwargs, path=True).path.values
[docs] def observation_files( self, i, past=1e3, future=0, tolerance=1e15, same_telescope=True, lights="images", show=True, ): """ Return a dictionary of files for a given observation ID, along with calibration files. Parameters ---------- i : int id of the observation for which files are retrieved. past : float, optional Number of days in the past to consider when retrieving calibrartion files, by default 1e3. future : float, optional Number of days in the future to consider when retrieving calibrartion files, by default 0. tolerance : float, optional Tolerance on the exposure constraint, by default 1e15. For example: if exposure is set to 10 and tolerance to 2, all files with exposure = 10 +- 2 will be retrieved. same_telescope : bool, optional Whether to retrieve files from the same telescope as the observation, by default True. lights : str, optional key of images files in the return dict, by default "images". show : bool, optional Whether to print the pandas dataframe of returned files, by default True. Returns ------- dict A dictionary containing the files for the given observation id, along with the associated calibration files. """ files = {} obs_dict = self.observations(id=i, hide_exposure=False).to_dict("records")[0] sql_days = SQL_DAYS_BETWEEN.format( date=obs_dict["date"], future=future, past=past ) sql_exposure = exposure_constraint( exposure=obs_dict["exposure"], tolerance=tolerance ) files[lights] = self.to_pandas( f"SELECT path from files where id = {i} order by jd" ).values.flatten() dfs = [] if show: dfs.append(self.to_pandas(f"SELECT * from observations where id = {i}")) for type in ("dark", "bias", "flat"): fields = ["width", "height"] if same_telescope: fields.append("telescope") if type == "flat": fields.append("filter") query = " AND ".join( [f"{key} = {in_value(obs_dict[key])}" for key in fields] ) query += f" AND type = '{type}'" query = query.format(**obs_dict) obs_ids = self.to_pandas( f"""SELECT id FROM observations WHERE {sql_exposure} AND {query} AND date = (SELECT MAX(date) FROM files WHERE {sql_days} AND {query}) """ ).values.flatten() if show: dfs.append( self.to_pandas( f"""SELECT * FROM observations WHERE {sql_exposure} AND {query} AND type = '{type}' AND date = (SELECT MAX(date) FROM files WHERE {sql_days} AND {query}) """ ) ) _files = [ self.to_pandas( f"select path from files where id={j} order by jd" ).values.flatten() for j in obs_ids ] if len(_files) > 0: _files = np.hstack(_files) files[type + ("s" if type[-1] != "s" else "")] = _files if show: df = pd.concat(dfs, axis=0).set_index(["id"]) display(df) return files
@property def unique_obs(self): """Return whether the object contains a unique observation (observation is defined as a unique combinaison of date, telescope, target and filter). Returns ------- bool """ return len(self.observations(show=False)) == 1 @property def all_images(self): """fits paths of the observation science images Returns ------- list of str """ return self.files(type="light", path=True).path.values
[docs] def images(self, i, show=False, **kwargs): """ Return the paths of the observation science images for a given observation id. Parameters ---------- i : int The observation id. show : bool, optional Whether to show the pandas dataframe of the returned files, by default False. **kwargs : dict, optional Additional arguments to pass to the `observation_files` method. Returns ------- list of str The fits paths of the observation science images. """ return self.observation_files(i, show=show, **kwargs)["images"]
@property def all_darks(self): """fits paths of the observation dark images Returns ------- list of str """ return self.files(type="dark", path=True).path.values
[docs] def bias(self, i, show=False, **kwargs): """ Return the paths of the bias images associated to a given observation. Parameters ---------- i : int The index of the observation. show : bool, optional Whether to display the pandas dataframe of the files being returned, by default False. **kwargs : dict Additional keyword arguments to pass to the `observation_files` method. Returns ------- list of str The fits paths of the bias images. """ return self.observation_files(i, show=show, **kwargs)["bias"]
@property def all_bias(self): """fits paths of the observation bias images Returns ------- list of str """ return self.files(type="bias", path=True).path.values
[docs] def darks(self, i, show=False, **kwargs): """ Return the paths of the dark images associated to a given observation. Parameters ---------- i : int The index of the observation. show : bool, optional Whether to display the pandas dataframe of the files being returned, by default False. **kwargs : dict Additional keyword arguments to pass to the `observation_files` method. Returns ------- list of str The fits paths of the observation dark images. """ return self.observation_files(i, show=show, **kwargs)["darks"]
@property def all_flats(self): """fits paths of the observation flats images Returns ------- list of str """ return self.files(type="flat", path=True).path.values
[docs] def flats(self, i, show=False, **kwargs): """ Return the paths of the flat images associated to a given observation. Parameters ---------- i : int The index of the observation. show : bool, optional Whether to display the pandas dataframe of the files being returned, by default False. **kwargs : dict Additional keyword arguments to pass to the `observation_files` method. Returns ------- list of str The fits paths of the observation flat images. """ return self.observation_files(i, show=show, **kwargs)["flats"]
@property def stack(self): """fits paths of the observation stack image if present Returns ------- list of str """ return self.files(imtype="stack") @property def reduced(self): """fits paths of the observation calibrated images if present Returns ------- list of str """ return self.files(imtype="reduced")
[docs] def label(self, i): """ Return a string label for the observation with the given index. Parameters ---------- i : int The index of the observation. Returns ------- str A string label in the format "{telescope}_{date}_{target}_{filter}". """ date, telescope, filter, _, target, *_ = self.observations(id=i).values[0] return f"{telescope}_{date.replace('-', '')}_{target}_{filter}"
@property def obs_name(self): """Observation name ({telescope}_{date}_{target}_{filter}) if a single observation is present""" if self.unique_obs: return self.label() else: raise AssertionError( "obs_name property is only available for FitsManager containing a unique observation" ) def __repr__(self): return str(self.observations()) def _repr_html_(self): return self.observations()._repr_html_()
[docs] def to_pandas(self, query): """ Execute a SQL query and return the result as a pandas DataFrame. Parameters ---------- query : str The SQL query to execute. Returns ------- pandas.DataFrame The result of the query as a pandas DataFrame. """ return pd.read_sql_query(query, self.con)
def _update_observations(self, verbose=False): """ Slow! This should never be used (use update_obs=True in _insert) """ observations = self.to_pandas( f"select {UNIQUE_FIELDS} from files WHERE id is NULL GROUP BY {UNIQUE_FIELDS}" ).values _progress = progress(verbose, desc="observations scan", unit=" obs") for obs in _progress(observations): # insert obs self.con.execute( f"INSERT or IGNORE INTO observations({UNIQUE_FIELDS}, files) VALUES ({QMARKS_UNIQUE}, 0)", obs, ) self.con.commit() # get its id query = " AND ".join( [ f"{str(key)} = {in_value(value)}" for key, value in zip(UNIQUE_FIELDS_LIST, obs) ] ) obs_id = self.con.execute( f"SELECT id FROM observations where {query}" ).fetchall()[0][0] # and fill files id values self.con.execute( f"UPDATE files SET id = ? WHERE id is NULL AND {query}", [obs_id] ) files_updated = self.con.execute("select changes()").fetchall()[0][0] self.con.execute( "UPDATE observations SET files = files + ? WHERE id = ?", [files_updated, obs_id], )