Source code for pmaf.pipe.agents.mediators._remote._ncbi

from pmaf.pipe.agents.mediators._base import MediatorBase
from pmaf.pipe.factors._metakit import FactorBackboneMetabase
from pmaf.remote._entrez._metakit import EntrezBackboneMetabase
from pmaf.pipe.agents.mediators._metakit import (
    MediatorSequenceMetabase,
    MediatorTaxonomyMetabase,
    MediatorAccessionMetabase,
)
from pmaf.pipe.agents.dockers._metakit import (
    DockerIdentifierMetabase,
    DockerTaxonomyMetabase,
)
from pmaf.pipe.agents.dockers._mediums._id_medium import DockerIdentifierMedium
from pmaf.pipe.agents.dockers._mediums._acs_medium import DockerAccessionMedium
from pmaf.pipe.agents.dockers._mediums._seq_medium import DockerSequenceMedium
from pmaf.pipe.agents.dockers._mediums._tax_medium import DockerTaxonomyMedium
from pmaf.internal._shared import sort_ranks
from pmaf.alignment._multiple._metakit import MultiSequenceAlignerBackboneMetabase
from pmaf.internal._constants import ITS
from pmaf.internal.io._seq import SequenceIO
from typing import Any, Optional


[docs]class NCBIMediator( MediatorBase, MediatorSequenceMetabase, MediatorTaxonomyMetabase, MediatorAccessionMetabase, ): """The :term:`NCBI` Entrez API :cite:t:`schuler10EntrezMolecular1996` mediator.""" SEQ_EXTRACT_METHODS = ["asis", "consensus"] def __init__( self, entrez: EntrezBackboneMetabase, seq_method: str = "asis", seq_aligner: Optional[MultiSequenceAlignerBackboneMetabase] = None, **kwargs: Any ): """Constructor for :class:`.NCBIMediator` Parameters ---------- entrez Working/active instance of :class:`pmaf.remote.Entrez` seq_method Method of sequence retrieval. - The "asis" will retrieve multiple sequences as-is. - The "consensus" will attempt to align sequences using `seq_aligner` and return the consensus sequence seq_aligner Sequence aligner to use when `seq_method` is set to "consensus" kwargs Compatibility """ if not isinstance(entrez, EntrezBackboneMetabase): TypeError("`entrez` has invalid type.") if not entrez.state: raise ValueError("`entrez` have invalid state.") if isinstance(seq_method, str): if seq_method not in self.SEQ_EXTRACT_METHODS: raise ValueError("`seq_method` is unknown.") else: raise TypeError("`seq_method` has invalid type.") if seq_aligner is not None: if not isinstance(seq_aligner, MultiSequenceAlignerBackboneMetabase): raise TypeError("`seq_aligner` has invalid type.") if seq_method == "consensus" and not seq_aligner is not None: raise ValueError( "`seq_method` in consensus mode require valid `seq_aligner`." ) tmp_configs = dict(seq_method=seq_method, seq_aligner=seq_aligner) super().__init__(_client=entrez, _configs=tmp_configs, **kwargs) def __repr__(self): class_name = self.__class__.__name__ client_class_name = "Entrez API" state = "Active" if self.client.state else "Inactive" repr_str = "<{}:[{}], Client:[{}]>".format(class_name, state, client_class_name) return repr_str
[docs] def verify_factor(self, factor: FactorBackboneMetabase) -> bool: """Verify factor compatibility with mediator. Parameters ---------- factor The :term:`factor` to test Returns ------- Result of validations """ if isinstance(factor, FactorBackboneMetabase): gene_type = factor.factors.get("gene-type", None) == "marker" molecule_type = factor.factors.get("molecule-type", None) in ["DNA", "RNA"] gene_name = factor.factors.get("gene-name", None) in ["rRNA"] gene_target = factor.factors.get("gene-target", None) in [ "16S", "18S", "ITS", ] return all([gene_type, molecule_type, gene_name, gene_target]) else: raise TypeError("`factor` has invalid type.")
[docs] def get_sequence_by_identifier( self, docker: DockerIdentifierMedium, factor: FactorBackboneMetabase, **kwargs: Any ) -> DockerSequenceMedium: """Get sequence from identifier. In case of :term:`NCBI` the identifier is :term:`taxid` Parameters ---------- docker Identifier(:term:`taxid`) factor The mining :term:`factor` to use kwargs Compatibility Returns ------- The :term:`docker` with sequence data """ if not self.verify_factor(factor): raise ValueError("`factor` is invalid.") if isinstance(docker, DockerIdentifierMetabase): if docker.singleton: return self.__retrieve_sequence_by_identifier(docker, factor, **kwargs) else: raise ValueError("`docker` must be singleton.") else: raise TypeError("`docker` must be instance of DockerIdentifierMetabase.")
[docs] def get_accession_by_identifier( self, docker: DockerIdentifierMedium, factor: FactorBackboneMetabase, **kwargs: Any ) -> DockerAccessionMedium: """Get accessions from identifier. In case of :term:`NCBI` the identifier is :term:`taxid` Parameters ---------- docker Identifier(:term:`taxid`) factor The mining :term:`factor` to use kwargs Compatibility Returns ------- The :term:`docker` with accession data """ if not self.verify_factor(factor): raise ValueError("`factor` is invalid.") if isinstance(docker, DockerIdentifierMetabase): if docker.singleton: return self.__retrieve_accession_by_identifier(docker) else: raise ValueError("`docker` must be singleton.") else: raise TypeError("`docker` must be instance of DockerIdentifierMetabase.")
[docs] def get_identifier_by_accession( self, docker: DockerAccessionMedium, factor: FactorBackboneMetabase, **kwargs: Any ) -> DockerIdentifierMedium: """Get identifier from accession number. In case of :term:`NCBI` the identifier is :term:`taxid` Parameters ---------- docker The :term:`docker` with accession numbers factor The mining :term:`factor` to use kwargs Compatibility Returns ------- The :term:`docker` with identifiers """ raise NotImplementedError
[docs] def get_identifier_by_sequence( self, docker: DockerSequenceMedium, factor: FactorBackboneMetabase, **kwargs: Any ) -> DockerIdentifierMedium: """Get identifier from sequence. In case of :term:`NCBI` the identifier is :term:`taxid` THIS METHOD IS NOT YET IMPLEMENTED. Parameters ---------- docker The :term:`docker` with sequence data factor The mining :term:`factor` to use kwargs Compatibility Returns ------- The :term:`docker` with identifiers """ raise NotImplementedError
[docs] def get_taxonomy_by_identifier( self, docker: DockerIdentifierMedium, factor: FactorBackboneMetabase, **kwargs: Any ) -> DockerTaxonomyMedium: """Get taxonomy from identifiers. In case of :term:`NCBI` the identifier is :term:`taxid` THIS METHOD IS NOT YET IMPLEMENTED. Parameters ---------- docker The :term:`docker` with identifiers data factor The mining :term:`factor` to use kwargs Compatibility Returns ------- The :term:`docker` with taxonomy """ raise NotImplementedError
[docs] def get_identifier_by_taxonomy( self, docker: DockerTaxonomyMedium, factor: FactorBackboneMetabase, **kwargs: Any ) -> DockerIdentifierMedium: """Get identifier from taxonomy. In case of :term:`NCBI` the identifier is :term:`taxid` Parameters ---------- docker The :term:`docker` with taxonomy data factor The mining :term:`factor` to use kwargs Compatibility Returns ------- The :term:`docker` with identifiers """ if not self.verify_factor(factor): raise ValueError("`factor` is invalid.") if isinstance(docker, DockerTaxonomyMetabase): if docker.singleton: return self.__retrieve_identifier_by_taxonomy(docker, **kwargs) else: raise ValueError("`docker` must be singleton.") else: raise TypeError("`docker` must be instance of DockerTaxonomyMetabase.")
def __retrieve_identifier_by_taxonomy(self, docker, **kwargs): """Get :term:`taxid` for :term:`NCBI` taxonomy.""" tmp_identifiers = dict.fromkeys(docker.index, None) tmp_query_metadata = dict.fromkeys(docker.valid, None) for ix, taxonomy in docker.get_iterator(exclude_missing=True): tmp_query = self.__make_query_for_taxonomy_entry(taxonomy) tmp_taxid = self.client.get_taxid_by_query(tmp_query) if tmp_taxid: tmp_identifiers[ix] = tmp_taxid tmp_query_metadata[ix] = tmp_query new_metadata = {"queries": tmp_query_metadata, "master": docker.wrap_meta()} return DockerIdentifierMedium( tmp_identifiers, name=docker.name, metadata=new_metadata ) def __make_query_for_taxonomy_entry(self, taxonomy): """Generate a query for taxonomy :term:`docker`""" taxa_dict = {k: v for k, v in taxonomy.items() if v is not None} ordered_ranks = sort_ranks(taxa_dict.keys()) target_rank = ordered_ranks[-1] target_taxon = taxa_dict[target_rank] target_rank_full = ITS["r2rank"][target_rank] ancestors = [taxon for rank, taxon in taxa_dict.items() if rank != target_rank] ancestors_query = " OR ".join( ["{}[Subtree]".format(taxon) for taxon in ancestors] ) target_query = "{}[All Names] AND {}[Rank]".format( target_taxon, target_rank_full ) query = "({}) AND ({})".format(ancestors_query, target_query) return query def __retrieve_accession_by_identifier(self, docker): """Get accession numbers from :term:`taxid`""" tmp_accession = dict.fromkeys(docker.index, None) tmp_metadata = dict.fromkeys(docker.valid, None) for ix, taxid in docker.get_iterator(exclude_missing=True): tmp_taxid_metadata = { "tax-id": taxid, "genome-id": None, "chromosome-id": None, } tmp_genome_id = self.client.get_genome_id_by_taxid(taxid) if tmp_genome_id: tmp_taxid_metadata["genome-id"] = tmp_genome_id tmp_chromosome_id = self.client.get_chromosome_id_by_genome_id( tmp_genome_id ) if tmp_chromosome_id: tmp_taxid_metadata["chromosome-id"] = tmp_chromosome_id tmp_accession[ix] = {"ncbi": tmp_chromosome_id} tmp_metadata[ix] = tmp_taxid_metadata new_metadata = {"verbose": tmp_metadata, "master": docker.wrap_meta()} return DockerAccessionMedium( tmp_accession, name=docker.name, metadata=new_metadata ) def __retrieve_sequence_by_identifier(self, docker, factor, **kwargs): """Get sequences from :term:`taxid` identifiers.""" tmp_sequences = dict.fromkeys(docker.index, None) tmp_metadata = dict.fromkeys(docker.valid, None) target_gene_filter = factor.factors["gene-target"].lower().strip() for ix, accession_no in docker.get_iterator(exclude_missing=True): tmp_features = self.client.get_gene_features_by_chromosome_id( accession_no, factor.factors["gene-name"] ) if tmp_features: tmp_copy_sequences_str = "" tmp_copy_metadata = [] tmp_copy_names = [] for feature_id, feature_details in enumerate(tmp_features): feature_genes_adj = feature_details[0].lower().strip() if target_gene_filter in feature_genes_adj: tmp_copy_name = "{}-{}".format(str(ix), str(feature_id)) tmp_copy_names.append(tmp_copy_name) tmp_start_pos = feature_details[1]["start"] tmp_end_pos = feature_details[1]["end"] tmp_strand = feature_details[1]["strand"] tmp_fasta_str = self.client.get_fasta_sequence_by_param( str(accession_no), str(tmp_start_pos), str(tmp_end_pos), str(tmp_strand), ) tmp_copy_sequences_str += "{}\n".format(tmp_fasta_str) tmp_copy_metadata.append( { "start": tmp_start_pos, "end": tmp_end_pos, "strand": tmp_strand, "gene-target": feature_genes_adj, "gene-name": factor.factors["gene-name"], "accession": accession_no, } ) tmp_copy_sequences_dict = dict.fromkeys(tmp_copy_names, None) tmp_copy_metadata_dict = dict.fromkeys(tmp_copy_names, None) tmp_copy_counter = 0 for fasta_id, fasta_desc, seq_str in SequenceIO( tmp_copy_sequences_str, ftype="fasta", upper=True ).pull_parser(id=True, description=True, sequence=True): tmp_copy_name_x = tmp_copy_names[tmp_copy_counter] tmp_copy_metadata_adj = { **tmp_copy_metadata[tmp_copy_counter], **{"fasta-id": fasta_id, "fasta-description": fasta_desc}, } tmp_copy_metadata_dict[tmp_copy_name_x] = tmp_copy_metadata_adj tmp_copy_sequences_dict[tmp_copy_name_x] = seq_str tmp_copy_counter += 1 tmp_sequences[ix] = self.__transform_sequence_for_method( DockerSequenceMedium( tmp_copy_sequences_dict, mode=None, name=ix, metadata=tmp_copy_metadata_dict, ) ) tmp_metadata[ix] = { "accession": accession_no, "copy-number": len(tmp_copy_sequences_dict), "verbose": tmp_copy_metadata_dict, "gene-name": factor.factors["gene-name"], } new_metadata = {"verbose": tmp_metadata, "master": docker.wrap_meta()} return DockerSequenceMedium( tmp_sequences, mode=None, name=docker.name, metadata=new_metadata ) def __transform_sequence_for_method(self, sequence_docker): """Apply sequence processing method to the sequence :term:`docker`""" if self.configs["seq_method"] == "consensus" and isinstance( self.configs["seq_aligner"], MultiSequenceAlignerBackboneMetabase ): if sequence_docker.count > 1: return ( self.configs["seq_aligner"] .align(sequence_docker.to_multiseq()) .get_consensus() .text ) else: return next(iter(sequence_docker.data.values())) else: return sequence_docker @property def state(self): """State of the mediator(client).""" return self.client.state