Source code for pmaf.sequence._multiple._multiple

import warnings

warnings.simplefilter("ignore", category=FutureWarning)
from skbio import TabularMSA
from skbio.sequence import GrammaredSequence
from io import StringIO, IOBase
from shutil import copyfileobj
import copy
import numpy as np
from pmaf.internal.io._seq import SequenceIO
from pmaf.sequence._sequence._nucleotide import Nucleotide
from pmaf.sequence._metakit import MultiSequenceMetabase, NucleotideMetabase
from pmaf.sequence._shared import validate_seq_mode
from typing import Union, Optional, Any, Sequence, Generator
from pmaf.internal._typing import AnyGenericIdentifier


[docs]class MultiSequence(MultiSequenceMetabase): """Class responsible for handling multiple sequences.""" def __init__( self, sequences: Any, name: Optional[str] = None, mode: Optional[str] = None, metadata: Optional[dict] = None, aligned: bool = False, **kwargs: Any ): """Constructor for :class:`.MultiSequence` Parameters ---------- sequences Anything that can be parsed as multiple sequences. name Name of the multi-sequence instance mode Mode of the sequences. All sequences must have same mode/type. Otherwise error will be raised metadata Metadata of the multi-sequence instance aligned True if sequences are aligned. Default is False kwargs Compatibility """ if name is None or np.isscalar(name): tmp_name = name else: raise TypeError("`name` can be any scalar") if isinstance(metadata, dict): tmp_metadata = metadata elif metadata is None: tmp_metadata = {} else: raise TypeError("`metadata` can be dict or None") if mode is not None: if validate_seq_mode(mode): tmp_mode = mode.lower() else: raise ValueError("`mode` is invalid.") else: tmp_mode = mode tmp_sequences = [] if isinstance(sequences, list): if all( [isinstance(sequence, NucleotideMetabase) for sequence in sequences] ): tmp_sequences = sequences elif all( [isinstance(sequence, GrammaredSequence) for sequence in sequences] ): tmp_sequences = [ Nucleotide(skbio_seq, mode=None, **kwargs) for skbio_seq in sequences ] else: raise ValueError( "`sequences` must have same type when provided as list." ) else: if tmp_mode is not None: seq_gen = SequenceIO(sequences, upper=True).pull_parser( parser="simple", id=True, description=True, sequence=True ) for sid, desc, seq_str in seq_gen: tmp_sequences.append( Nucleotide( seq_str, name=sid, mode=tmp_mode, metadata={"description": desc}, **kwargs ) ) else: raise ValueError("`mode` cannot be None if raw read is performed.") if aligned: if len(set([sequence.length for sequence in tmp_sequences])) != 1: raise ValueError("`sequences` must be all of the length if aligned.") tmp_indices = [sequence.name for sequence in tmp_sequences] if len(tmp_indices) != len(set(tmp_indices)): raise ValueError("`sequences` must have unique names.") tmp_modes = set([sequence.mode for sequence in tmp_sequences]) if len(tmp_modes) > 1: raise ValueError("`sequences` cannot have different modes.") if tmp_mode is not None: if tmp_mode not in tmp_modes: raise ValueError("`mode` must match modes of sequences.") else: tmp_mode = tmp_modes.pop() tmp_internal_id = kwargs.get("internal_id", None) if tmp_internal_id is not None: for sequence in tmp_sequences: if tmp_internal_id not in sequence.metadata.keys(): raise ValueError( "Metadata of all sequences must contain same internal_id." ) self.__indices = np.asarray([seq.name for seq in tmp_sequences]) self.__sequences = tmp_sequences self.__metadata = tmp_metadata self.__aligned = bool(aligned) self.__internal_id = tmp_internal_id self.__skbio_mode = tmp_sequences[0].skbio_mode self.__mode = tmp_mode self.__name = tmp_name self.__buckled = bool(kwargs.get("buckled", None)) def __repr__(self): class_name = self.__class__.__name__ name = self.__name if self.__name is not None else "N/A" count = len(self.__sequences) metadata_state = "Present" if len(self.__metadata) > 0 else "N/A" aligned = "Yes" if self.__aligned else "No" mode = self.__mode.upper() if self.__mode is not None else "N/A" repr_str = ( "<{}:[{}], Name:[{}], Mode:[{}], Aligned: [{}], Metadata:[{}]>".format( class_name, count, name, mode, aligned, metadata_state ) ) return repr_str
[docs] def to_skbio_msa( self, indices: Optional[AnyGenericIdentifier] = None ) -> TabularMSA: """Convert to :mod:`skbio` :class:`~skbio.alignment.TabularMSA` instance. Parameters ---------- indices List of target sequences to select. Default is None for all sequences. Returns ------- Instance of :class:`skbio.alignment.TabularMSA` """ if self.__aligned: tmp_sequences = self.__get_seqs_by_index(indices) return TabularMSA([sequence.skbio for sequence in tmp_sequences]) else: raise RuntimeError("TabularMSA can only be retrieved for alignment.")
def __get_seqs_by_index(self, ids: Optional[AnyGenericIdentifier]): """Get sequences by indices/ids.""" if ids is not None: target_ids = np.asarray(ids) else: target_ids = self.__indices if np.isin(self.__indices, target_ids).sum() == len(target_ids): return [seq for seq in self.__sequences if seq.name in target_ids] else: raise ValueError("Invalid indices are provided.")
[docs] def get_consensus( self, indices: Optional[AnyGenericIdentifier] = None ) -> Nucleotide: """If sequence are aligned, estimate consensus sequence from the :term:`MSA` Parameters ---------- indices List of target sequences to select. Default is None for all sequences. Returns ------- Consensus sequence. """ if self.__aligned: tmp_msa = self.to_skbio_msa(indices) return Nucleotide( tmp_msa.consensus(), name=self.__name, metadata=self.__metadata, mode=self.__mode, ) else: raise RuntimeError("Consensus can be retrieved only from alignment.")
[docs] def get_subset( self, indices: Optional[AnyGenericIdentifier] = None ) -> "MultiSequence": """Get subset of the mutli-sequence instance. Parameters ---------- indices Indices to subset for. Returns ------- Subset instance of :class:`.MultiSequence` """ return type(self)( self.__get_seqs_by_index(indices), name=self.__name, metadata=self.__metadata, mode=self.__mode, aligned=self.__aligned, )
[docs] def buckle_for_alignment(self) -> dict: """Buckle individual sequences for alignment. Returns ------ Packed metadata of all sequences. """ if not self.__buckled: from collections import defaultdict from random import random if self.__internal_id is None: self.__internal_id = round(random() * 100000, None) packed_metadata = { "master-metadata": self.__metadata, "__name": self.__name, "__internal_id": self.__internal_id, } children_metadata = defaultdict(dict) for tmp_uid, sequence in enumerate(self.__sequences): tmp_uid_str = "TMP_ID_{}".format(str(tmp_uid)) children_metadata[tmp_uid_str] = sequence.buckle_by_uid(tmp_uid_str) packed_metadata.update({"children-metadata": dict(children_metadata)}) self.__buckled = True return packed_metadata else: raise RuntimeError("MultiSequence instance is already buckled.")
[docs] def restore_buckle(self, buckled_pack: dict) -> None: """Restore the buckled :class:`MultiSequence` instance. Parameters ---------- buckled_pack Backed up packed metadata of all individual sequences Returns ------- None if success or raise error """ if self.__buckled: self.__metadata = buckled_pack["master-metadata"] self.__name = buckled_pack["__name"] self.__internal_id = buckled_pack["__internal_id"] for sequence in self.__sequences: tmp_uid = sequence.unbuckle_uid() child_packed_metadata = buckled_pack["children-metadata"][tmp_uid] sequence.restore_buckle(child_packed_metadata) self.__indices = np.asarray([seq.name for seq in self.__sequences]) else: raise RuntimeError("MultiSequence instance is not buckled.")
[docs] def get_iter(self, method: str = "asis") -> Generator: """Get generator for the idividual sequences. Parameters ---------- method Method indicate how generator must yield the sequence data Returns ------- Generator for the sequences. Depending on `method` result can yield on of following: - 'asis' - (name[str], sequence[Instance]) - 'string' - (name[str], sequence[str]) - 'skbio' - (name[str], sequence[skbio]) """ def make_generator(): for sequence in self.__sequences: if method == "asis": yield sequence.name, sequence elif method == "string": yield sequence.name, sequence.text elif method == "skbio": yield sequence.name, sequence.skbio else: raise ValueError("`method` is invalid.") return make_generator()
[docs] def copy(self): """Copy current instance.""" return copy.deepcopy(self)
[docs] def write(self, file: Union[str, IOBase], mode: str = "w", **kwargs: Any) -> None: """Write the sequence data into the file. Parameters ---------- file File path or IO stream to write into mode File write mode such as "w" or "a" or "w+" kwargs Compatibility. """ buffer_io = self.__make_fasta_io(**kwargs) if isinstance(file, IOBase): file_handle = file elif isinstance(file, str): file_handle = open(file, mode=mode) else: raise TypeError("`file` has invalid type.") copyfileobj(buffer_io, file_handle) buffer_io.close()
[docs] def get_string_as(self, **kwargs): """Get string of all sequences. Parameters ---------- kwargs Compatibility. Will be passed to :meth:`pmaf.sequence.Nucleotide.write` method. Returns ------- String with formatted sequence data """ buffer_io = self.__make_fasta_io(**kwargs) ret = buffer_io.getvalue() buffer_io.close() return ret
def __make_fasta_io(self, **kwargs): """Make a FASTA file IO stream.""" buffer_io = StringIO() for sequence in self.__sequences: sequence.write(buffer_io, mode="a", **kwargs) buffer_io.seek(0) return buffer_io
[docs] @classmethod def from_buckled( cls, sequences: Any, buckled_pack: dict, **kwargs: Any ) -> "MultiSequence": """Factory method to create :class:`.MultiSequence` using packed metadata from buckling. Parameters ---------- sequences Sequences that will be passed to constructor buckled_pack Packed metadata produced during buckling kwargs Compatibility Returns ------- New instance of :class:`.MultiSequence` """ if not isinstance(buckled_pack, dict): raise TypeError("`buckled_pack` must have dict type.") tmp_multiseq = cls(sequences, buckled=True, **kwargs) tmp_multiseq.restore_buckle(buckled_pack) return tmp_multiseq
@property def count(self): """Total number of sequences.""" return len(self.__sequences) @property def metadata(self): """Instance metadata.""" return self.__metadata @property def mode(self): """Mode/type of the sequences.""" return self.__mode @property def skbio_mode(self): """The :mod:`skbio` mode of the sequence.""" return self.__skbio_mode @property def sequences(self): """List of individual sequence instances.""" return self.__sequences @property def name(self): """Name of the instance.""" return self.__name @property def is_alignment(self): """Is mutli-sequence is aligned or not.""" return self.__aligned @property def is_buckled(self): """Is mulit-sequence instance is buckled or not.""" return self.__buckled @property def index(self): """Indices of the internals sequences.""" return self.__indices