Source code for pmaf.pipe.markers._marker

from ._metakit import MarkerBackboneMetabase
from pmaf.pipe.specs._metakit import SpecificationBackboneMetabase
from pmaf.pipe.agents.dockers._metakit import DockerBackboneMetabase
from datetime import datetime
from typing import Any, Optional


[docs]class Marker(MarkerBackboneMetabase): """This class assists user in using pipe module.""" def __init__( self, input: Any, name: Optional[str] = None, metadata: Optional[dict] = None ): """Constructor for :class:`.Marker` Parameters ---------- input Instance of any type that has `.data` attribute. For example, :mod:`~pmaf.biome` or :mod:`pmaf.pipe.agents.dockers`. name Name of the marker instance. metadata Metadata of the marker instance. """ if input is not None: if not isinstance(input, DockerBackboneMetabase): if hasattr(input, "data"): self.__input = input.data else: self.__input = input else: self.__input = input self.__inlet = type(input) else: raise ValueError("`data` cannot be None.") if isinstance(name, (str, int, type(None))): tmp_name = name else: raise TypeError("`name` can be str,int or None") if isinstance(metadata, dict): tmp_metadata = metadata elif metadata is None: tmp_metadata = {} else: raise TypeError("`metadata` can be dict or None") if hasattr(input, "name") and name is None: self.__name = input.name else: self.__name = tmp_name if hasattr(input, "metadata") and metadata is None: self.__metadata = input.metadata else: self.__metadata = tmp_metadata self.__outlet = None self.__output = None self.__tasks = [] self.__task_pointer = None def __repr__(self): class_name = self.__class__.__name__ state = "Active" if len(self.__tasks) > 0 else "Inactive" step = ( "{}/{}".format(str(self.__task_pointer), str(len(self.__tasks))) if len(self.__tasks) > 0 else "N/A" ) inlet = self.inlet.__name__ outlet = self.outlet.__name__ if self.outlet is not None else "N/A" repr_str = "<{}:[{}], Step/Total:[{}], Inlet:[{}], Outlet:[{}]>".format( class_name, state, step, inlet, outlet ) return repr_str
[docs] def embed_specs(self, *args): """Embed :term:`specs<spec>` to the :class:`.Marker`. Parameters ---------- *args Instances of :term:`spec` that must be embedded. Returns ------- Returns the outlet type of the last :term:`spec` """ for spec in args: if isinstance(spec, SpecificationBackboneMetabase): if self.__task_pointer is None: self.__task_pointer = 0 last_tix = None else: last_tix = len(self.__tasks) - 1 for name, method, outlet, description in spec.steps: self.__tasks.append( { "status": False, "results": None, "time": datetime.now(), "request": { "method": method, "input": last_tix, "outlet": outlet, "name": name, "description": description, }, } ) last_tix = len(self.__tasks) - 1 self.__outlet = outlet else: raise TypeError("`marker` has invalid type.")
def __get_pending(self): """Get the pending :term:`specs<spec>` to be evaluated.""" return sorted( [ix for ix, task in enumerate(self.__tasks) if not task["status"]] ) def __get_finished(self): """Get the last finished :term:`spec`""" return sorted([ix for ix, task in enumerate(self.__tasks) if task["status"]]) def __move_to_next_task(self, current_task_results): """Move cursor to the next :term:`spec` and mark last completed.""" self.__tasks[self.__task_pointer]["results"] = current_task_results self.__tasks[self.__task_pointer]["status"] = True self.__task_pointer = self.__task_pointer + 1 self.__output = current_task_results[0] return def __get_next_task(self): """get the next :term:`spec` to be evaluated.""" if self.__task_pointer is not None: if self.__task_pointer < len(self.__tasks): return self.__tasks[self.__task_pointer] else: return None else: raise RuntimeError("Marker is not initiated.") def __iter__(self): """Iterate over pending :term:`specs<spec>`""" if len(self.__tasks) > 0: while len(self.__get_pending()) > 0: yield self.__next() else: raise RuntimeError("Marker is not initiated.") def __next__(self): """Run next :term:`spec`""" if len(self.__tasks) > 0: if len(self.__get_pending()) > 0: return self.__next() else: raise StopIteration else: raise RuntimeError("Marker is not initiated.") def __next(self): """Helper for __next__""" next_task = self.__get_next_task() if next_task is not None: current_input = next_task["request"]["input"] spec_method = next_task["request"]["method"] if isinstance(current_input, int): last_results = self.__tasks[current_input]["results"] next_args = (last_results[0], *last_results[1]) next_kwargs = last_results[2] else: next_args = (self.__input,) next_kwargs = {"metadata": self.__metadata, "name": self.__name} tmp_results = spec_method(*next_args, **next_kwargs) self.__move_to_next_task(tmp_results) return tmp_results[0] else: return None
[docs] def next(self): """Same as builtin next() command.""" if len(self.__tasks) > 0: if len(self.__get_pending()) > 0: return self.__next() else: raise StopIteration else: raise RuntimeError("Marker is not initiated.")
[docs] def compute(self): """Evaluate :term:`specs<spec>`""" if len(self.__tasks) > 0: tmp_product = None while len(self.__get_pending()) > 0: tmp_product = self.__next() return tmp_product else: raise RuntimeError("Marker is not initiated.")
[docs] def get_outputs(self): """Get the outputs.""" return [self.__tasks[ix]["results"][0] for ix in self.__get_finished()]
@property def tasks(self): """List all the tasks.""" return self.__tasks @property def name(self): """Name of the instance.""" return self.__name @property def metadata(self): """Metadata of the instance.""" return self.__metadata @property def inlet(self): """Inlet type of the marker.""" return self.__inlet @property def outlet(self): """Outlet type marker.""" return self.__outlet @property def input(self): """Input of the marker.""" return self.__input @property def output(self): """Output of the marker.""" return self.__output @property def upcoming(self): """Upcoming task.""" if len(self.__tasks) > 0: return self.__tasks[self.__task_pointer]["request"]["name"] else: return None