Source code for babao.utils.file

"""Some utils functions for hdf handling"""

import pandas as pd

LOCK = None
STORE = None


[docs]def setLock(lock): """Store the given ´lock´ object for later use in database processing""" global LOCK LOCK = lock
[docs]def initStore(filename): """Open the hdf database from ´filename´""" global STORE STORE = pd.HDFStore(filename) # complevel=9, complib='blosc'
[docs]def closeStore(): """Close the hdf database""" if STORE is not None and STORE.is_open: STORE.close()
[docs]def maintenance(): """ Maintenance routine on the hdf database Create table index for each table, and make sure everything is sorted. """ if STORE is not None and STORE.is_open: for k in STORE.keys(): df = STORE.get(k) if not df.index.is_monotonic_increasing: print(k, "is NOT sorted!!!! Database fucked up :/") df.sort_index(inplace=True) STORE.append(k, df, append=False) # I *love* this argument STORE.create_table_index(k, optlevel=9, kind='full')
[docs]def delete(frame): """ Remove the given ´frame´ entry (key) from the hdf database Thread Safe! """ ret = True if STORE.is_open: close_me = False store = STORE else: close_me = True store = pd.HDFStore(STORE.filename) if LOCK is not None: LOCK.acquire_write() try: del store[frame] except KeyError: ret = False finally: if close_me: STORE.close() if LOCK is not None: LOCK.release() return ret
[docs]def write(frame, df): """ Append the given ´df´ dataframe to the ´frame´ entry (key) in the hdf database Thread Safe! """ ret = True if LOCK is not None: LOCK.acquire_write() try: if not STORE.is_open: with pd.HDFStore(STORE.filename) as store: store.append(frame, df) else: STORE.append(frame, df) except RuntimeError: # HDF5ExtError ret = False finally: if LOCK is not None: LOCK.release() return ret
[docs]def read(frame, where=None): """ Read a ´frame´ (key) from the hdf database ´where´ can be use to specify search criteria. Thread Safe! """ if LOCK is not None: LOCK.acquire_read() try: if not STORE.is_open: # graph proc will close store, to avoid cache ret = pd.read_hdf(STORE.filename, frame, where=where) elif where is None: ret = STORE.get(frame) else: ret = STORE.select(frame, where) except (KeyError, FileNotFoundError): ret = pd.DataFrame() finally: if LOCK is not None: LOCK.release() return ret
[docs]def getLastRows(frame, nrows): """ Return ´nrows´ rows from a ´frame´ (key) in the hdf database as a DataFrame """ if LOCK is not None: LOCK.acquire_read() try: if not STORE.is_open: with pd.HDFStore(STORE.filename) as store: frame_len = store.get_storer(frame).nrows ret = store.select(frame, start=frame_len - nrows) else: frame_len = STORE.get_storer(frame).nrows ret = STORE.select(frame, start=frame_len - nrows) except (KeyError, FileNotFoundError, AttributeError): ret = pd.DataFrame() finally: if LOCK is not None: LOCK.release() return ret