Source code for pmaf.database._manifest._gtdb

from os import path
from pmaf.database._core._base import DatabaseBase
from pmaf.database._core._tax_base import DatabaseTaxonomyMixin
from pmaf.database._core._seq_base import DatabaseSequenceMixin
from pmaf.database._core._phy_base import DatabasePhylogenyMixin
from pmaf.database._core._acs_base import DatabaseAccessionMixin
from pmaf.database._manager import DatabaseStorageManager
import pmaf.database._shared._assemblers as transformer
import pmaf.database._shared._summarizers as summarizer
from pmaf.internal.io._seq import SequenceIO
import numpy as np
import pandas as pd
from typing import Any, Tuple
from pmaf.internal._typing import AnyGenericIdentifier


[docs]class DatabaseGTDB( DatabaseTaxonomyMixin, DatabaseSequenceMixin, DatabasePhylogenyMixin, DatabaseAccessionMixin, DatabaseBase, ): """Database class for GTDB database :cite:t:`parksCompleteDomaintospeciesTaxonomy2020`""" DATABASE_NAME = "GTDP" INVALID_TAXA = None def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs)
[docs] @classmethod def build_database_storage( cls, storage_hdf5_fp: str, taxonomy_map_csv_fp: str, tree_newick_fp: str, sequence_fasta_fp: str, metadata_csv_fp: str, stamp_dict: dict, force: bool = False, chunksize: int = 500, **kwargs: Any ) -> None: """Factory method to build new database :term:`hdf5` Parameters ---------- storage_hdf5_fp Output path for :term:`hdf5` file taxonomy_map_csv_fp Path to taxonomy file tree_newick_fp Path to Newick tree file sequence_fasta_fp Path to FASTA sequences file stamp_dict Dictionary with metadata that will be stamped to the database force Force output file overwrite chunksize Sequence/Alignment data processing chunk size. Longer chunks are faster to process but require more memory. **kwargs Compatibility. Returns ------- None if file was created successfully. """ if path.exists(storage_hdf5_fp) and not force: raise ValueError("Storage file exists.") if isinstance(taxonomy_map_csv_fp, (list, tuple)): if not all( [ path.isfile(taxonomy_map_csv_fp) for taxonomy_map_csv_fp in taxonomy_map_csv_fp ] ): raise ValueError("Invalid taxonomy CSV/TSV file path provided.") else: raise TypeError("`taxonomy_map_csv_fp` must be tuple or list.") if isinstance(metadata_csv_fp, (list, tuple)): if not all( [path.isfile(metadata_csv_fp) for metadata_csv_fp in metadata_csv_fp] ): raise ValueError("Invalid metadata CSV/TSV file path provided.") else: raise TypeError("`metadata_csv_fp` must be tuple or list.") if isinstance(tree_newick_fp, (list, tuple)): if not all( [path.isfile(tree_newick_fp) for tree_newick_fp in tree_newick_fp] ): raise ValueError("Invalid tree Newick file path provided.") else: raise TypeError("`tree_newick_fp` must be tuple or list.") if isinstance(sequence_fasta_fp, (list, tuple)): if not all( [ path.isfile(sequence_fasta_fp) for sequence_fasta_fp in sequence_fasta_fp ] ): raise ValueError("Invalid sequence FASTA file path provided.") else: raise TypeError("`sequence_fasta_fp` must be tuple or list") if isinstance(chunksize, int): if chunksize <= 0: raise ValueError("`chunksize` must be greater than zero.") else: raise TypeError("`chunksize` must be integer.") tmp_storage_manager = DatabaseStorageManager( hdf5_filepath=storage_hdf5_fp, storage_name=cls.DATABASE_NAME, force_new=force, ) valid_ids = np.asarray( [ sid[0] for sid in SequenceIO(sequence_fasta_fp).pull_parser( parser="simple", id=True, description=False, sequence=False ) ] ) removed_rids, novel_tids, index_mapper, tmp_recap = cls.__process_tax_map( tmp_storage_manager, taxonomy_map_csv_fp, valid_ids ) cls.__process_acs( tmp_storage_manager, metadata_csv_fp, index_mapper, removed_rids ) cls.__process_tree(tmp_storage_manager, tree_newick_fp, index_mapper) tmp_recap = cls.__process_sequence( tmp_storage_manager, index_mapper, removed_rids, tmp_recap, sequence_fasta_fp, chunksize, ) tmp_storage_manager.commit_to_storage( "stat-reps", transformer.produce_rep_stats(tmp_storage_manager, chunksize) ) tmp_storage_manager.commit_to_storage( "stat-taxs", transformer.produce_tax_stats(tmp_storage_manager, novel_tids) ) cls.__process_interxmaps(tmp_storage_manager) return transformer.finalize_storage_construction( tmp_storage_manager, stamp_dict, tmp_recap, **kwargs )
@classmethod def __process_tax_map( cls, storage_manager: DatabaseStorageManager, taxonomy_map_csv_fp: str, valid_ids: np.ndarray, ) -> Tuple[AnyGenericIdentifier, np.ndarray, pd.Series, pd.Series]: """Process taxonomy only. Parameters ---------- storage_manager Active storage manager taxonomy_map_csv_fp Path to taxonomy map valid_ids Valid reference identifiers to keep Returns ------- Results of the process that need to be passed to builder. """ from pmaf.database._parsers._qiime import ( read_qiime_taxonomy_map, parse_qiime_taxonomy_map, ) def produce_taxonomy_prior(tmp_taxonomy_prior, index_mapper): """The *taxonomy-prior* storage element producer function.""" yield None, None yield transformer.reindex_frame(tmp_taxonomy_prior, index_mapper) def produce_taxonomy_sheet(taxonomy_sheet): """The *taxonomy-sheet* storage element producer function.""" yield None, None yield taxonomy_sheet def produce_metadata_db_history(transformation_details): """The *metadata-db-history* storage element producer function.""" yield None, None yield transformation_details["changes"] def produce_map_rep2tid(transformation_details): """The *main-rep2tid* storage elemenet producer function.""" yield None, None yield transformation_details["map-rep2tid"] full_taxonomy_prior = pd.concat( [read_qiime_taxonomy_map(tax_map_fp) for tax_map_fp in taxonomy_map_csv_fp], axis=0, ) tmp_taxonomy_prior = full_taxonomy_prior.loc[valid_ids] index_mapper = transformer.make_rid_index_mapper(tmp_taxonomy_prior.index) taxonomy_prior = storage_manager.commit_to_storage( "taxonomy-prior", produce_taxonomy_prior(tmp_taxonomy_prior, index_mapper) ) tmp_taxonomy_sheet_master = parse_qiime_taxonomy_map(taxonomy_prior) taxonomy_sheet, transformation_details = transformer.reconstruct_taxonomy( tmp_taxonomy_sheet_master, index_mapper, cls.INVALID_TAXA ) removed_rids = transformation_details["removed-rids"] storage_manager.commit_to_storage( "taxonomy-sheet", produce_taxonomy_sheet(taxonomy_sheet) ) tmp_recap = summarizer.merge_recaps( summarizer.recap_taxonomy_sheet(taxonomy_sheet), summarizer.recap_transformation(transformation_details), ) tmp_recap = summarizer.append_recaps( tmp_recap, { "molecule-type": "DNA", "strand-orientation": "forward", "gene": "rRNA", "gene-target-region": "16S", }, ) storage_manager.commit_to_storage( "metadata-db-history", produce_metadata_db_history(transformation_details) ) storage_manager.commit_to_storage( "map-rep2tid", produce_map_rep2tid(transformation_details) ) return ( removed_rids, transformation_details["novel-tids"], index_mapper, tmp_recap, ) @classmethod def __process_acs( cls, storage_manager: DatabaseStorageManager, metadata_csv_fp: str, index_mapper: pd.Series, dropped_taxa: np.ndarray, ) -> None: """Process accession numbers. Parameters ---------- storage_manager Active storage manager metadata_csv_fp File path to metadata file index_mapper Index renamer/mapper dropped_taxa Taxa that was previously dropped """ def produce_sequence_accession(valid_metadata, index_mapper, dropped_taxa): """The *sequence-accession8 storage element producer function.""" yield None, None id_to_drop = valid_metadata.index[valid_metadata.index.isin(dropped_taxa)] tmp_gtdp_acs = ( index_mapper.drop(index=dropped_taxa, errors="ignore") .reset_index(name="nrids") .set_index("nrids") ) tmp_accessions = transformer.reindex_frame( valid_metadata.drop(index=id_to_drop, errors="ignore"), index_mapper ) tmp_accessions.insert(loc=0, column="gtdb", value=tmp_gtdp_acs.values) yield tmp_accessions total_metadata = pd.concat( [ pd.read_csv(metadata_csv_fp, sep="\t", index_col=0, na_values="none") for metadata_csv_fp in metadata_csv_fp ], axis=0, ) valid_metadata = total_metadata.loc[ index_mapper.index, ["ncbi_genbank_assembly_accession", "ncbi_taxid"] ] valid_metadata.columns = ["ncbi-genbank", "ncbi-taxid"] storage_manager.commit_to_storage( "sequence-accession", produce_sequence_accession(valid_metadata, index_mapper, dropped_taxa), ) return @classmethod def __process_tree( cls, storage_manager: DatabaseStorageManager, tree_newick_fp: str, index_mapper: pd.Series, ): """Process phylogenetic tree. Parameters ---------- storage_manager Active storage element tree_newick_fp Path to Newick tree index_mapper Index renamer/mapper """ from ete3 import Tree def produce_tree_prior(merged_tree): """The *tree-prior* storage element producer function.""" yield None, None yield merged_tree.write(format=2) def produce_tree_parsed(tree_newick_string, index_mapper): """The *tree-parsed* storage element producer function.""" yield None, None tmp_tree = Tree(tree_newick_string, format=2) yield transformer.reparse_tree(tmp_tree, index_mapper) def produce_tree_object(tree_newick_string): """The *tree-object* storage element producer function.""" yield None, None yield Tree(tree_newick_string, format=2, quoted_node_names=True) def produce_map_tree(tree_object): """The *map-tree* storage element producer function.""" yield None, None tmp_rebuilded_tree = transformer.rebuild_phylo(tree_object) yield transformer.make_tree_map(tmp_rebuilded_tree) merged_tree = Tree(name="root") for tree_fp in tree_newick_fp: merged_tree.add_child(child=Tree(tree_fp, format=1, quoted_node_names=True)) tmp_newick_string = storage_manager.commit_to_storage( "tree-prior", produce_tree_prior(merged_tree) ) tmp_newick_parsed = storage_manager.commit_to_storage( "tree-parsed", produce_tree_parsed(tmp_newick_string, index_mapper) ) tmp_tree_object = storage_manager.commit_to_storage( "tree-object", produce_tree_object(tmp_newick_parsed) ) storage_manager.commit_to_storage("map-tree", produce_map_tree(tmp_tree_object)) return @classmethod def __process_sequence( cls, storage_manager: DatabaseStorageManager, index_mapper: pd.Series, removed_rids: np.ndarray, prior_recap: pd.Series, sequence_fasta_fp: str, chunksize: int, ) -> pd.Series: """Process sequence data. Parameters ---------- storage_manager Active storage manager index_mapper Index renamer/mapper removed_rids Identifiers that must be removed prior_recap Previous recap data sequence_fasta_fp Path to FASTA file chunksize Size of processing chunks Returns ------- Results of the process that need to be passed to the builder """ from pmaf.database._parsers._qiime import parse_qiime_sequence_generator def produce_sequence_representative( sequence_fasta_fp, index_mapper, dropped_taxa, chunksize ): sequence_parser = parse_qiime_sequence_generator( sequence_fasta_fp, chunksize, False ) preparse_info, first_chunk = next(sequence_parser) yield preparse_info.copy() preparse_info["max_rows"] = preparse_info["max_rows"] - len(dropped_taxa) preparse_info.pop("min_sequence") yield preparse_info, transformer.reindex_frame( first_chunk.drop(index=dropped_taxa, errors="ignore"), index_mapper ) for next_chunk in sequence_parser: yield transformer.reindex_frame( next_chunk.drop(index=dropped_taxa, errors="ignore"), index_mapper ) repseq_wrapped_parser = produce_sequence_representative( sequence_fasta_fp, index_mapper, removed_rids, chunksize ) tmp_repseq_details = next(repseq_wrapped_parser) storage_manager.commit_to_storage( "sequence-representative", repseq_wrapped_parser ) tmp_recap = summarizer.merge_recaps( prior_recap, summarizer.recap_sequence_info(tmp_repseq_details) ) return tmp_recap @classmethod def __process_interxmaps(cls, storage_manager: DatabaseStorageManager): """Process interx maps. Parameters ---------- storage_manager Active storage maanger """ def produce_map_interx_taxon(interx_maker_result): yield None, None yield interx_maker_result["map-interx-taxon"] def produce_map_interx_repseq(interx_maker_result): yield None, None yield interx_maker_result["map-interx-repseq"] tmp_interx_result = transformer.make_interxmaps(storage_manager) storage_manager.commit_to_storage( "map-interx-taxon", produce_map_interx_taxon(tmp_interx_result) ) storage_manager.commit_to_storage( "map-interx-repseq", produce_map_interx_repseq(tmp_interx_result) ) return @property def name(self): """Database name/label.""" return self.DATABASE_NAME