Source code for drain.aggregate

"""Classes that facilitate transformation and aggregation of dataframes.

This module provides wrappers to simplify grouping and aggregation
of dataframes by index, and to consequently perform arithmetics on
the aggregated dataframes.

Examples:

    The following code would take a Pandas dataframe ``df``, and produce
    a dataframe ``res_df`` that is indexed by ``name``. The result would have
    a column ``count`` that gives the number of rows per ``name``; a column
    that gives the sum of ``score`` per ``name``, divided by the sum of
    ``arrests`` per ``name``; and finally two columns that give the sum
    of ``score`` per ``name``, and the sum of squared ``score`` per ``name``::

        aggregates = [Count(),
                      Proportion('score','arrests'),
                      Aggregate(['score', lambda x: x.score**2],'sum')
                      ]
        res_df = Aggregator(df, aggregates).aggregate('name')

    Note how we specify all definitions before performing the actual
    aggregation, and how ``Aggregator.aggregate()`` then takes an
    ``index`` to group by.

Aggregator also caches individual transformations of columns, as to
reduce redundant calculations.

Classes that endusers interface with are Aggregate, Fraction,
Count, and Proportion (all of which specify outcome columns and row-wise
aggregation functions), and Aggregator (which takes an input dataframe and an
index by which rows are being grouped).

"""

from itertools import product
from six import string_types

import pandas as pd
import numpy as np

from . import util
from .data import Column


[docs]class ColumnReduction(object): """Wraps and hashes a `Column` together with a function that aggregates across rows. """ def __init__(self, column, agg_func): """Args: column (Column): The column that will be aggregated across rows. agg_func (Function): The function that will be used to aggregate rows, once the Column has been grouped by some index. """ if not isinstance(column, Column): raise ValueError("ColumnReduction needs a Column") self.column = column self.agg_func = agg_func # use float32 by default for string agg functions except 'nunique', # e.g. 'min', 'median', 'max', etc. if isinstance(agg_func, string_types) and\ agg_func != 'nunique' and\ self.column.astype is None: self.column.astype = np.float32 def __hash__(self): return hash((self.column, self.agg_func)) def __eq__(self, other):
return hash(self) == hash(other)
[docs]class ColumnFunction(object): """Abstract base class for functions on reduced Columns; names the outcomes. Having obtained Columns that have been created and aggregated along rows, with the same index - ColumnReductions, in other words - we might want to perform arithmetics on these ColumnReductions, such as element-wise division of one by the other. These transformations are handled by ColumnFunctions. Note: ColumnFunction guarantees an `apply_and_name(aggregator)`. This is the lowest-level function in this module that actually returns 'populated' DataFrames (as provided by the aggregator). Children of this class thus include functions on pairs of popoulated dataframes, such as division and addition. """ def __init__(self, column_reductions, names): """Args: column_reductions (list[ColumnReduction]): List of input ColumnReductions. names (list[str]): A list of strings. Its length must be equal to the number of pd.Series' that `apply_and_name()` produces. """ for cr in column_reductions: if not isinstance(cr, ColumnReduction): raise ValueError( "ColumnFunction requires a list of ColumnReductions; %s is not one." % repr(cr)) self.column_reductions = column_reductions self.names = names
[docs] def apply_and_name(self, aggregator): """Fetches the row-aggregated input columns for this ColumnFunction. Args: aggregator (Aggregator) Returns: pd.DataFrame: The dataframe has columns with names self.names that were created by this ColumnFunction, and is indexed by the index that was passed to aggregator.aggregate(index). """ reduced_df = self._apply(aggregator) if len(self.names) != len(reduced_df.columns): raise IndexError("ColumnFunction creates more columns than it has names for.") reduced_df.columns = self.names
return reduced_df def _apply(self, aggregator): """Abstract function for the actual transformation. Note: Don't call this; it gets called by `self.apply_and_name()`. Args: aggregator(Aggregator) """ raise NotImplementedError def __div__(self, other):
return Fraction(numerator=self, denominator=other)
[docs]class ColumnIdentity(ColumnFunction): """The simplest non-abstract ColumnFunction. """ def __init__(self, column_reductions, names): ColumnFunction.__init__(self, column_reductions, names) def _apply(self, aggregator): """ Returns: pd.DataFrame: A dataframe with self.column_reductions as columns, where the column names are self.names. """
return aggregator.get_reduced(self.column_reductions)
[docs]class Aggregate(ColumnIdentity): """A highly convenient wrapper around ColumnReductions. Example:: Aggregate(['arrests','income','age'], ['min','max','mean']) This would create 3x3 columns: arrests_min, arrests_max, arrests_mean, and so on. """ def __init__(self, column_def, agg_func, name=None, fname=None, astype=None): """ Args: column_def: List of (or a single) column definitions, as accepted by Column. agg_func: List of (or a single) row-wise aggregation function (will be passed to Panda's groupby.agg()) name: List of strings of names for the column_defintions. Must be of same length as column_def, or be None, in which case the names default to the string representation of the each column definition. fname: List of strings of names for the aggregation functions. Must be of same length as agg_func or be None, in which case the names default to the string representation of the each aggregation function. astype: List of pandas dtypes, which gets passed to Column together with the column definitions. Must be of same length as column_def or be None, in which case no casting will be performed. Names for the resulting column reductions are of the format `columnname_functionname`. """ # make list of column reductions from arguments # each ColumnReduction needs a Column(definition) and an agg_func column_def = util.make_list(column_def) agg_func = util.make_list(agg_func) astype = util.make_list(astype) if len(astype) == 1: astype = astype*len(column_def) if len(astype) != len(column_def): raise ValueError("astype must be a datatype, or a list of datatypes") column_reductions = [ColumnReduction(Column(defn, at), func) for (defn, at), func in product(zip(column_def, astype), agg_func)] # make list of names from the arguments # same length as the list of column reductions above! name = [str(c) for c in column_def] if name is None else util.make_list(name) if len(name) != len(column_def): raise ValueError("name and column_def must be same length, or name must be None.") fname = [str(a) for a in agg_func] if fname is None else util.make_list(fname) if len(fname) != len(agg_func): raise ValueError("fname and agg_func must be same length, or fname must be None.") # if there's only on agg_func, you can override the function-naming-schema, # by passing fname=False; then we only use the name if len(agg_func) == 1 and fname == [False]: column_names = name else: column_names = ['%s_%s' % (cn, fn) for cn, fn in product(name, fname)]
ColumnFunction.__init__(self, column_reductions, column_names)
[docs]class Aggregator(object): """Binds column functions to a dataframe and allows for aggregation by a given index. """ def __init__(self, df, column_functions): """ Args: df (pd.DataFrame): A dataframe to apply column functions to, and which will be aggregated. column_functions (list[ColumnFunction]): ColumnFunctions that will be applied to the dataframe. """ self.df = df self.column_functions = column_functions # unique column reductions from all the column functions self.column_reductions = set([cr for cf in column_functions for cr in cf.column_reductions]) # unique columns across all the column reductions self.columns = set([c.column for c in self.column_reductions]) # dataframe of the unique, populated columns # with the column objects as the dataframe's column names self.col_df = pd.DataFrame({col: col.apply(df) for col in self.columns})
[docs] def get_reduced(self, column_reductions): """This function gets called by ColumnFunction._apply(). After a ColumnFunction has been passed to Aggregator's constructor, the ColumnFunction can use this function to request the populated, aggregated columns that correspond to its ColumnReductions. Args: column_reduction (list[ColumnReduction]) Returns: pd.DataFrame: A dataframe, where the column names are ColumnReductions. """ for cr in column_reductions: if cr not in self.column_reductions: raise ValueError("Column reduction %r is not known to this Aggregator!" % cr)
return self.reduced_df[column_reductions]
[docs] def aggregate(self, index): """Performs a groupby of the unique Columns by index, as constructed from self.df. Args: index (str, or pd.Index): Index or column name of self.df. Returns: pd.DataFrame: A dataframe, aggregated by index, that contains the result of the various ColumnFunctions, and named accordingly. """ # deal with index as a string vs index as a index/MultiIndex if isinstance(index, string_types): col_df_grouped = self.col_df.groupby(self.df[index]) else: self.col_df.index = pd.MultiIndex.from_arrays([self.df[i] for i in index]) col_df_grouped = self.col_df.groupby(level=index) self.col_df.index = self.df.index # perform the actual aggregation self.reduced_df = pd.DataFrame({ colred: col_df_grouped[colred.column].agg(colred.agg_func) for colred in self.column_reductions }) # then apply the functions to produce the final dataframe reduced_dfs = [] for cf in self.column_functions: # each apply_and_name() calls get_reduced() with the column reductions it wants reduced_dfs.append(cf.apply_and_name(self))
return pd.concat(reduced_dfs, axis=1)
[docs]class Fraction(ColumnFunction): """Divides all pairs of column reductions from two column functions. Example:: Fraction(Aggregate(['arrests','score'], 'sum'), Aggregate('score','mean')) This would create two columns: `arrests_sum_per_score_mean`, and `score_sum_per_score_mean`. """ def __init__(self, numerator, denominator, name='{numerator}_per_{denominator}', include_numerator=False, include_denominator=False, include_fraction=True): """ Args: numerator (ColumnFunction) denominator (ColumnFunction) name (list[str] or str): A list of strings of length denominator.names*numerator.names. Output columns will be named by this. Defaults to `numerator.name_per_denominator.name`. include_numerator (bool): If the unmodified numerator will be part of the output include_denominator (bool): If the unmodified denominator will be part of the output include_fraction (bool): If the division of all pairs of columns (pd.Series) from the numerator and denominator will be included in the output. """ self.numerator = numerator self.denominator = denominator self.include_numerator = include_numerator self.include_denominator = include_denominator self.include_fraction = include_fraction names = [] if include_fraction: if hasattr(name, '__iter__') and len(name) == \ len(numerator.names)*len(denominator.names): names.extend(name) elif isinstance(name, string_types): names.extend([name.format(numerator=n, denominator=d) for n, d in product(numerator.names, denominator.names)]) else: raise ValueError('Name must either be a list of names or a format string') if include_numerator: names.extend(numerator.names) if include_denominator: names.extend(denominator.names) column_reductions = [] # numerator first, then denominator if include_fraction: column_reductions += numerator.column_reductions + denominator.column_reductions else: if include_numerator: column_reductions += numerator.column_reductions if include_denominator: column_reductions += denominator.column_reductions ColumnFunction.__init__(self, column_reductions=column_reductions, names=names) def _apply(self, aggregator): """Returns a dataframe with the requested ColumnReductions. """ reduced_dfs = [] if self.include_fraction: n_df = self.numerator.apply_and_name(aggregator) d_df = self.denominator.apply_and_name(aggregator) reduced_dfs.extend([n_df[cn]/d_df[cd] for cn, cd in product(n_df.columns, d_df.columns)]) if self.include_numerator: reduced_dfs.append(self.numerator.apply_and_name(aggregator)) if self.include_denominator: reduced_dfs.append(self.denominator.apply_and_name(aggregator))
return pd.concat(reduced_dfs, axis=1)
[docs]class Count(Fraction): """Define various counts, sums, and proportions of Columns. Examples:: Count() # count the number of rows per grouped index Count('Arrests') # sum column 'Arrests' by grouped index Count('Arrests',prop=True) # as above, but also add a column that is normalized, i.e. where each group's sum is divided by the size of that group Count('Arrests',prop=True, prop_only=True) # as above, but exclude the raw sum Count('Arrests', prop=lambda x: x.score**2) # create a column with the sum of `Arrests` per grouped index, and also create a column with the sum of `Arrests` per grouped index divided by the sum of `score**2` per grouped index. Count(['Arrests','Stops']) # create both Count('Arrests') and Count('Stops') By default names are set similar to this example: 'count', 'Arrests_count', 'Arrests_prop', 'Arrests_prop_score', etc. """ def __init__(self, definition=None, name=None, prop=None, prop_only=False, prop_name=None, astype=None, prop_astype=None): """ Args: definition: List of (or single) column definition, as accepted by Column. `Count` will create ColumnFunctions corresponding to these definitions, using `sum` as the row-wise aggregation function. name: List of (or single) string for naming above definitions. If `None`, then the definitions' string representation will be used. prop: List of (or single) column definition, as accepted by Column. Defaults to None, in which case `Count` only creates the columns as defined in `definition`. If `prop=1` or `prop=True`, then `Count` also creates a column in which the sums from the column definitions are divided by the length of each group. `prop` can also be any column definition as accepted by `Column`. In that case, `Count` will divide the results of `definition` by the result `prop`. prop_name: List of strings (or single string) for naming the column definition from `prop`. Defaults to `None`, in which case the string representation of `prop` is used. astype: Pandas dtypes, or list thereof. If list, needs to be of same length as `definition`. `astype` will be passed to `ColumnDefinition` along with `definition`. prop_astype: Like `astype`, but for the `prop` definition. If no name is given and `definition` is `None` or 1, then the resulting column will simply be called 'count'. Otherwise, columns are named similar to 'Arrests_count', 'Arrests_prop', 'Arrests_prop_score', etc. """ if prop is None and prop_only: raise ValueError("Cannot calculate only the proportion when no prop") definition = np.float32(1) if definition is None else definition prop = np.float32(1) if prop is True else prop # if we do a vanilla count, just call it 'count' if definition == 1 and name is None: name = 'count' fname = False else: fname = 'count' denominator = Aggregate(prop, 'sum', prop_name, astype=prop_astype) if prop else None numerator = Aggregate(definition, 'sum', name, fname, astype=astype) if prop not in [None, 1]: fracnames = [n[:-5]+'per_'+d[:-4] for n, d in product(numerator.names, denominator.names)] else: fracnames = [n[:-5]+'prop' for n in numerator.names] Fraction.__init__(self, numerator=numerator, denominator=denominator, include_numerator=not prop_only, include_denominator=False, include_fraction=prop is not None,
name=fracnames)
[docs]class Proportion(Count): """Convenience wrapper for count. Example:: Proportion('Arrests','Inspections') Creates a column 'Arrests_prop_Inspections', which divides the sum of 'Arrests' per group by the sum of 'Inspections' per group. """ def __init__(self, definition, denom_def=True, name=None, denom_name=None, astype=None, denom_astype=None): Count.__init__(self, definition=definition, name=name, prop=denom_def, prop_only=True, prop_name=denom_name, astype=astype,
prop_astype=denom_astype)
[docs]def aggregate_list(l):
return list(np.concatenate(l.values))
[docs]def aggregate_set(l):
return set(np.concatenate(l.values))
[docs]def aggregate_counts(l): lists = [list(i) for i in l.values if len(i) > 0] if len(lists) == 0: return None else: ls = np.concatenate(lists)
return np.unique(ls, return_counts=True)
[docs]def days(date1, date2): """ returns a lambda that determines the number of days between the two dates the dates can be strings (column names) or actual dates e.g. Aggregate(days('date', today), ['min','max'], 'days_since') TODO: should there be a Days(AggregateBase) for this? handle naming """
return lambda df, date1=date1, date2=date2: _days(df, date1, date2) def _days(df, date1, date2): # when df is empty the subtraction below is broken if len(df) == 0: return pd.Series(dtype=np.float64, index=df.index) else: d1 = df[date1] if isinstance(date1, string_types) else date1 d2 = df[date2] if isinstance(date2, string_types) else date2 return (d2 - d1) / util.day
[docs]def date_min(d): """ groupby()['date_colum'].aggregate('min') returns a float? convert it back to a timestamp """
return pd.to_datetime(d.min())
[docs]def date_max(d):
return pd.to_datetime(d.max())