Source code for pmaf.database._manifest._greengenes

from os import path

import numpy as np
import pandas as pd

from pmaf.database._core._base import DatabaseBase
from pmaf.database._core._tax_base import DatabaseTaxonomyMixin
from pmaf.database._core._seq_base import DatabaseSequenceMixin
from pmaf.database._core._phy_base import DatabasePhylogenyMixin
from pmaf.database._core._acs_base import DatabaseAccessionMixin
from pmaf.database._manager import DatabaseStorageManager
import pmaf.database._shared._assemblers as transformer
import pmaf.database._shared._summarizers as summarizer
from typing import Any, Tuple
from pmaf.internal._typing import AnyGenericIdentifier


[docs]class DatabaseGreengenes( DatabaseTaxonomyMixin, DatabaseSequenceMixin, DatabasePhylogenyMixin, DatabaseAccessionMixin, DatabaseBase, ): """Database class for Greengenes database :cite:t:`mcdonaldImprovedGreengenesTaxonomy2012a`""" DATABASE_NAME = "Greengenes" INVALID_TAXA = "uncultured" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs)
[docs] @classmethod def build_database_storage( cls, storage_hdf5_fp: str, taxonomy_map_csv_fp: str, tree_newick_fp: str, sequence_fasta_fp: str, sequence_alignment_fasta_fp: str, stamp_dict: dict, force: bool = False, chunksize: int = 500, **kwargs: Any ) -> None: """Factory method to build new database :term:`hdf5` Parameters ---------- storage_hdf5_fp Output path for :term:`hdf5` file taxonomy_map_csv_fp Path to taxonomy file tree_newick_fp Path to Newick tree file sequence_fasta_fp Path to FASTA sequences file sequence_alignment_fasta_fp Path to FASTA alignment file stamp_dict Dictionary with metadata that will be stamped to the database force Force output file overwrite chunksize Sequence/Alignment data processing chunk size. Longer chunks are faster to process but require more memory. **kwargs Compatibility. Returns ------- None if file was created successfully. """ if path.exists(storage_hdf5_fp) and not force: raise ValueError("Storage file exists.") if not path.isfile(taxonomy_map_csv_fp): raise ValueError("Invalid taxonomy CSV/TSV file path provided.") if not path.isfile(tree_newick_fp): raise ValueError("Invalid tree Newick file path provided.") if not path.isfile(sequence_fasta_fp): raise ValueError("Invalid sequence FASTA file path provided.") if not path.isfile(sequence_alignment_fasta_fp): raise ValueError("Invalid alignment FASTA file path provided.") if isinstance(chunksize, int): if chunksize <= 0: raise ValueError("`chunksize` must be greater than zero.") else: raise TypeError("`chunksize` must be integer.") tmp_storage_manager = DatabaseStorageManager( hdf5_filepath=storage_hdf5_fp, storage_name=cls.DATABASE_NAME, force_new=force, ) removed_rids, novel_tids, index_mapper, tmp_recap = cls.__process_tax_acs_map( tmp_storage_manager, taxonomy_map_csv_fp ) cls.__process_tree(tmp_storage_manager, tree_newick_fp, index_mapper) tmp_recap = cls.__process_sequence( tmp_storage_manager, index_mapper, removed_rids, tmp_recap, sequence_fasta_fp, sequence_alignment_fasta_fp, chunksize, ) tmp_storage_manager.commit_to_storage( "stat-reps", transformer.produce_rep_stats(tmp_storage_manager, chunksize) ) tmp_storage_manager.commit_to_storage( "stat-taxs", transformer.produce_tax_stats(tmp_storage_manager, novel_tids) ) cls.__process_interxmaps(tmp_storage_manager) return transformer.finalize_storage_construction( tmp_storage_manager, stamp_dict, tmp_recap, **kwargs )
@classmethod def __process_tax_acs_map( cls, storage_manager: DatabaseStorageManager, taxonomy_map_csv_fp: str ) -> Tuple[AnyGenericIdentifier, np.ndarray, pd.Series, pd.Series]: """Process taxonomy and accession numbers. Parameters ---------- storage_manager Active storage manager taxonomy_map_csv_fp Path to taxonomy map Returns ------- Results of the process that need to be passed to builder. """ from pmaf.database._parsers._qiime import ( read_qiime_taxonomy_map, parse_qiime_taxonomy_map, ) def produce_taxonomy_prior(tmp_taxonomy_prior, index_mapper): """The *taxonomy-prior* storage element producer function.""" yield None, None yield transformer.reindex_frame(tmp_taxonomy_prior, index_mapper) def produce_taxonomy_sheet(taxonomy_sheet): """The *taxonomy-sheet* storage element producer function.""" yield None, None yield taxonomy_sheet def produce_sequence_accession(index_mapper, dropped_taxa): """The *sequence-accession* storage element producer function.""" yield None, None tmp_accessions = ( index_mapper.drop(index=dropped_taxa, errors="ignore") .reset_index(name="nrids") .set_index("nrids") ) tmp_accessions.columns = ["greengenes"] yield tmp_accessions def produce_metadata_db_history(transformation_details): """The *metadata-db-history* storage element producer function.""" yield None, None yield transformation_details["changes"] def produce_map_rep2tid(transformation_details): """The *map-rep2tid* storage element producer function.""" yield None, None yield transformation_details["map-rep2tid"] tmp_taxonomy_prior = read_qiime_taxonomy_map(taxonomy_map_csv_fp) index_mapper = transformer.make_rid_index_mapper(tmp_taxonomy_prior.index) taxonomy_prior = storage_manager.commit_to_storage( "taxonomy-prior", produce_taxonomy_prior(tmp_taxonomy_prior, index_mapper) ) tmp_taxonomy_sheet_master = parse_qiime_taxonomy_map(taxonomy_prior) taxonomy_sheet, transformation_details = transformer.reconstruct_taxonomy( tmp_taxonomy_sheet_master, index_mapper, cls.INVALID_TAXA ) removed_rids = transformation_details["removed-rids"] storage_manager.commit_to_storage( "taxonomy-sheet", produce_taxonomy_sheet(taxonomy_sheet) ) tmp_recap = summarizer.merge_recaps( summarizer.recap_taxonomy_sheet(taxonomy_sheet), summarizer.recap_transformation(transformation_details), ) tmp_recap = summarizer.append_recaps( tmp_recap, { "molecule-type": "DNA", "strand-orientation": "forward", "gene": "rRNA", "gene-target-region": "16S", }, ) storage_manager.commit_to_storage( "sequence-accession", produce_sequence_accession(index_mapper, removed_rids) ) storage_manager.commit_to_storage( "metadata-db-history", produce_metadata_db_history(transformation_details) ) storage_manager.commit_to_storage( "map-rep2tid", produce_map_rep2tid(transformation_details) ) return ( removed_rids, transformation_details["novel-tids"], index_mapper, tmp_recap, ) @classmethod def __process_tree( cls, storage_manager: DatabaseStorageManager, tree_newick_fp: str, index_mapper: pd.Series, ): """Process phylogenetic tree. Parameters ---------- storage_manager Active storage manager tree_newick_fp Path to Newick tree index_mapper New index mapper. Returns ------- Results of the process that need to be passed to builder. """ from pmaf.database._parsers._phylo import read_newick_tree from ete3 import Tree def produce_tree_prior(tree_newick_fp): """The *tree-prior* storage element producer function.""" yield None, None yield read_newick_tree(tree_newick_fp) def produce_tree_parsed(tree_newick_string, index_mapper): """The *tree-parsed* storage element producer function.""" yield None, None tmp_tree = Tree(tree_newick_string, format=0) yield transformer.reparse_tree(tmp_tree, index_mapper) def produce_tree_object(tree_newick_string): """The *tree-object* storage element producer function.""" yield None, None yield Tree(tree_newick_string, format=2, quoted_node_names=True) def produce_map_tree(tree_object): """The *map-tree* storage element producer function.""" yield None, None tmp_rebuilded_tree = transformer.rebuild_phylo(tree_object) yield transformer.make_tree_map(tmp_rebuilded_tree) tmp_newick_string = storage_manager.commit_to_storage( "tree-prior", produce_tree_prior(tree_newick_fp) ) tmp_newick_parsed = storage_manager.commit_to_storage( "tree-parsed", produce_tree_parsed(tmp_newick_string, index_mapper) ) tmp_tree_object = storage_manager.commit_to_storage( "tree-object", produce_tree_object(tmp_newick_parsed) ) storage_manager.commit_to_storage("map-tree", produce_map_tree(tmp_tree_object)) return @classmethod def __process_sequence( cls, storage_manager: DatabaseStorageManager, index_mapper: pd.Series, removed_rids: AnyGenericIdentifier, prior_recap: pd.Series, sequence_fasta_fp: str, sequence_alignment_fasta_fp: str, chunksize: int, ) -> pd.Series: """Process sequences and alignments. Parameters ---------- storage_manager Active storage manager index_mapper Inqex mapper removed_rids List of removed identifiers to drop during processing of sequences. prior_recap Overall recap sequence_fasta_fp Path to sequence FASTA file sequence_alignment_fasta_fp Path to MSA FASTA file chunksize Size of sequence processing chunks Returns ------- Results of the process that need to be passed to builder. """ from pmaf.database._parsers._qiime import parse_qiime_sequence_generator def produce_sequence_representative( sequence_fasta_fp, index_mapper, dropped_taxa, chunksize ): """The *sequence-representative* storage element producer function.""" sequence_parser = parse_qiime_sequence_generator( sequence_fasta_fp, chunksize, False ) preparse_info, first_chunk = next(sequence_parser) yield preparse_info.copy() preparse_info["max_rows"] = preparse_info["max_rows"] - len(dropped_taxa) preparse_info.pop("min_sequence") yield preparse_info, transformer.reindex_frame( first_chunk.drop(index=dropped_taxa, errors="ignore"), index_mapper ) for next_chunk in sequence_parser: yield transformer.reindex_frame( next_chunk.drop(index=dropped_taxa, errors="ignore"), index_mapper ) def produce_sequence_aligned( sequence_alignment_fasta_fp, index_mapper, dropped_taxa, chunksize ): """The *sequence-aligned* storage element producer function.""" sequence_parser = parse_qiime_sequence_generator( sequence_alignment_fasta_fp, chunksize, True ) preparse_info, first_chunk = next(sequence_parser) yield preparse_info.copy() preparse_info["max_rows"] = preparse_info["max_rows"] - len(dropped_taxa) preparse_info.pop("min_sequence") yield preparse_info, transformer.reindex_frame( first_chunk.drop(index=dropped_taxa, errors="ignore"), index_mapper ) for next_chunk in sequence_parser: yield transformer.reindex_frame( next_chunk.drop(index=dropped_taxa, errors="ignore"), index_mapper ) repseq_wrapped_parser = produce_sequence_representative( sequence_fasta_fp, index_mapper, removed_rids, chunksize ) tmp_repseq_details = next(repseq_wrapped_parser) storage_manager.commit_to_storage( "sequence-representative", repseq_wrapped_parser ) alignment_wrapped_parser = produce_sequence_aligned( sequence_alignment_fasta_fp, index_mapper, removed_rids, chunksize ) tmp_alignment_details = next(alignment_wrapped_parser) storage_manager.commit_to_storage("sequence-aligned", alignment_wrapped_parser) tmp_recap = summarizer.merge_recaps( prior_recap, summarizer.recap_sequence_info(tmp_repseq_details, tmp_alignment_details), ) return tmp_recap @classmethod def __process_interxmaps(cls, storage_manager: DatabaseStorageManager) -> None: """Process interx maps. Parameters ---------- storage_manager: Active storage manager Returns ------- Results of the process that need to be passed to builder. """ def produce_map_interx_taxon(interx_maker_result): """The *map-interx-taxon* storage element producer function.""" yield None, None yield interx_maker_result["map-interx-taxon"] def produce_map_interx_repseq(interx_maker_result): """The *map-interx-repseq storage element producer function.""" yield None, None yield interx_maker_result["map-interx-repseq"] tmp_interx_result = transformer.make_interxmaps(storage_manager) storage_manager.commit_to_storage( "map-interx-taxon", produce_map_interx_taxon(tmp_interx_result) ) storage_manager.commit_to_storage( "map-interx-repseq", produce_map_interx_repseq(tmp_interx_result) ) return @property def name(self) -> str: """Database name/label.""" return self.DATABASE_NAME