Source code for pmaf.database._manager

"""
Database Manager
----------------
Database Storage Manager

.. currentmodule:: pmaf.database

"""
import warnings

import pandas as pd
import tables
from os import path
import pickle
from collections import defaultdict
from pmaf.database._shared._common import *
from typing import Optional, Tuple, Union, Dict, Generator, Any
from pmaf.internal._typing import AnyGenericIdentifier


[docs]class DatabaseStorageManager: """Database :term:`hdf5` storage manager.""" def __init__(self, hdf5_filepath: str, storage_name: str, force_new: bool = False): """Constructor for storage manager. Parameters ---------- hdf5_filepath Path to :term:`hdf5` file storage_name Storage database name. Must match the one in storage file. force_new If file exists the override it for new construction """ self._storer = None # Current storer(pandas/pytables) handle self._storer_state = False # Current storer state self._storer_mode = None # Value can be either 1 for `tables` or 2 for `pandas` self._hdf5_filepath = None # HDF5 filepath self._storage_name = None # Storage database name self._init_state = 0 # Initiation state. 0 or False: Inactive; -1: Under construction; 1: Active for data retrieval. self._db_info_cache = ( None # Database cache for storage element "metadata-db-info" ) self._db_summary = ( None # Database cache for storage element states. Stored as pd.Series ) self._interxmap_cache = { "map-interx-taxon": None, "map-interx-repseq": None, } # Cache for interxmap elements self._supplement_cache = defaultdict(None) # Additional optional cache elements if isinstance(storage_name, str) and isinstance(hdf5_filepath, str): if len(storage_name) > 0 and len(hdf5_filepath) > 0: if not path.exists(hdf5_filepath) or force_new: self._hdf5_filepath = hdf5_filepath self._storage_name = storage_name self.__build_storage() else: if self.validate_storage(hdf5_filepath, storage_name): self._hdf5_filepath = hdf5_filepath self._storage_name = storage_name self.__init_manager() else: raise ValueError("Storage file or storage name is invalid.") else: raise ValueError("Storage file or storage name is invalid.") else: raise ValueError("Storage file or storage name is invalid.") def __exit__(self, exc_type, exc_value, traceback): """Close storer when :class:`.DatabaseStorageManager` is deleted.""" self._storer.close() def __init_manager(self): """This function initiates storage manager by making it active for working. It also loads metadata-db-info into memory which is required to let know other package _components about what kind of data is available in the storage file. Returns: Nothing but raises error if initiation was not success. """ try: self._db_info_cache = pd.read_hdf( self._hdf5_filepath, DATABASE_HDF5_STRUCT["metadata-db-info"] ) self._init_state = 1 if not self.initiate_memory_cache(): raise RuntimeError("Cannot initiate cache.") except TypeError: raise TypeError("Invalid _local file.") except: raise RuntimeError( "Failed to initiate manager. Reading HDF5 was not successful." ) return
[docs] def initiate_memory_cache(self, level: int = 1) -> bool: """Load various elements based on `level` from storage to the memory for rapid data access. Parameters ---------- level Level of data caching. Levels: - Level 1: Only loads inter index map to the memory. # Run by default - Level 2: Additionally load taxonomy-sheet to the memory - Level 3: Additionally load all map-elements to the memory - Level 4: Additionally load all tree-instance to the memory Returns ------- True level until which data was cached. """ ret = False if self._init_state == 1: tmp_storer = pd.HDFStore(self._hdf5_filepath, mode="r") if level >= 1: if ( self._db_info_cache["map-interx-taxon"] and self._db_info_cache["map-interx-repseq"] ): self._interxmap_cache["map-interx-repseq"] = tmp_storer.select( DATABASE_HDF5_STRUCT["map-interx-repseq"] ) self._interxmap_cache["map-interx-taxon"] = tmp_storer.select( DATABASE_HDF5_STRUCT["map-interx-taxon"] ) self._db_summary = tmp_storer.select( DATABASE_HDF5_STRUCT["metadata-db-summary"] ) ret = 1 if level >= 2: if self._db_info_cache["taxonomy-sheet"]: self._supplement_cache["taxonomy-sheet"] = tmp_storer.select( DATABASE_HDF5_STRUCT["taxonomy-sheet"] ) ret = 2 if level >= 3: for element_key in filter_elements_by( "map-", ["map-interx-repseq", "map-interx-taxon"] ): if self._db_info_cache[element_key]: self._supplement_cache[element_key] = tmp_storer.select( DATABASE_HDF5_STRUCT[element_key] ) ret = 3 tmp_storer.close() if level >= 4: if self._db_info_cache["tree-object"]: tmp_storer = tmp_storer.get_handle_by_element("tree-object") tree_object_bytes = tmp_storer.get_node( DATABASE_HDF5_STRUCT["tree-object"] ).read()[0] self._supplement_cache["tree-object"] = tree_object_bytes tmp_storer.close() ret = 4 return ret
[docs] @staticmethod def validate_storage(hdf5_filepath: str, storage_name: str) -> bool: """Validates storage :term:`hdf5` file. Parameters ---------- hdf5_filepath Path to :term:`hdf5` file storage_name Storage database name Returns ------- Validation result """ ret = False try: tmp_storer = tables.open_file(hdf5_filepath, mode="r") if tmp_storer.title == storage_name: title_list = [] path_list = [] for node in tmp_storer.walk_nodes(): if node._v_title != "": title_list.append(node._v_title) path_list.append(node._v_pathname) tmp_storer.close() title_verify = all( [title in title_list[1:] for title in DATABASE_HDF5_STRUCT.keys()] ) path_verify = all( [path in path_list[1:] for path in DATABASE_HDF5_STRUCT.values()] ) if title_verify and path_verify: ret = True else: tmp_storer.close() except: raise RuntimeError("Error during validating storage file.") return ret
[docs] def shutdown(self): """Shutdown the database and mark cache clean for garbage collector.""" try: self.__close_storer() self._db_info_cache = None self._interxmap_cache = { "map-interx-taxon": None, "map-interx-repseq": None, } self._supplement_cache = defaultdict(None) self._db_summary = None self._storer_state = False self._storer = None self._init_state = 0 except: raise RuntimeError("Error during closing _local.") return
def __close_storer(self): """This method closes :meth:`.storer` and releases the storage file for reopening.""" if self._storer_state: try: self._storer.close() # Same function for both pandas and tables. Might change in the future. except: raise RuntimeError("Storer cannot be closed.") self._storer_state = False return def __open_as_tables(self, mode: str = "r"): """Open the storage :term:`hdf5` file using :mod:`tables` Parameters ---------- mode Open file using mode """ if self._init_state: self.__close_storer() try: self._storer = tables.open_file(self._hdf5_filepath, mode=mode) except: raise RuntimeError("Storage file cannot be opened properly.") self._storer_mode = 1 self._storer_state = True else: raise RuntimeError("Storage manager must is not initiated.") return self._storer_state def __open_as_pandas(self, mode="r"): """Open the storage :term:`hdf5` file using :mod:`pandas` Parameters ---------- mode Open file using mode """ if self._init_state: self.__close_storer() try: self._storer = pd.HDFStore(self._hdf5_filepath, mode=mode) except: raise RuntimeError("Storage file cannot be opened properly.") self._storer_mode = 2 self._storer_state = True else: raise RuntimeError("Storage manager must is not initiated.") return self._storer_state def __build_storage(self): """This method builds new storage element and generates its internal structure. It also fill necessary defaults to make working storage file. Returns: Nothing but will raise error if something goes wrong. """ try: tmp_storer = tables.open_file( self._hdf5_filepath, mode="w", title=self._storage_name ) tmp_storer.create_group("/", "tre", title="root-tree") tmp_storer.create_group("/tre", "master", title="tree-prior") tmp_storer.create_group("/tre", "parsed", title="tree-parsed") tmp_storer.create_group("/tre", "pickled", title="tree-object") tmp_storer.create_vlarray( "/tre/master", "value", title="bytes", atom=tables.VLUnicodeAtom() ) tmp_storer.create_vlarray( "/tre/parsed", "value", title="bytes", atom=tables.VLUnicodeAtom() ) tmp_storer.create_vlarray( "/tre/pickled", "value", title="bytes", atom=tables.ObjectAtom() ) tmp_storer.create_group("/", "tax", title="root-taxonomy") tmp_storer.create_group("/tax", "master", title="taxonomy-prior") tmp_storer.create_group("/tax", "parsed", title="taxonomy-sheet") tmp_storer.create_group("/", "seq", title="root-sequence") tmp_storer.create_group("/seq", "reps", title="sequence-representative") tmp_storer.create_group("/seq", "algn", title="sequence-aligned") tmp_storer.create_group("/seq", "accs", title="sequence-accession") tmp_storer.create_group("/", "map", title="root-map") tmp_storer.create_group("/map", "interxtax", title="map-interx-taxon") tmp_storer.create_group("/map", "interxreps", title="map-interx-repseq") tmp_storer.create_group("/map", "reptid", title="map-rep2tid") tmp_storer.create_group("/map", "repseq", title="map-repseq") tmp_storer.create_group("/map", "tree", title="map-tree") tmp_storer.create_group("/", "stat", title="root-stats") tmp_storer.create_group("/stat", "reps", title="stat-reps") tmp_storer.create_group("/stat", "taxs", title="stat-taxs") tmp_storer.create_group("/", "meta", title="root-metadata") tmp_storer.create_group("/meta", "summary", title="metadata-db-summary") tmp_storer.create_group("/meta", "info", title="metadata-db-info") tmp_storer.create_group("/meta", "stamp", title="metadata-db-stamp") tmp_storer.create_group("/meta", "history", title="metadata-db-history") tmp_element_key_defaults = [ "tree-parsed", "tree-prior", "tree-object", "taxonomy-prior", "taxonomy-sheet", "sequence-representative", "sequence-aligned", "sequence-accession", "metadata-db-summary", "metadata-db-stamp", "map-interx-taxon", "map-interx-repseq", "map-rep2tid", "map-repseq", "map-tree", "stat-reps", "stat-taxs", ] tmp_storer.close() tmp_storer = pd.HDFStore(self._hdf5_filepath, mode="a") tmp_element_defaults_series = pd.Series(False, tmp_element_key_defaults) tmp_storer.put( DATABASE_HDF5_STRUCT["metadata-db-info"], tmp_element_defaults_series, format="fixed", ) tmp_storer.get_node( DATABASE_HDF5_STRUCT["metadata-db-info"] )._v_title = "metadata-db-info" tmp_storer.close() self._db_info_cache = pd.Series(False, tmp_element_key_defaults) self._init_state = -1 except: raise RuntimeError("Error creating new HDF5 storage file.") return
[docs] def commit_to_storage(self, element_key, product_generator): """This is a primary function that commit changes to the storage. Parameters ---------- element_key element key to which product product must be put. product_generator Primary generator that yields output that can be put into storage element. All product generators and must follow following output rules. For `sequence-master` and `sequence-aligned` generator must first yield (`product_inits`, `product_generator_first_chunk`). Where `product_inits` contain data such as `expectedrows` or `min_itemsize`, which are required if product processes in chunks. Following yield must be `product_product_chunk`. For all others, generator must first yield `product_inits`, `None`. Following yield must be `product_product` Note: Not all product generators are processed same way. For more details, view product documentation. Returns ------- Last result from generator if success. Otherwise RuntimeError is raised. """ if self._init_state == -1: print("Starting to process [{}]".format(element_key)) last_product = self.__put_to_storage(element_key, product_generator) if last_product is not None: self._db_info_cache.loc[element_key] = True print("Successfully committed [{}]".format(element_key)) return last_product else: raise RuntimeError( "Data base was already stamped so it is not possible to commit again." )
def __put_to_storage(self, element_key: str, product_generator: Generator) -> Any: """This is main function that performs actual commit to the storage. Parameters ---------- element_key Storage element key to put data into product_generator Data generator that will be put into storage element Returns ------- Return last product chunk that was put to storage. """ ret = None try: if element_key in ["sequence-representative", "sequence-aligned"]: if self.__open_as_pandas("a"): product_inits, product_generator_first_chunk = next( product_generator ) max_rows = product_inits.pop("max_rows") self._storer.put( DATABASE_HDF5_STRUCT[element_key], product_generator_first_chunk, format="table", data_columns=True, min_itemsize=product_inits, index=False, ) last_chunk = product_generator_first_chunk for next_chunk in product_generator: self._storer.append( DATABASE_HDF5_STRUCT[element_key], next_chunk, format="table", data_columns=True, min_itemsize=product_inits, expectedrows=max_rows, index=False, ) last_chunk = next_chunk self._storer.get_node( DATABASE_HDF5_STRUCT[element_key] )._v_title = element_key self._storer.create_table_index( DATABASE_HDF5_STRUCT[element_key], columns=["index"], optlevel=9, kind="full", ) ret = last_chunk elif element_key in [ "sequence-accession", "taxonomy-prior", "taxonomy-sheet", "metadata-db-summary", "map-rep2tid", "map-repseq", "metadata-db-history", "stat-reps", "stat-taxs", ]: if self.__open_as_pandas("a"): product_inits, product_generator_first_chunk = next( product_generator ) if (product_inits is not None) and ( product_generator_first_chunk is not None ): max_rows = product_inits.pop("max_rows") x_columns = product_inits.pop("index_columns") min_itemsize = product_inits if len(product_inits) > 0 else None self._storer.put( DATABASE_HDF5_STRUCT[element_key], product_generator_first_chunk, format="table", data_columns=True, min_itemsize=min_itemsize, index=False, ) last_chunk = product_generator_first_chunk for next_chunk in product_generator: self._storer.append( DATABASE_HDF5_STRUCT[element_key], next_chunk, format="table", data_columns=True, min_itemsize=min_itemsize, expectedrows=max_rows, index=False, ) last_chunk = next_chunk self._storer.get_node( DATABASE_HDF5_STRUCT[element_key] )._v_title = element_key self._storer.create_table_index( DATABASE_HDF5_STRUCT[element_key], columns=x_columns, optlevel=9, kind="full", ) ret = last_chunk else: product_product_whole = next(product_generator) self._storer.put( DATABASE_HDF5_STRUCT[element_key], product_product_whole, format="table", data_columns=True, ) self._storer.get_node( DATABASE_HDF5_STRUCT[element_key] )._v_title = element_key ret = product_product_whole elif element_key in ["tree-prior", "tree-parsed"]: if self.__open_as_tables("a"): _, _ = next(product_generator) product_product_whole = next(product_generator) product_product_whole_utf8 = product_product_whole.encode("utf-8") self._storer.get_node(DATABASE_HDF5_STRUCT[element_key]).append( product_product_whole_utf8 ) ret = product_product_whole elif element_key == "tree-object": if self.__open_as_tables("a"): _, _ = next(product_generator) product_product_whole = next(product_generator) product_product_whole_bytes = pickle.dumps(product_product_whole) self._storer.get_node(DATABASE_HDF5_STRUCT[element_key]).append( product_product_whole_bytes ) ret = product_product_whole elif element_key in ["map-interx-taxon", "map-interx-repseq", "map-tree"]: if self.__open_as_pandas("a"): _, _ = next(product_generator) product_product_whole = next(product_generator) self._storer.put( DATABASE_HDF5_STRUCT[element_key], product_product_whole, format="fixed", data_columns=True, ) self._storer.get_node( DATABASE_HDF5_STRUCT[element_key] )._v_title = element_key ret = product_product_whole except: raise RuntimeError("Error while processing [{}]".format(element_key)) return ret
[docs] def imprint_database(self, stamp_dict: dict): """This is the final function that _local constructor must call. This function will add signature to the storage file and will lock it so that no changes can be performed. Locking is performed only stamp presence check via storage manager. Parameters ---------- stamp_dict Stamp data to imprint into storage file. """ if self._init_state == -1: if not ( self._db_info_cache["map-interx-taxon"] and self._db_info_cache["map-interx-repseq"] ): raise ValueError( "Cannot imprint _local without `map-interx-taxon` and `map-interx-repseq` elements." ) if self.__open_as_tables("r"): title_list = [] for node in self._storer.walk_nodes(): if ( node._v_title in DATABASE_HDF5_STRUCT.keys() and not node._v_title.startswith("root-") ): if node._v_nchildren > 0: if node._v_children.values()[0]._v_title == "bytes": if node._v_children.values()[0].nrows > 0: title_list.append(node._v_title) else: title_list.append(node._v_title) if ( self._db_info_cache.index.isin(title_list).sum() != self._db_info_cache.sum() ): raise RuntimeError( "Cannot imprint. Possible storage file corruption." ) self.__open_as_pandas("a") self._db_info_cache["metadata-db-stamp"] = True self._storer.put( DATABASE_HDF5_STRUCT["metadata-db-info"], self._db_info_cache, format="fixed", ) self._storer.get_node( DATABASE_HDF5_STRUCT["metadata-db-info"] )._v_title = "metadata-db-info" self._storer.put( DATABASE_HDF5_STRUCT["metadata-db-stamp"], pd.Series(stamp_dict), format="fixed", ) self._storer.get_node( DATABASE_HDF5_STRUCT["metadata-db-stamp"] )._v_title = "metadata-db-stamp" return
def __set_handle_by_element(self, element_key: str): """Re-assign :attr:`.storer` handle based on `element_key`""" storer_mode = get_element_mode(element_key) if not ((storer_mode == self._storer_mode) and self._storer_state): if storer_mode == 2: self.__open_as_pandas() elif storer_mode == 1: self.__open_as_tables() return self._storer_state def __get_coordinates_for_element_by_ids( self, element_key: str, ids: AnyGenericIdentifier ) -> pd.Index: """Retrieve cached index coordianates for target `ids` and `element_key` Parameters ---------- element_key Target storage element ids Target ids of storage element Returns ------- :class:`~pandas.Index` with coordinates of target `ids` for target storage elementof target `ids` for target storage element """ tmp_interxmap_type = get_element_index_type(element_key) if self._init_state == 1: valid_ids = self._interxmap_cache[tmp_interxmap_type].index.isin(ids) if valid_ids.sum() == len(ids): tmp_target_index_map = self._interxmap_cache[tmp_interxmap_type][ element_key ][valid_ids] ret = pd.Index(tmp_target_index_map.values) else: raise ValueError("Invalid identifiers are provided.") else: index_coord = self._storer.select_column( DATABASE_HDF5_STRUCT[element_key], "index" ) ret = index_coord[index_coord.isin(ids)].index return ret
[docs] def get_index_by_element( self, element_key: str, condition: Optional[str] = None ) -> pd.Index: """Get index of target storage element. Parameters ---------- element_key Storage element's key condition Custom :mod:`tables` conditions. Returns ------- Index of target storage element at given conditions. """ if self._init_state: if self._db_info_cache[element_key] or self._init_state == -1: if get_element_mode(element_key) == 2: self.__set_handle_by_element(element_key) if condition is None: return self._storer.select_column( DATABASE_HDF5_STRUCT[element_key], "index" ) else: if isinstance(condition, str): return self._storer.select( DATABASE_HDF5_STRUCT[element_key], "columns == index & {}".format(condition), ) else: raise ValueError( "Invalid condition parameter is provided. `condition` must have string type." ) else: raise ValueError("Requested element requested does not have index.") else: raise ValueError("Invalid element requested.") else: raise RuntimeError("Storage manager must be initiated.")
[docs] def retrieve_data_by_element( self, element_key, columns=None, chunksize=None ) -> Any: """Retrieves data from storage element as whole or in chunks. Parameters ---------- element_key Target storage element columns Target columns of table to retrieve chunksize Size of chunks to split retrieval or None to retrieve as whole. Returns ------- Depends on input parameters. If chunked the return generator, if not then depends on type of storage element. """ def fixed_product_generator(chunk_iter): for product_chunk in chunk_iter: yield self.__fix_table(element_key, product_chunk) if self._init_state: if self._db_info_cache[element_key]: if get_element_mode(element_key) == 2: if (chunksize is not None) and get_element_type( element_key ) != "table": raise ValueError( "Error! Attempt to use `chunksize` on fixed tables." ) if self._supplement_cache.get(element_key, None) is None: self.__set_handle_by_element(element_key) if columns is None: product = self._storer.select( DATABASE_HDF5_STRUCT[element_key], chunksize=chunksize ) else: product = self._storer.select( DATABASE_HDF5_STRUCT[element_key], "columns in ['index', {}]".format( ", ".join( ["'{}'".format(column) for column in columns] ) ), chunksize=chunksize, ) return ( self.__fix_table(element_key, product) if chunksize is None else fixed_product_generator(product) ) else: if columns is None: return self.__fix_table( element_key, self._supplement_cache[element_key] ) else: return self.__fix_table( element_key, self._supplement_cache[element_key][columns], ) else: if self._supplement_cache.get(element_key, None) is None: self.__set_handle_by_element(element_key) return self._storer.get_node( DATABASE_HDF5_STRUCT[element_key] ).read()[0] else: return self._supplement_cache[element_key] else: raise ValueError("Invalid element requested.") else: raise RuntimeError("Storage manager must be initiated.")
[docs] def get_element_data_by_ids(self, element_key, ids): """Get partial data from storage element by `ids`. Parameters ---------- element_key Target storage element ids Target identifiers to retrieve data for Returns ------- Depends on storage element type. """ if self._init_state: if self._db_info_cache[element_key]: target_ids = np.asarray(ids) if (get_element_mode(element_key) == 2) and ( get_element_index_type(element_key) != None ): if self._supplement_cache.get(element_key, None) is None: self.__set_handle_by_element(element_key) coord = self.__get_coordinates_for_element_by_ids( element_key, target_ids ) product = self._storer.select( DATABASE_HDF5_STRUCT[element_key], coord ) return self.__fix_table(element_key, product) else: valid_ids = self._supplement_cache[element_key].index.isin( target_ids ) if valid_ids.sum() == len(target_ids): return self.__fix_table( element_key, self._supplement_cache[element_key].loc[target_ids], ) else: raise ValueError("Invalid identifiers provided.") else: raise ValueError( "Required element was not intended to be used via indexing. Use `retrieve_data_by_element` instead." ) else: raise ValueError("Invalid element requested.") else: raise RuntimeError("Storage manager must be initiated.")
[docs] def compress_storage( self, complevel: int = 9, complib: str = "blosc", overwrite: bool = False ) -> bool: """Compresses the storage file using the required parameters. Parameters ---------- complevel Compression level. Higher value will produce better compressions but will be processed for longer time. complib Compression method. For details look into :mod:`tables` documentation overwrite Whether to overwrite the file if it exists. Returns ------- True if successful compressions, False otherwise. """ ret = False if self._init_state: if path.exists(self._hdf5_filepath) and isinstance(complevel, int): if complevel < 10 and complevel > 0: import subprocess import os self.__open_as_tables() original_title = self._storer.title self.shutdown() hdf5_abs_path = path.abspath(self._hdf5_filepath) hdf5_dir_path = path.dirname(hdf5_abs_path) hdf5_basename = path.basename(hdf5_abs_path) hdf5_filename = path.splitext(hdf5_basename)[0] hdf5_extension = path.splitext(hdf5_basename)[1] compressed_hdf5_basename = "{}.compressed{}".format( hdf5_filename, hdf5_extension ) compressed_hdf5_abs_path = "{}/{}".format( hdf5_dir_path, compressed_hdf5_basename ) ptrepack_cmd = "ptrepack --dest-title={} --chunkshape=auto --propindexes --complevel={} --complib={}".format( original_title, str(complevel), complib ).split() ptrepack_cmd.extend([hdf5_abs_path, compressed_hdf5_abs_path]) process = subprocess.Popen( ptrepack_cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE ) output, error = process.communicate() output = output.decode() if isinstance(output, bytes) else output error = error.decode() if isinstance(error, bytes) else error process.kill() if output == "": if overwrite: os.remove(hdf5_abs_path) os.rename(compressed_hdf5_abs_path, hdf5_abs_path) ret = True else: ret = True if error.find("."): if error.strip(".") != "": print(error) return ret
@classmethod def __fix_table(cls, element_key, target_df): """Fix the table with missing values.""" if element_key == "map-rep2tid": return target_df else: return missing_to_none(target_df) @property def state(self): """State of the storage manager. True of :attr:`.storer` is assigned. """ return self._init_state @property def active_elements(self): """Active storage elements that can be used.""" return self._db_info_cache[self._db_info_cache == True].index.tolist() @property def element_state(self): """Cached states per storage element.""" return self._db_info_cache @property def summary(self): """Cached storage database summary table.""" return self._db_summary @property def taxon_ids(self) -> pd.Index: """The :term:`tids` present in the storage.""" if self._interxmap_cache["map-interx-taxon"] is not None: return self._interxmap_cache["map-interx-taxon"].index else: RuntimeError("Storage is closed/broken.") @property def repseq_ids(self) -> pd.Index: """The :term:`rids` present in the storage.""" if self._interxmap_cache["map-interx-repseq"] is not None: return self._interxmap_cache["map-interx-repseq"].index else: RuntimeError("Storage is closed/broken.") @property def storage_name(self): """Storage database name/label.""" return self._storage_name @property def hdf5_filepath(self): """Path to :term:`hdf5` file.""" return path.abspath(self._hdf5_filepath) @property def has_repseq(self): """True if database storage contains reference sequences, False otherwise.""" return self._db_info_cache["sequence-representative"] @property def has_align(self): """True if database storage contains alignments, False otherwise.""" return self._db_info_cache["sequence-aligned"] @property def has_accs(self): """True if database storage contains sequence accessions, False otherwise.""" return self._db_info_cache["sequence-accession"] @property def has_tree(self): """True if database storage contains reference phylogeny, False otherwise.""" return self._db_info_cache["tree-parsed"] @property def has_tax(self): """True if database storage contains reference taxonomy, False otherwise.""" return self._db_info_cache["taxonomy-sheet"]