Source code for hydrobox.preprocessing.scale

"""
The scale module combines low-level aggregation functionality for time series
data. The single functions operate on single time series instances.
All functions aiming at aggregating multi-dimensional data or multiple time
series have a preceding 'm' in their function name.
"""
from datetime import datetime

import numpy as np
import pandas as pd

from hydrobox.utils.decorators import accept


@accept(x=(pd.Series, pd.DataFrame), by=(str, 'None'), func=(str, 'callable'))
def aggregate(x, by, func='mean'):
    """Time series aggregation

    This function version will only operate on a single ``pandas.Series`` or
    ``pandas.DataFrame`` instance. It has to be indexed by a
    `pandas.DatetimeIndex`. The input data will be aggregated to the given
    frequency by passing a `pandas.Grouper` conform string argument
    specifying the desired period like: '1M' for one month or '3Y-Sep' for
    three years starting at the first of October.

    Parameters
    ----------
    x : ``pandas.Series``, ``pandas.DataFrame``
        The input data, will be aggregated over the index.
    by : string
        Specifies the desired temporal resolution. Will be passed as
        ``freq`` argument of a ``pandas.Grouper`` object for grouping the
        data into the new resolution.
        If by is ``None``, the whole input will be aggregated at once
        instead of being grouped. The same applies to ``by='all'``. In that
        case the index does not have to be a ``pandas.DatetimeIndex``.
    func : string, callable
        Function used for aggregation. A callable is used as given. A string
        is looked up as an attribute of ``numpy`` and has to name a callable
        there. The function must accept n input values and aggregate them
        to only a single one.

    Returns
    -------
    pandas.Series :
        if x was of type ``pandas.Series``
    pandas.DataFrame :
        if x was of type ``pandas.DataFrame``

    If ``by`` is ``None`` or ``'all'`` the result of ``x.aggregate`` is
    returned directly, which for a reducing function is typically a scalar
    for a Series and a Series for a DataFrame.

    Raises
    ------
    ValueError
        If a frequency is given but ``x`` is not indexed by a
        ``pandas.DatetimeIndex``, or if ``func`` is a string that does not
        name a callable attribute of ``numpy``.

    """
    # 'all' is only an alias for None: aggregate everything at once.
    if by == 'all':
        by = None

    # Only grouping by frequency needs a time index; a full aggregation
    # works on any index.
    if by is not None and not isinstance(x.index, pd.DatetimeIndex):
        raise ValueError('The data has to be indexed by a DatetimeIndex.')

    # get the function
    if callable(func):
        f = func
    else:
        # The default of None covers unknown names; the callable() test
        # additionally rejects numpy attributes that are not functions
        # (e.g. constants), which would otherwise fail later inside pandas.
        f = getattr(np, func, None)
        if not callable(f):
            # Adjacent literals instead of backslash continuations, so no
            # source indentation leaks into the message.
            raise ValueError(
                'The function %s cannot be imported. the aggregation '
                'function has to be importable from numpy.' % func
            )

    if by is None:
        return x.aggregate(f)
    return x.groupby(pd.Grouper(freq=by)).aggregate(f)


@accept(
    x=(pd.Series, pd.DataFrame),
    start=(str, datetime, 'None'),
    stop=(str, datetime, 'None'))
def cut_period(x, start, stop):
    """Truncate Time series

    Cuts a ``pandas.Series`` or ``pandas.DataFrame`` down to the period
    between ``start`` and ``stop`` and returns a copy of that part. The
    input has to be indexed by a ``pandas.DatetimeIndex``.

    Parameters
    ----------
    x : ``pandas.Series``, ``pandas.DataFrame``
        The input data, will be truncated.
    start : string, datetime
        Begin of truncation. Can be a ``datetime.datetime`` or a string.
        If a string is passed, it has to use the format 'YYYYMMDDhhmmss',
        where the time component 'hhmmss' can be omitted. A ``datetime`` is
        converted into a string of that format before slicing. ``None``
        is passed through unchanged and leaves this side of the slice open.
    stop : string, datetime
        End of truncation. Same rules as for ``start``.

    Returns
    -------
    pandas.Series, pandas.DataFrame
        A copy of the slice ``x[start:stop]``.

    Raises
    ------
    ValueError
        If ``x`` is not indexed by a ``pandas.DatetimeIndex``.

    """
    # check for being a time series
    if not isinstance(x.index, pd.DatetimeIndex):
        raise ValueError('The data has to be indexed by a DatetimeIndex.')

    # Both bounds get the same treatment: datetime objects become
    # 'YYYYMMDDhhmmss' strings, anything else is used as given.
    stamp = '%Y%m%d%H%M%S'
    lower, upper = [
        bound.strftime(stamp) if isinstance(bound, datetime) else bound
        for bound in (start, stop)
    ]

    return x[lower:upper].copy()