# Source code for babao.inputs.inputBase
"""Base class for any input"""
from abc import ABC, abstractmethod
from typing import List
import pandas as pd
import babao.config as conf
import babao.utils.date as du
import babao.utils.file as fu
import babao.utils.log as log
# List of input instances (presumably registered by other modules -- nothing
# in this file appends to it).
INPUTS = []  # type: List[ABCInput]

# Index of the most recent row seen by any input; written by
# ABCInput.updateCurrentRow and read by resampleSerie.
LAST_WRITE = 0  # TODO: this is a stupid idea, bugs incoming!

# How many days of history are looked back at in real-time mode.
REAL_TIME_LOOKBACK_DAYS = 7  # TODO: infere this from models/graph

# The in-memory cache keeps four times the real-time lookback window.
CACHE_REAL_TIME_LOOKBACK_DAYS = 4 * REAL_TIME_LOOKBACK_DAYS

# Fraction of the [EPOCH, now] time span located before SPLIT_DATE.
TRAIN_TEST_RATIO = 0.75

# Timestamp sitting TRAIN_TEST_RATIO of the way between EPOCH and the
# (forced) current time; the "backtest" command caches data from here on.
_elapsed_since_epoch = du.TIME_TRAVELER.getTime(force=True) - du.EPOCH
SPLIT_DATE = int(du.EPOCH + _elapsed_since_epoch * TRAIN_TEST_RATIO)
def resampleSerie(s):
    """
    Call Serie.resample on `s` with preset parameters.

    The bins are `conf.TIME_INTERVAL` minutes wide, closed and labelled on
    their right edge, and shifted by an offset derived from the module-level
    LAST_WRITE.

    Args:
        s: a pandas Series (or DataFrame) whose index is datetime.

    Returns:
        The pandas Resampler object for `s`; the caller still has to apply an
        aggregation on it.
    """
    # TODO: would be nice to do the base init once for all features
    # (ensure sync and save some computing)
    # also don't convert date or do it in utils.date
    last_write = du.toDatetime(LAST_WRITE)
    # Offset, in (fractional) minutes within the hour, of one second past the
    # last written row.
    offset_minutes = (
        last_write.minute + (last_write.second + 1) / 60
    ) % 60
    return s.resample(
        str(conf.TIME_INTERVAL) + "Min",
        closed="right",
        label="right",
        # `base=` was deprecated in pandas 1.1 and removed in 2.0; for a
        # minute-based rule, `base=b` is equivalent to an offset of b minutes.
        offset=pd.Timedelta(minutes=offset_minutes),
    )
class ABCInput(ABC):
    """
    Base class for any input

    Your subclass must at least implement:
    * fetch : self -> DataFrame
    * raw_columns : List[str]
    * resampled_columns : List[str]

    And optionally (if you want self.resample to work):
    * _resample : self -> DataFrame -> DataFrame
    * fillMissing : self -> DataFrame -> DataFrame

    (cf. specific method doc-string in this class)
    """

    @property
    @abstractmethod
    def raw_columns(self) -> List[str]:
        """
        The columns names of your raw data
        (as fetched and stored in database)
        """

    @property
    @abstractmethod
    def resampled_columns(self) -> List[str]:
        """The columns names of your resampled data (from raw data)"""

    def __init__(self):
        """
        Initialize the input and pre-load its cache.

        What gets cached depends on `conf.CURRENT_COMMAND`:
        * "train": the whole database frame
        * "backtest": from SPLIT_DATE up to the (forced) current time
        * anything else (real-time): the CACHE_REAL_TIME_LOOKBACK_DAYS days
          preceding the last stored row; if the database frame is empty no
          cache is built here (it is created on the first write).
        """
        self.up_to_date = True
        self.current_row = None
        self._cache_data = None
        if conf.CURRENT_COMMAND == "train":
            self.cache()
        elif conf.CURRENT_COMMAND == "backtest":
            self.cache(
                since=SPLIT_DATE, till=du.TIME_TRAVELER.getTime(force=True)
            )
        else:  # real-time
            last_entry = fu.getLastRows(self.__class__.__name__, 1)
            if not last_entry.empty:
                # Temporarily travel to the last stored row to compute the
                # start of the lookback window relative to it.
                du.TIME_TRAVELER.setTime(last_entry.index[0])
                try:
                    since = du.TIME_TRAVELER.nowMinus(
                        days=CACHE_REAL_TIME_LOOKBACK_DAYS
                    )
                finally:
                    # Always come back to the present, even on failure.
                    du.TIME_TRAVELER.setTime(None)
                self.cache(since=since)

    def write(self, raw_data):
        """
        Write the given raw_data to the database, and cache it if needed

        Returns:
            None if there was nothing to write, False if the database write
            failed, True otherwise.
        """
        if raw_data is None or raw_data.empty:
            return None
        if not fu.write(self.__class__.__name__, raw_data):
            log.warning(
                "Couldn't write to database frame '"
                + self.__class__.__name__ + "'"
            )
            return False
        self.cache(fresh_data=raw_data)
        return True

    def _readFromCache(self, since=None, till=None):
        """Read data in cache from `since` to `till` (label-based slice)"""
        if self._cache_data.empty:
            return self._cache_data
        return self._cache_data.loc[since:till]

    @staticmethod
    def _buildWhere(since=None, till=None):
        """
        Build the `where` clause restricting the index to ]since, till[

        Returns None when neither bound is given.
        """
        clauses = []
        if since is not None:
            clauses.append("index > %d" % since)
        if till is not None:
            clauses.append("index < %d" % till)
        if not clauses:
            return None
        return " & ".join(clauses)

    def _readFromFile(self, since=None, till=None):
        """
        Read data in database from `since` to `till`

        Either bound (or both) may be None, meaning "unbounded" on that side.
        """
        # The previous implementation crashed (None += str) when only `till`
        # was given, which is what happens with e.g. self.cache(till=...).
        return fu.read(
            self.__class__.__name__, where=self._buildWhere(since, till)
        )

    def read(self, since=None, till=None):
        """
        Read data in database or cache from `since` to `till`

        `since` defaults to du.EPOCH; `till` defaults to (and is capped at)
        the current time given by du.TIME_TRAVELER. The cache is used whenever
        it exists, otherwise the database is queried.
        """
        if since is None:
            since = du.EPOCH
        now = du.TIME_TRAVELER.getTime()
        if till is None or till > now:
            till = now
        if self._cache_data is not None:
            return self._readFromCache(since, till)
        return self._readFromFile(since, till)

    def cache(self, fresh_data=None, since=None, till=None):
        """
        Save some data to cache

        If `fresh_data` is given, append it to cache (and drop whatever is
        older than CACHE_REAL_TIME_LOOKBACK_DAYS days before the newest row),
        otherwise read in database from `since` to `till` and cache it.

        In both cases the current row is then updated from the last cached
        row; if the cache ends up empty it is reset to an empty DataFrame
        having `raw_columns` as columns.
        """
        if fresh_data is not None:
            if self._cache_data is None:
                # No cache was built yet (empty database at start-up).
                self._cache_data = fresh_data.copy()
            else:
                # DataFrame.append was removed in pandas 2.0.
                self._cache_data = pd.concat([self._cache_data, fresh_data])
            if not self._cache_data.empty:
                oldest_kept = self._cache_data.index[-1] - du.secToNano(
                    CACHE_REAL_TIME_LOOKBACK_DAYS * 24 * 3600
                )
                self._cache_data = self._cache_data.loc[oldest_kept:]
        else:
            log.debug(
                "Caching data from", du.toStr(since), "to", du.toStr(till),
                "(" + self.__class__.__name__ + ")"
            )
            self._cache_data = self._readFromFile(since, till)
        if not self._cache_data.empty:
            self.updateCurrentRow(self._cache_data.iloc[-1])
        else:
            log.warning("Database '" + self.__class__.__name__ + "' is emtpy")
            self._cache_data = pd.DataFrame(columns=self.raw_columns)

    def refreshCache(self):
        """
        Make sure the cache is up to date

        Reads from the database everything newer than the last cached row
        (or everything, if nothing is cached) and appends it to the cache.
        """
        if self._cache_data is None or self._cache_data.empty:
            since = None
        else:
            since = self._cache_data.index[-1]
        fresh_data = self._readFromFile(since)
        if not fresh_data.empty:
            self.cache(fresh_data=fresh_data)

    def resample(self, raw_data):
        """
        Return the DataFrame `raw_data` as a continuous time-serie

        This is a wrapper around _resample and fillMissing.
        An empty `raw_data` gives an empty DataFrame having
        `resampled_columns` as columns.
        """
        if raw_data.empty:
            return pd.DataFrame(columns=self.resampled_columns)
        # NOTE(review): the return values are unused, so these du helpers
        # presumably convert the index in place -- confirm in utils.date.
        du.toDatetime(raw_data)
        try:
            resampled_data = self._resample(raw_data)
            resampled_data = self.fillMissing(resampled_data)
        finally:
            # Give the caller its data back with a timestamp index, even if
            # the subclass hooks raised.
            du.toTimestamp(raw_data)
        du.toTimestamp(resampled_data)
        return resampled_data

    def updateCurrentRow(self, current_row=None, timestamp=None):
        """
        Update the property self.current_row, useful for time travel

        If `timestamp` is given (time travel), `current_row` is ignored and
        the first cached row found in the 12 hours following `timestamp` is
        used instead; inputs whose class name contains "Ledger" are skipped.
        The module-level LAST_WRITE is updated along with self.current_row.
        A warning is logged when no row could be found.
        """
        global LAST_WRITE
        if timestamp is not None:  # time travel
            if "Ledger" in self.__class__.__name__:
                return  # we're going to the future
            current_row = self._readFromCache(
                since=timestamp,
                till=timestamp + du.secToNano(12 * 3600)
                # assuming there is at least one row per day?
            )
            if not current_row.empty:
                current_row = current_row.iloc[0]
        # `current_row` is None when called without any argument.
        if current_row is not None and not current_row.empty:
            self.current_row = current_row
            if timestamp is not None:  # time travel
                LAST_WRITE = current_row.name
            else:
                LAST_WRITE = max(LAST_WRITE, current_row.name)
        else:  # spammy
            log.warning(
                "Couldn't update current row for database '"
                + self.__class__.__name__ + "'"
            )

    @abstractmethod
    def fetch(self):
        """
        Return a time-serie DataFrame fetched from the internets

        This data will be stored on database for later use (and eventual
        resampling).
        Data can be continuous.
        Index must be nanosecond timestamps.
        """
        raise NotImplementedError(
            "Your Input class '%s' should implement a 'fetch' method :/"
            % self.__class__.__name__
        )

    def _resample(self, raw_data):
        """
        Return the DataFrame `raw_data` as a continuous time-serie

        `raw_data` is a DataFrame with the same columns as the ones returned
        by `fetch`.
        You should use the helper function `resampleSerie` on the desired
        columns of your discrete `raw_data` to generate continuous data.
        """
        raise NotImplementedError(
            "Your Input class '%s' should implement a '_resample' method :/"
            % self.__class__.__name__
        )

    def fillMissing(self, resampled_data):
        """
        Fill missing values (np.nan/np.inf) in `resampled_data`
        """
        raise NotImplementedError(
            "Your Input class '%s' should implement a 'fillMissing' method :/"
            % self.__class__.__name__
        )