Source code for pmaf.database._manifest._otl

from os import path
from pmaf.database._core._base import DatabaseBase
from pmaf.database._core._tax_base import DatabaseTaxonomyMixin
from pmaf.database._core._phy_base import DatabasePhylogenyMixin
from pmaf.database._core._acs_base import DatabaseAccessionMixin
from pmaf.database._manager import DatabaseStorageManager
import pmaf.database._shared._assemblers as transformer
import pmaf.database._shared._summarizers as summarizer
import pandas as pd
import numpy as np
from typing import Any, Tuple
from pmaf.internal._typing import AnyGenericIdentifier


[docs]class DatabaseOTL( DatabaseTaxonomyMixin, DatabasePhylogenyMixin, DatabaseAccessionMixin, DatabaseBase ): """Database class for OTL database :cite:t:`reesAutomatedAssemblyReference2017a`""" DATABASE_NAME = "OpenTreeOfLife" INVALID_TAXA = None def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs)
[docs] @classmethod def build_database_storage( cls, storage_hdf5_fp: str, taxonomy_map_csv_fp: str, tree_newick_fp: str, stamp_dict: dict, force: bool = False, chunksize: int = 500, delimiter: str = "|", **kwargs: Any ): """Factory method to build new database :term:`hdf5` Parameters ---------- storage_hdf5_fp Output path for :term:`hdf5` taxonomy_map_csv_fp Path to taxonomy file tree_newick_fp Path to FASTA sequences file stamp_dict Dictionary with metadata that will be stamped to the storage force Force output file overwrite chunksize : Sequence/Alignment data processing chunk size. Longer chunks are faster to process but require more memory. delimiter Taxonomy map delimiter **kwargs Compatibility. Returns ------- None if file was created successfully. """ if path.exists(storage_hdf5_fp) and not force: raise ValueError("Storage file exists.") if not path.isfile(taxonomy_map_csv_fp): raise ValueError("Invalid taxonomy CSV/TSV file path provided.") if not path.isfile(tree_newick_fp): raise ValueError("Invalid tree Newick file path provided.") if isinstance(chunksize, int): if chunksize <= 0: raise ValueError("`chunksize` must be greater than zero.") else: raise TypeError("`chunksize` must be integer.") tmp_storage_manager = DatabaseStorageManager( hdf5_filepath=storage_hdf5_fp, storage_name=cls.DATABASE_NAME, force_new=force, ) removed_rids, novel_tids, index_mapper, tmp_recap = cls.__process_tax_acs_map( tmp_storage_manager, taxonomy_map_csv_fp, delimiter ) cls.__process_tree(tmp_storage_manager, tree_newick_fp, index_mapper) tmp_storage_manager.commit_to_storage( "stat-taxs", transformer.produce_tax_stats(tmp_storage_manager, novel_tids) ) cls.__process_interxmaps(tmp_storage_manager) return transformer.finalize_storage_construction( tmp_storage_manager, stamp_dict, tmp_recap, **kwargs )
@classmethod def __process_tax_acs_map( cls, storage_manager: DatabaseStorageManager, taxonomy_map_csv_fp: str, delimiter: str, ) -> Tuple[AnyGenericIdentifier, np.ndarray, pd.Series, pd.Series]: from pmaf.internal._extensions import cython_functions from pmaf.internal._constants import VALID_RANKS from tempfile import NamedTemporaryFile from collections import defaultdict def produce_taxonomy_prior(full_taxonomy_prior, index_mapper): """The *taxonomy-prior* storage element producer function.""" tmp_taxonomy_map = full_taxonomy_prior.loc[index_mapper.index].applymap( lambda x: "" if pd.isna(x) else x ) yield None, None yield transformer.reindex_frame(tmp_taxonomy_map, index_mapper) def produce_taxonomy_sheet(taxonomy_sheet): """The *taxonomy-sheet* storage element producer function.""" yield None, None yield taxonomy_sheet def produce_sequence_accession(taxonomy_prior, index_mapper, dropped_taxa): """The *sequence-accession* storage element producer function.""" if len(dropped_taxa) > 0: taxonomy_map_df = taxonomy_prior.drop( index=index_mapper.loc[dropped_taxa].values, errors="ignore" ) else: taxonomy_map_df = taxonomy_prior acc_map_series = taxonomy_map_df.loc[:, "sourceinfo"] def acc_rearranger(taxon_acc): acc_split = taxon_acc.split(",") acc_type_split = [acc_data.split(":") for acc_data in acc_split] acc_parsed = defaultdict(list) for acc_elem in acc_type_split: if ( "additions" not in acc_elem[0] ): # This fixes invalid sourceinfo elements within OTT taxonomy.tsv file. acc_parsed[acc_elem[0]].append(acc_elem[1]) return {key: "|".join(values) for key, values in acc_parsed.items()} acc_map_arranged_dict = acc_map_series.map(acc_rearranger).to_dict() del acc_map_series total_taxid = len(acc_map_arranged_dict) all_acc_types = [ list(taxon_acc.keys()) for taxon_acc in acc_map_arranged_dict.values() ] unique_acc_types = list( set( list( [ acc_type for acc_types in all_acc_types for acc_type in acc_types ] ) ) ) del all_acc_types def valid_acc_generator(acc_map_dict): for taxid, acc_dict in acc_map_dict.items(): yield [taxid] + [ acc_dict.get(acc_type, "") for acc_type in unique_acc_types ] accession_map_df = pd.DataFrame.from_records( valid_acc_generator(acc_map_arranged_dict), index=["index"], columns=["index"] + unique_acc_types, nrows=total_taxid, ) accession_map_df.insert( loc=0, column="otl", value=index_mapper.reset_index() .set_index("index") .loc[accession_map_df.index, "rids"], ) yield None, None yield accession_map_df def produce_metadata_db_history(transformation_details): """The *metadata-db-history* storage element producer function.""" yield None, None yield transformation_details["changes"] def produce_map_rep2tid(transformation_details): """The *main-rep2tid* storage elemenet producer function.""" yield None, None yield transformation_details["map-rep2tid"] full_taxonomy_map = pd.read_csv( taxonomy_map_csv_fp, index_col="uid", sep=delimiter, usecols=["uid", "parent_uid", "name", "rank", "sourceinfo", "uniqname"], engine="python", header=0, dtype=str, ) with NamedTemporaryFile() as tmp_taxonomy: full_taxonomy_map.to_csv( tmp_taxonomy.name, sep="|", columns=["parent_uid", "name", "rank"], index_label="uid", ) tmp_ott_sheet = cython_functions.generate_taxa_list_for_ott( VALID_RANKS, tmp_taxonomy.name, "|" ) tmp_taxonomy_sheet_master = pd.DataFrame.from_records( tmp_ott_sheet, columns=["uid", "pid"] + VALID_RANKS, index=["uid"], exclude=["pid"], ) tmp_taxonomy_sheet_master.index = tmp_taxonomy_sheet_master.index.astype( full_taxonomy_map.index.dtype ) index_mapper = transformer.make_rid_index_mapper( tmp_taxonomy_sheet_master.index ) taxonomy_prior = storage_manager.commit_to_storage( "taxonomy-prior", produce_taxonomy_prior(full_taxonomy_map, index_mapper) ) taxonomy_sheet_master = transformer.reindex_frame( tmp_taxonomy_sheet_master, index_mapper ) tmp_taxonomy_sheet, transformation_details = transformer.reconstruct_taxonomy( taxonomy_sheet_master, index_mapper, cls.INVALID_TAXA ) removed_rids = transformation_details["removed-rids"] taxonomy_sheet = storage_manager.commit_to_storage( "taxonomy-sheet", produce_taxonomy_sheet(tmp_taxonomy_sheet) ) tmp_recap = summarizer.merge_recaps( summarizer.recap_taxonomy_sheet(taxonomy_sheet), summarizer.recap_transformation(transformation_details), ) tmp_recap = summarizer.append_recaps( tmp_recap, { "molecule-type": "N/A", "strand-orientation": "N/A", "gene": "N/A", "gene-target-region": "N/A", }, ) storage_manager.commit_to_storage( "sequence-accession", produce_sequence_accession(taxonomy_prior, index_mapper, removed_rids), ) storage_manager.commit_to_storage( "metadata-db-history", produce_metadata_db_history(transformation_details) ) storage_manager.commit_to_storage( "map-rep2tid", produce_map_rep2tid(transformation_details) ) return ( removed_rids, transformation_details["novel-tids"], index_mapper, tmp_recap, ) @classmethod def __process_tree( cls, storage_manager: DatabaseStorageManager, tree_newick_fp: str, index_mapper: pd.Series, ): """Process phylogenetic tree. Parameters ---------- storage_manager Active storage manager tree_newick_fp Path to Newick tree index_mapper Index renamer/mapper """ from pmaf.database._parsers._phylo import read_newick_tree from ete3 import Tree import re regex_mrca_tags = re.compile("mrcaott[0-9]+ott[0-9]+") regex_ott_tags = re.compile("ott([0-9]+)") def produce_tree_prior(tree_newick_fp): yield None, None yield read_newick_tree(tree_newick_fp) def produce_tree_parsed(tree_newick_string, index_mapper): yield None, None newick_string_no_mrca_tag = re.sub(regex_mrca_tags, "", tree_newick_string) newick_string_parsed = re.sub( regex_ott_tags, "\\1", newick_string_no_mrca_tag ) tmp_tree = Tree(newick_string_parsed, format=8) yield transformer.reparse_tree(tmp_tree, index_mapper) def produce_tree_object(tree_newick_string): yield None, None yield Tree(tree_newick_string, format=2, quoted_node_names=True) def produce_map_tree(tree_object): yield None, None tmp_rebuilded_tree = transformer.rebuild_phylo(tree_object) yield transformer.make_tree_map(tmp_rebuilded_tree) tmp_newick_string = storage_manager.commit_to_storage( "tree-prior", produce_tree_prior(tree_newick_fp) ) tmp_newick_parsed = storage_manager.commit_to_storage( "tree-parsed", produce_tree_parsed(tmp_newick_string, index_mapper) ) tmp_tree_object = storage_manager.commit_to_storage( "tree-object", produce_tree_object(tmp_newick_parsed) ) storage_manager.commit_to_storage("map-tree", produce_map_tree(tmp_tree_object)) return @classmethod def __process_interxmaps(cls, storage_manager: DatabaseStorageManager): """Process interx maps. Parameters ---------- storage_manager Active storage manager """ def produce_map_interx_taxon(interx_maker_result): yield None, None yield interx_maker_result["map-interx-taxon"] def produce_map_interx_repseq(interx_maker_result): yield None, None yield interx_maker_result["map-interx-repseq"] tmp_interx_result = transformer.make_interxmaps(storage_manager) storage_manager.commit_to_storage( "map-interx-taxon", produce_map_interx_taxon(tmp_interx_result) ) storage_manager.commit_to_storage( "map-interx-repseq", produce_map_interx_repseq(tmp_interx_result) ) return @property def name(self): """Storage database name/label.""" return self.DATABASE_NAME