Source code for prose.core.sequence

import sys
from collections import OrderedDict
from functools import partial
from pathlib import Path
from time import time

import multiprocess as mp
import numpy as np
import yaml
from tabulate import tabulate
from tqdm.autonotebook import tqdm

from prose.citations import citations as default_citations
from prose.console_utils import TQDM_BAR_FORMAT, error, warning
from prose.core.image import Buffer, FITSImage, Image
from prose.utils import full_class_name


def progress(name, x, **kwargs):
    return tqdm(x, desc=name, unit="images", **kwargs)


class Sequence:
    def __init__(self, blocks, name=None):
        """A sequence of :py:class:`Block` objects to sequentially process images

        Parameters
        ----------
        blocks : list
            list of :py:class:`Block` objects
        name : str, optional
            name of the sequence, by default None
        """
        self.name = name
        self.images = []
        self.blocks_dict = None
        self.blocks = blocks
        self.data = {}
        self.n_processed_images = None
        self.last_image = None

        # initially the buffer must have a size of max(front_size) + max(back_size)
        # in order to hold all images necessary to all blocks
        buffer_size = np.max([block.size for block in self.blocks])
        self.buffer = Buffer(buffer_size)
    def __getattr__(self, item):
        return self.blocks_dict[item]

    @property
    def blocks(self):
        """list of :py:class:`Block` objects

        Returns
        -------
        list
            the sequence's blocks, in processing order
        """
        return list(self.blocks_dict.values())

    @blocks.setter
    def blocks(self, blocks):
        self.blocks_dict = OrderedDict(
            {
                block.name if block.name is not None else "block{}".format(i): block
                for i, block in enumerate(blocks)
            }
        )

    def _set_blocks_in_sequence(self, in_sequence):
        for b in self.blocks:
            b.in_sequence = in_sequence
    def run(self, images, terminate=True, show_progress=True, loader=FITSImage):
        """Run the sequence

        Parameters
        ----------
        images : list, str or :py:class:`Image`
            :py:class:`Image` object or path, single or as a list, to be
            processed by the sequence
        terminate : bool, optional
            whether to run :py:class:`Sequence.terminate` at the end of the
            sequence, by default True
        show_progress : bool, optional
            whether to show a progress bar, by default True
        loader : Image sub-class, optional
            an Image sub-class used to load images whose paths are provided as
            inputs, by default :py:class:`FITSImage`
        """
        self._set_blocks_in_sequence(True)
        self.images = (
            images if not isinstance(images, (str, Path, Image)) else [images]
        )
        assert len(self.images) != 0, "Empty array or no images provided"

        if not show_progress:

            def _p(x, **kwargs):
                return x

            self.progress = _p
        else:
            self.progress = partial(progress, self.name)

        if isinstance(self.images, list):
            if len(self.images) == 0:
                raise ValueError("No images to process")
        elif self.images is None:
            raise ValueError("No images to process")

        # run
        self.n_processed_images = 0
        self.discards = {}
        self._run(loader=loader)

        for block_name, discarded in self.discards.items():
            warning(
                f"{block_name} discarded image{'s' if len(discarded) > 1 else ''} "
                f"{', '.join(discarded)}"
            )

        if terminate:
            self.terminate()
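    # A minimal usage sketch (illustrative only; ``SomeBlock`` stands for any
    # concrete :py:class:`Block` subclass, which this module does not define):
    #
    #     seq = Sequence([SomeBlock()], name="example")
    #     seq.run("image.fits")            # a single path...
    #     seq.run(["a.fits", "b.fits"])    # ...or a list of paths/Image objects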
    def _load(self, image, loader=FITSImage):
        _image = loader(image) if isinstance(image, (str, Path)) else image
        return _image

    def _run(self, loader=FITSImage):
        self.buffer.loader = partial(self._load, loader=loader)
        self.buffer.init(self.images)

        for i, buffer in enumerate(
            self.progress(self.buffer, total=len(self.images))
        ):
            buffer.current.i = i
            self.last_image = buffer.current

            for block in self.blocks:
                block._run(buffer)
                # this allows any Block to discard the current image
                if buffer.current is not None:
                    if buffer.current.discard:
                        self._add_discard(type(block).__name__, buffer.current.i)
                        break

            self.n_processed_images += 1
    def terminate(self):
        """Run the :py:class:`Block.terminate` method of all blocks"""
        for block in self.blocks:
            block._terminate()
        self._set_blocks_in_sequence(False)
    def __str__(self):
        rows = [
            [
                i,
                block.name,
                block.__class__.__name__,
                f"{block.processing_time:.3f} s "
                f"({(block.processing_time / self.processing_time) * 100:.0f}%)",
            ]
            for i, block in enumerate(self.blocks)
        ]
        headers = ["index", "name", "type", "processing"]
        return tabulate(rows, headers, tablefmt="fancy_grid")

    def __repr__(self) -> str:
        return self.__str__()

    @property
    def processing_time(self):
        """Total processing time of the sequence's last run"""
        return np.sum([block.processing_time for block in self.blocks])

    def __getitem__(self, item):
        return self.blocks[item]

    # io
    # --

    def _add_discard(self, discard_block, i):
        if discard_block not in self.discards:
            self.discards[discard_block] = []
        self.discards[discard_block].append(str(i))

    @property
    def args(self):
        blocks = []
        for block in self.blocks:
            blocks.append({"block": full_class_name(block), **block.args})
        return blocks
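    # After a run, the timing table built by ``__str__`` above can be
    # inspected with, e.g.:
    #
    #     seq.run(images)
    #     print(seq)  # fancy_grid table: index | name | type | processing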
    @classmethod
    def from_args(cls, args):
        import prose

        blocks = []
        for block_dict in args:
            block_class = block_dict["block"]
            del block_dict["block"]
            block = eval(block_class).from_args(block_dict)
            blocks.append(block)

        return cls(blocks)
    @property
    def params_str(self):
        return yaml.safe_dump(self.args, sort_keys=False)
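    # Sketch of the round-trip implied by ``args``/``from_args``/``params_str``
    # (assuming each block implements ``args`` and ``from_args``):
    #
    #     args = seq.args                  # [{"block": "prose.blocks....", ...}, ...]
    #     print(seq.params_str)            # the same list dumped as YAML
    #     seq2 = Sequence.from_args(args)  # rebuild an equivalent sequence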
    def citations(self):
        """Return the citations of the sequence"""
        # concatenate the citations of all blocks
        citations = []
        for block in self.blocks:
            citations += block.citations

        # remove duplicates
        citations = list(set(citations))

        citation_dict = {}
        for name in citations:
            if name[0] == "@":
                citation_dict[name] = name
            else:
                citation_dict[name] = default_citations[name]

        tex_citep = ", ".join(
            [
                f"{name} \\citep{{{name}}}"
                for name in citation_dict.keys()
                if name not in ["prose", "astropy"]
            ]
        )
        tex_citep += " and astropy \\citep{astropy}"

        tex = (
            f"This research made use of \\textsf{{prose}} \\citep{{prose}} "
            f"and its dependencies ({tex_citep})."
        )

        return tex, "\n\n".join(citation_dict.values())
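    # Usage sketch: ``citations`` returns a LaTeX acknowledgement sentence and
    # the matching BibTeX entries, ready to paste into a manuscript:
    #
    #     acknowledgement, bibtex = seq.citations()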
class SequenceParallel(Sequence):
    """A multi-process :py:class:`Sequence` of blocks to be executed in parallel.

    .. caution::

        :py:class:`SequenceParallel` is an experimental feature and not all
        blocks are compatible with it. If you encounter issues (or for
        debugging) you should switch back to a normal :py:class:`Sequence`.

    The :code:`data_blocks` allow blocks carrying large amounts of data to be
    run sequentially, so that their data are not copied from one process to
    another.

    Parameters
    ----------
    blocks : list
        a list of blocks to be executed in parallel
    data_blocks : list, optional
        a list of blocks to be executed sequentially, by default None
    name : str, optional
        a name for the sequence
    """
    def __init__(self, blocks, data_blocks=None, name=""):
        super().__init__(blocks, name=name)

        if data_blocks is None:
            self.data = None
            self._has_data = False
        else:
            self.data = Sequence(data_blocks)
            self._has_data = True
    def check_data_blocks(self):
        bad_blocks = []
        for b in self.blocks:
            if b._data_block:
                bad_blocks.append(f"{b.__class__.__name__}")

        if len(bad_blocks) > 0:
            bad_blocks = ", ".join(list(np.unique(bad_blocks)))
            error(
                f"Data blocks [{bad_blocks}] cannot be used in a "
                "SequenceParallel\n\nConsider using the data_blocks kwarg"
            )
            sys.exit()
    def _run(self, loader=FITSImage):
        self.check_data_blocks()
        self.n_processed_images = 0
        n = len(self.images)

        processed_blocks = mp.Manager().list(self.blocks)
        images_i = list(enumerate(self.images))

        with mp.Pool() as pool:
            for image in self.progress(
                pool.imap(
                    partial(_run_all, blocks=processed_blocks, loader=loader),
                    images_i,
                ),
                total=n,
            ):
                if not image.discard:
                    if self._has_data:
                        self.data.run(image, terminate=False, show_progress=False)
                else:
                    self._add_discard(image.discard_block, image.i)
    def terminate(self):
        if self._has_data:
            self.data.terminate()
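# Usage sketch (illustrative; ``SomeBlock`` and ``SomeDataBlock`` are
# hypothetical Block subclasses): ``blocks`` run in a process pool, while
# ``data_blocks`` run sequentially on the images that were not discarded:
#
#     seq = SequenceParallel([SomeBlock()], data_blocks=[SomeDataBlock()])
#     seq.run(list_of_fits_paths)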
def _run_all(image_i, blocks=None, loader=None):
    i, image = image_i

    if isinstance(image, (str, Path)):
        image = loader(image)

    image.i = i

    for block in blocks:
        # this allows any Block to discard the current image
        if image.discard:
            return image
        else:
            block._run(image)
            if image.discard:
                image.discard_block = type(block).__name__

    return image