Source code for pmaf.pipe.agents.miners._miner

from ._base import MinerBase
from pmaf.pipe.agents.mediators._metakit import (
    MediatorAccessionMetabase,
    MediatorTaxonomyMetabase,
    MediatorSequenceMetabase,
    MediatorPhylogenyMetabase,
)
from pmaf.pipe.agents.dockers._metakit import (
    DockerAccessionMetabase,
    DockerIdentifierMetabase,
    DockerPhylogenyMetabase,
    DockerTaxonomyMetabase,
    DockerSequenceMetabase,
)
from pmaf.pipe.agents.dockers._mediums._id_medium import DockerIdentifierMedium
from pmaf.pipe.agents.dockers._mediums._acs_medium import DockerAccessionMedium
from pmaf.pipe.agents.dockers._mediums._phy_medium import DockerPhylogenyMedium
from pmaf.pipe.agents.dockers._mediums._seq_medium import DockerSequenceMedium
from pmaf.pipe.agents.dockers._mediums._tax_medium import DockerTaxonomyMedium
from collections import defaultdict
from pmaf.internal._shared import chunk_generator
from typing import Any, Generator


[docs]class Miner(MinerBase): """Main class that performs data mining operations in the :mod:`~pmaf.pipe` module."""
def yield_taxonomy_by_identifier(
    self, docker: DockerIdentifierMedium, **kwargs: Any
) -> Generator[DockerTaxonomyMedium, None, None]:
    """Yield taxonomy :term:`docker` products for the given identifier `docker`.

    This is a generator function, so the checks below run when the
    generator is first advanced, not when the method is called.

    Parameters
    ----------
    docker
        The input identifier :term:`docker`
    kwargs
        Compatibility. Forwarded unchanged to the private mining helper.

    Yields
    ------
        Taxonomy :term:`docker`

    Raises
    ------
    TypeError
        If `docker` is not an instance of DockerIdentifierMetabase.
    RuntimeError
        If the assigned `mediator` is not an instance of
        MediatorTaxonomyMetabase.
    """
    # Same check order as before: the docker type first, then the mediator.
    if not isinstance(docker, DockerIdentifierMetabase):
        raise TypeError("`docker` must be instance of DockerIdentifierMetabase.")
    if not isinstance(self.mediator, MediatorTaxonomyMetabase):
        raise RuntimeError("`mediator` does not support such request.")
    yield from self.__yield_taxonomy_by_identifier(docker, **kwargs)
def yield_phylogeny_by_identifier(
    self, docker: DockerIdentifierMedium, **kwargs: Any
) -> Generator[DockerPhylogenyMedium, None, None]:
    """Yield phylogeny :term:`docker` products for the given identifier `docker`.

    This is a generator function, so the checks below run when the
    generator is first advanced, not when the method is called.

    Parameters
    ----------
    docker
        The input identifier :term:`docker`
    kwargs
        Compatibility. Forwarded unchanged to the private mining helper.

    Yields
    ------
        Phylogeny :term:`docker`

    Raises
    ------
    TypeError
        If `docker` is not an instance of DockerIdentifierMetabase.
    RuntimeError
        If the assigned `mediator` is not an instance of
        MediatorPhylogenyMetabase.
    """
    # The docker type is validated before the mediator capability.
    if not isinstance(docker, DockerIdentifierMetabase):
        raise TypeError("`docker` must be instance of DockerIdentifierMetabase.")
    if not isinstance(self.mediator, MediatorPhylogenyMetabase):
        raise RuntimeError("`mediator` does not support such request.")
    yield from self.__yield_phylogeny_by_identifier(docker, **kwargs)
def yield_sequence_by_identifier(
    self, docker: DockerIdentifierMedium, **kwargs: Any
) -> Generator[DockerSequenceMedium, None, None]:
    """Yield sequence :term:`docker` products for the given identifier `docker`.

    This is a generator function, so the checks below run when the
    generator is first advanced, not when the method is called.

    Parameters
    ----------
    docker
        The input identifier :term:`docker`
    kwargs
        Compatibility. Forwarded unchanged to the private mining helper.

    Yields
    ------
        Sequence :term:`docker`

    Raises
    ------
    TypeError
        If `docker` is not an instance of DockerIdentifierMetabase.
    RuntimeError
        If the assigned `mediator` is not an instance of
        MediatorSequenceMetabase.
    """
    # The docker type is validated before the mediator capability.
    if not isinstance(docker, DockerIdentifierMetabase):
        raise TypeError("`docker` must be instance of DockerIdentifierMetabase.")
    if not isinstance(self.mediator, MediatorSequenceMetabase):
        raise RuntimeError("`mediator` does not support such request.")
    yield from self.__yield_sequence_by_identifier(docker, **kwargs)
def yield_accession_by_identifier(
    self, docker: DockerIdentifierMedium, **kwargs: Any
) -> Generator[DockerAccessionMedium, None, None]:
    """Yield accession :term:`docker` products for the given identifier `docker`.

    This is a generator function, so the checks below run when the
    generator is first advanced, not when the method is called.

    Parameters
    ----------
    docker
        The input identifier :term:`docker`
    kwargs
        Compatibility. Forwarded unchanged to the private mining helper.

    Yields
    ------
        Accession :term:`docker`

    Raises
    ------
    TypeError
        If `docker` is not an instance of DockerIdentifierMetabase.
    RuntimeError
        If the assigned `mediator` is not an instance of
        MediatorAccessionMetabase.
    """
    # The docker type is validated before the mediator capability.
    if not isinstance(docker, DockerIdentifierMetabase):
        raise TypeError("`docker` must be instance of DockerIdentifierMetabase.")
    if not isinstance(self.mediator, MediatorAccessionMetabase):
        raise RuntimeError("`mediator` does not support such request.")
    yield from self.__yield_accession_by_identifier(docker, **kwargs)
def yield_identifier_by_docker(
    self, docker: Any, **kwargs: Any
) -> Generator[DockerIdentifierMedium, None, None]:
    """Yield identifier :term:`docker` products for any supported `docker`.

    The mining route is selected from the type of `docker`: accession,
    phylogeny, taxonomy and sequence dockers are converted to identifiers
    through the corresponding private helper, while an identifier docker is
    yielded back unchanged.

    This is a generator function, so the checks below run when the
    generator is first advanced, not when the method is called.

    Parameters
    ----------
    docker
        The input :term:`docker`
    kwargs
        Compatibility. Forwarded unchanged to the private mining helper.

    Yields
    ------
        Identifier :term:`docker`

    Raises
    ------
    TypeError
        If `docker` is rejected by `verify_docker`.
    RuntimeError
        If `docker` passes verification but matches none of the handled
        docker types.
    """
    if not self.verify_docker(docker):
        raise TypeError("`docker` did not pass `verify_docker` validation.")
    # The isinstance checks are kept in their original order on purpose.
    if isinstance(docker, DockerAccessionMetabase):
        yield from self.__yield_identifier_by_accession(docker, **kwargs)
    elif isinstance(docker, DockerPhylogenyMetabase):
        yield from self.__yield_identifier_by_phylogeny(docker, **kwargs)
    elif isinstance(docker, DockerTaxonomyMetabase):
        yield from self.__yield_identifier_by_taxonomy(docker, **kwargs)
    elif isinstance(docker, DockerSequenceMetabase):
        yield from self.__yield_identifier_by_sequence(docker, **kwargs)
    elif isinstance(docker, DockerIdentifierMetabase):
        # Already an identifier docker: nothing to mine, pass it through.
        yield docker
    else:
        raise RuntimeError("`docker` type is not supported by this method.")
def __yield_accession_by_identifier(self, docker, chunksize=None, **kwargs):
    """Mine accession dockers for an identifier `docker`, optionally in chunks."""
    fetch = self.mediator.get_accession_by_identifier
    return self.__process_chunks_by_docker_for_method(
        fetch,
        docker,
        DockerAccessionMedium,
        chunksize=chunksize,
        factor=self.factor,
        **kwargs
    )

def __yield_taxonomy_by_identifier(self, docker, chunksize=None, **kwargs):
    """Mine taxonomy dockers for an identifier `docker`, optionally in chunks."""
    fetch = self.mediator.get_taxonomy_by_identifier
    return self.__process_chunks_by_docker_for_method(
        fetch,
        docker,
        DockerTaxonomyMedium,
        chunksize=chunksize,
        factor=self.factor,
        **kwargs
    )

def __yield_sequence_by_identifier(self, docker, chunksize=None, **kwargs):
    """Mine sequence dockers for an identifier `docker`, optionally in chunks."""
    fetch = self.mediator.get_sequence_by_identifier
    return self.__process_chunks_by_docker_for_method(
        fetch,
        docker,
        DockerSequenceMedium,
        chunksize=chunksize,
        factor=self.factor,
        **kwargs
    )

def __yield_phylogeny_by_identifier(self, docker, chunksize=None, **kwargs):
    """Mine phylogeny dockers for an identifier `docker`, optionally in chunks."""
    fetch = self.mediator.get_phylogeny_by_identifier
    return self.__process_chunks_by_docker_for_method(
        fetch,
        docker,
        DockerPhylogenyMedium,
        chunksize=chunksize,
        factor=self.factor,
        **kwargs
    )

def __yield_identifier_by_accession(self, docker, chunksize=None, **kwargs):
    """Mine identifier dockers for an accession `docker`, optionally in chunks."""
    fetch = self.mediator.get_identifier_by_accession
    return self.__process_chunks_by_docker_for_method(
        fetch,
        docker,
        DockerIdentifierMedium,
        chunksize=chunksize,
        factor=self.factor,
        **kwargs
    )

def __yield_identifier_by_taxonomy(self, docker, chunksize=None, **kwargs):
    """Mine identifier dockers for a taxonomy `docker`, optionally in chunks."""
    fetch = self.mediator.get_identifier_by_taxonomy
    return self.__process_chunks_by_docker_for_method(
        fetch,
        docker,
        DockerIdentifierMedium,
        chunksize=chunksize,
        factor=self.factor,
        **kwargs
    )

def __yield_identifier_by_sequence(self, docker, chunksize=None, **kwargs):
    """Mine identifier dockers for a sequence `docker`, optionally in chunks."""
    fetch = self.mediator.get_identifier_by_sequence
    return self.__process_chunks_by_docker_for_method(
        fetch,
        docker,
        DockerIdentifierMedium,
        chunksize=chunksize,
        factor=self.factor,
        **kwargs
    )

def __yield_identifier_by_phylogeny(self, docker, chunksize=None, **kwargs):
    """Mine identifier dockers for a phylogeny `docker`, optionally in chunks."""
    fetch = self.mediator.get_identifier_by_phylogeny
    return self.__process_chunks_by_docker_for_method(
        fetch,
        docker,
        DockerIdentifierMedium,
        chunksize=chunksize,
        factor=self.factor,
        **kwargs
    )

def __process_chunks_by_docker_for_method(
    self, method, docker, outlet, chunksize=None, **kwargs
):
    """Mine `docker` with `method`, returning an iterator over the products.

    Parameters
    ----------
    method
        Callable applied to singleton dockers as ``method(docker, **kwargs)``.
    docker
        The input docker; may be a singleton or a container of dockers.
    outlet
        Callable used to wrap the products mined from a container docker.
    chunksize
        When given (and `docker` is not a singleton), the container is split
        via ``chunk_generator`` and one product is produced per chunk.
    kwargs
        Forwarded to `method`.

    Returns
    -------
        An iterator with a single product for singleton dockers or when
        `chunksize` is None; otherwise a generator with one product per chunk.
    """
    if docker.singleton:
        # Singletons are mined right away; the result is wrapped so that
        # every path hands back an iterator.
        return iter([method(docker, **kwargs)])

    if chunksize is None:
        whole_product = self.__process_recursive_by_docker_for_method(
            method, docker, outlet, **kwargs
        )
        return iter([whole_product])

    # Everything the generator needs is read from `docker` up front, before
    # the lazy generator below starts running.
    chunk_source = chunk_generator(docker.get_iterator(), chunksize)
    base_name = docker.name
    shared_metadata = docker.metadata
    container_type = type(docker)

    def chunk_products(**kwargs):
        for position, chunk in enumerate(chunk_source):
            if base_name is None:
                chunk_name = str(position)
            else:
                chunk_name = "{}-{}".format(base_name, str(position))
            members = {key: member for key, member in chunk}
            # Each chunk is re-wrapped in the same docker type as the input.
            chunk_container = container_type(
                members,
                name=chunk_name,
                metadata=shared_metadata,
            )
            yield self.__process_recursive_by_docker_for_method(
                method, chunk_container, outlet, **kwargs
            )

    return chunk_products(**kwargs)

def __process_recursive_by_docker_for_method(
    self, method, docker_container, outlet, **kwargs
):
    """Mine a docker container recursively and wrap the products with `outlet`.

    Singleton dockers are passed to ``method(docker, **kwargs)``. Container
    dockers are walked through ``get_iterator()``; the products of their
    members are collected and wrapped by `outlet`, with the top-level
    `docker_container` passed as ``_transit`` at every nesting level.
    """

    def nested_parser(docker):
        if docker.singleton:
            return method(docker, **kwargs)

        # defaultdict(None) has no default factory, so it acts like a plain
        # dict; it is kept so `outlet` receives the same container type.
        products = defaultdict(None)
        member_metadata = {}
        for ix, sub_docker in docker.get_iterator():
            parsed = nested_parser(sub_docker)
            products[ix] = parsed
            member_metadata[ix] = parsed.metadata

        combined_metadata = {
            "verbose": member_metadata,
            "master": docker.wrap_meta(),
        }
        return outlet(
            products,
            name=docker.name,
            metadata=combined_metadata,
            _transit=docker_container,
        )

    return nested_parser(docker_container)