from .step import Step
from .aggregate import Aggregator
from . import util, data
from itertools import chain
import pandas as pd
import logging
class AggregationBase(Step):
    """
    AggregationBase uses aggregate.Aggregator to aggregate data.

    It can include aggregations over multiple indexes and multiple data
    transformations (e.g. subsets). The combinations can be run in
    parallel and can be returned disjoint or concatenated. Finally
    the results may be pivoted and joined to other datasets.

    Subclasses are expected to provide:
        - ``arguments``: see the comment above ``argument_names``
        - ``indexes``: mapping from index name to the actual index
        - ``_parallel_kwargs``: list of kwargs dicts, one per parallel input
        - ``get_aggregator(**kwargs)``
    """

    def __init__(self, insert_args, aggregator_args, concat_args,
                 parallel=False, prefix=None, inputs=None):
        """
        Args:
            insert_args: collection of argument names to insert
                into results
            aggregator_args: collection of argument names to pass to
                get_aggregator
            concat_args: collection of argument names on which to
                concatenate results. Typically a subset of (or equal
                to) aggregator_args.
            parallel: whether to distribute the aggregation over
                many inputs. Uses self._parallel_kwargs to determine how
                to distribute.
            prefix: used as a prefix for feature names by join()
            inputs: forwarded unchanged to Step.__init__
        """
        Step.__init__(self,
                      insert_args=insert_args,
                      concat_args=concat_args,
                      aggregator_args=aggregator_args,
                      prefix=prefix,
                      parallel=parallel,
                      inputs=inputs)

        if parallel:
            # Create one non-parallel step of the same class per entry of
            # _parallel_kwargs; those steps become the inputs to this step.
            pkwargs = self.get_arguments()
            pkwargs.update(parallel=False)
            self.inputs = []
            for pk in self._parallel_kwargs:
                pkwargs.update(pk)
                self.inputs.append(self.__class__(**pkwargs))

        # cache of aggregators keyed by the tuple of aggregator_args values
        self._aggregators = {}

    # arguments is a list of dictionaries of argument names and values.
    # It must include the special 'index' argument, whose values are
    # keys to plug into the self.indexes dictionary, whose values are
    # the actual index that is passed to Aggregator.aggregate()

    @property
    def argument_names(self):
        """List of every argument name appearing in self.arguments."""
        return list(util.union(map(set, self.arguments)))

    def args_prefix(self, args):
        """
        Build the column-name prefix for one combination of concat args:
        the step prefix (if any) followed by each arg, all joined by and
        terminated with an underscore.
        """
        prefix = '' if self.prefix is None else self.prefix + '_'
        prefix += '_'.join(map(str, args)) + '_'
        return prefix

    def join(self, left):
        """
        Left join every concatenated aggregation to the specified DataFrame.

        Args:
            left: DataFrame which should contain the index levels of each
                concatenated aggregation among its columns. Aggregations
                whose index names are not all columns of left are skipped.

        Returns: left merged with the (prefixed) aggregation columns, with
            missing values filled according to fillna_value().
        """
        # Collect the per-aggregation fill values and concatenate once at the
        # end; Series.append was removed in pandas 2.0.
        fill_values = []

        # TODO: is it more efficient to first collect indexes from concat
        # then outer join all of the dfs
        # then left join that to left?
        for concat_args, df in self.get_concat_result().items():
            logging.info('Joining %s %s', self.prefix, concat_args)
            data.prefix_columns(df, self.args_prefix(concat_args))

            if not set(df.index.names).issubset(left.columns):
                logging.info("Skipping join since aggregation index not in left: %s",
                             df.index.names)
                continue

            left = left.merge(df, left_on=df.index.names,
                              right_index=True, how='left', copy=False)

            fill_values.append(self.fillna_value(
                df=df,
                left=left,
                **dict(zip(self.concat_args, concat_args))))

        logging.info('Filling missing values')
        # Nothing joined means nothing to fill (the original filled with an
        # empty Series, which is a no-op).
        if fill_values:
            left.fillna(pd.concat(fill_values), inplace=True)
        return left

    def fillna_value(self, df, left, **concat_args):
        """
        This method gives subclasses the opportunity to define how
        join() fills missing values. join() concatenates the values returned
        for every aggregation and passes the result to DataFrame.fillna(),
        so return a Series indexed by column name. Example:
            - return df.mean(): replace missing values with column mean

        This default implementation fills counts with zero.
        TODO: identify counts more robustly instead of relying on column name

        Typically fill other fields with mean but can't do that during the join
        because that would leak information across a train/test split
        """
        count_columns = [c for c in df.columns
                         if c.endswith('_count') and '_per_' not in c]
        return pd.Series(0, index=count_columns)

    def select(self, df, args, inplace=False):
        """
        After joining, selects a subset of arguments

        Args:
            df: the result of a call to self.join(left)
            args: a collection of arguments to select, as accepted by
                util.list_expand:
                - a tuple corresponding to concat_args,
                    e.g. [('District', '12h'), ('District', '24h')]
                - a dict to be expanded into the above,
                    e.g. {'District': ['12h', '24h']}
            inplace: forwarded to data.select_features

        Raises:
            ValueError: if this step has no prefix, or if any of args does
                not match the concat_args of one of self.arguments.
        """
        if self.prefix is None:
            raise ValueError('Cannot do selection on an Aggregation without a prefix')

        # run list_expand and ensure all args are tuples for validation
        args = [tuple(i) for i in util.list_expand(args)]

        # check that the args passed are valid
        # (a list, not a set, so unhashable argument values still compare)
        valid = [tuple(argument[k] for k in self.concat_args)
                 for argument in self.arguments]
        for a in args:
            if a not in valid:
                raise ValueError('Invalid argument for selection: %s' % str(a))

        # materialize the patterns: a bare map() is a one-shot iterator on
        # Python 3
        include = [self.args_prefix(a) + '.*' for a in args]
        return data.select_features(
            df, exclude=[self.prefix + '_.*'],
            include=include, inplace=inplace)

    def run(self, *args, **kwargs):
        """
        When parallel, flatten the results of the parallel inputs into a
        single tuple. Otherwise aggregate once per entry of self.arguments
        and return the resulting DataFrames as a tuple, in that order.
        """
        if self.parallel:
            # use tuple to avoid mapping to positional arguments by step.merge_results()
            return tuple(chain(*args))

        dfs = []
        for argument in self.arguments:
            logging.info('Aggregating %s %s', self.prefix, argument)
            aggregator = self._get_aggregator(**argument)
            df = aggregator.aggregate(self.indexes[argument['index']])
            logging.info('Aggregated %s: %s', argument, df.shape)

            # insert insert_args as columns, then move them into the index
            for k in argument:
                if k in self.insert_args:
                    df[k] = argument[k]
            df.set_index(self.insert_args, append=True, inplace=True)
            dfs.append(df)

        return tuple(dfs)

    def load(self):
        """Overload load in order to restore result to a tuple."""
        Step.load(self)
        self.result = tuple(self.result)

    def get_concat_result(self):
        """
        Group self.result by the values of concat_args and concatenate each
        group.

        Returns: dict mapping each tuple of concat_args values to the
            concatenation of the corresponding result DataFrames.
        """
        to_concat = {}
        for argument, df in zip(self.arguments, self.result):
            concat_args = tuple(argument[k] for k in self.concat_args)
            to_concat.setdefault(concat_args, []).append(df)

        return {concat_args: pd.concat(dfs, copy=False)
                for concat_args, dfs in to_concat.items()}

    def _get_aggregator(self, **kwargs):
        """
        Return the aggregator for the given arguments, creating it with
        get_aggregator() on first use and caching it by the values of
        aggregator_args.
        """
        # The key must be a real tuple: a generator expression hashes by
        # identity, so it would never match a previously stored key.
        key = tuple(kwargs[k] for k in self.aggregator_args)
        subset = util.dict_subset(kwargs, self.aggregator_args)

        try:
            return self._aggregators[key]
        except KeyError:
            pass
        except TypeError:
            # unhashable argument value: cannot cache, build a fresh one
            return self.get_aggregator(**subset)

        aggregator = self.get_aggregator(**subset)
        self._aggregators[key] = aggregator
        return aggregator

    def get_aggregator(self, **kwargs):
        """
        Given the arguments, return an aggregator

        This method exists to allow subclasses to use Aggregator objects
        efficiently, i.e. only apply AggregateSeries once per set of
        Aggregates. If the set of Aggregates depends on some or none of
        the arguments the subclass need not recreate Aggregators
        """
        raise NotImplementedError
class AggregationJoin(Step):
    """
    Step that joins an aggregation onto a left DataFrame and returns only
    the columns added by the join.

    run() receives (aggregations, left) and calls join() on self.inputs[0],
    i.e. the first input is treated as the aggregation step.
    NOTE(review): earlier documentation stated "first input is left and
    second input is aggregation", which does not match run(); confirm the
    intended input order against callers.

    If the left step returned a dict, use MapResults to clarify e.g.:
        mapping=[{'aux': None}]
    """

    def __init__(self, inputs, **kwargs):
        Step.__init__(self, inputs=inputs, **kwargs)

    def run(self, aggregations, left):
        """
        Join the first input's aggregation to left, then drop the columns
        left originally had so that only joined columns remain.

        The aggregations argument is not read here; the join goes through
        self.inputs[0].join().
        """
        original_columns = list(left.columns)
        joined = self.inputs[0].join(left)
        return joined.drop(original_columns, axis=1)
class SpacetimeAggregationJoin(AggregationJoin):
    """
    AggregationJoin with an optional temporal lag between the aggregations
    and left.

    Useful for simulating a delay in receipt of aggregation data sources.
    """

    def __init__(self, inputs, lag=None, **kwargs):
        AggregationJoin.__init__(self, lag=lag, inputs=inputs, **kwargs)

    def run(self, aggregations, left):
        """
        If a lag is set, shift the 'date' index level of every aggregation
        forward by that lag (modifying the frames in place), then join as
        AggregationJoin.run does.
        """
        if self.lag is not None:
            delta = data.parse_delta(self.lag)

            def shift(d):
                return d + delta

            for frame in aggregations:
                # pull 'date' out of the index, shift it, and put it back
                frame.reset_index(level='date', inplace=True)
                frame['date'] = frame['date'].apply(shift)
                frame.set_index('date', append=True, inplace=True)

        return AggregationJoin.run(self, aggregations, left)
class SimpleAggregation(AggregationBase):
    """
    A simple AggregationBase subclass with a single aggregator.

    The only argument is the index.
    An implementation need only define an ``aggregates`` attribute, see
    test_aggregation.SimpleCrimeAggregation for an example.
    """

    def __init__(self, inputs, indexes, prefix=None, parallel=False):
        """
        Args:
            inputs: input steps; the first one's result is aggregated
            indexes: either a dict of {name: index} or a collection of
                indexes, in which case each index is used as its own name
            prefix: used as a prefix for feature names by join()
            parallel: whether to distribute the aggregation, one input per
                index
        """
        # normalize a non-dict collection into a {index: index} dict
        if not isinstance(indexes, dict):
            indexes = {name: name for name in indexes}

        # set before the base initializer, which reads _parallel_kwargs
        # (and hence self.indexes) when parallel is true
        self.indexes = indexes
        self.inputs = inputs

        AggregationBase.__init__(
            self,
            insert_args=[],
            concat_args=['index'],
            aggregator_args=[],
            parallel=parallel,
            prefix=prefix)

    def get_aggregator(self, **kwargs):
        """Return an Aggregator over the first input's result using self.aggregates."""
        source = self.inputs[0].result
        return Aggregator(source, self.aggregates)

    @property
    def _parallel_kwargs(self):
        """
        Returns: a list of kwargs for each parallel input, one
            single-entry ``indexes`` dict per index
        """
        per_index = []
        for name, index in self.indexes.items():
            per_index.append({'indexes': {name: index}})
        return per_index

    @property
    def arguments(self):
        """One ``{'index': name}`` argument dict per index name."""
        result = []
        for name in self.indexes:
            result.append({'index': name})
        return result
class SpacetimeAggregation(AggregationBase):
    """
    SpacetimeAggregation is an Aggregation over space and time.

    Specifically, the index is a spatial index and an additional date and
    delta argument select a subset of the data to aggregate.
    We assume that the index and deltas are independent of the date,
    so every date is aggregated to all spacedeltas.

    By default the aggregator_args are date and delta (i.e. independent of
    aggregation index). To change that, pass
    aggregator_args=['date', 'delta', 'index'] and override get_aggregator
    to accept an index argument.

    Note that dates should be datetime.datetime, not numpy.datetime64, for
    yaml serialization and to work with dateutil.relativedelta.
    However since pandas automatically turns a datetime column in the index
    into datetime64 DatetimeIndex, the left dataframe passed to join()
    should use datetime64!

    See test_aggregation.SpacetimeCrimeAggregation for an example.
    """

    def __init__(self, spacedeltas, dates, date_column, parallel=False, max_date_column=None,
                 censor_columns=None, aggregator_args=None, concat_args=None,
                 inputs=None, prefix=None):
        """
        Args:
            spacedeltas: a dict of the form {name: (index, deltas)}
                where deltas is an array of delta strings
            dates: end dates for the aggregators
            date_column: passed to data.date_select by get_data()
            parallel: whether to distribute the aggregation, one input
                per date
            max_date_column: passed to data.date_select by get_data()
            censor_columns: passed to data.date_censor by get_data();
                defaults to an empty dict
            aggregator_args: defaults to ['date', 'delta']
            concat_args: defaults to ['index', 'delta']
            inputs: forwarded to AggregationBase.__init__
            prefix: used as a prefix for feature names by join()
        """
        if aggregator_args is None:
            aggregator_args = ['date', 'delta']
        if concat_args is None:
            concat_args = ['index', 'delta']
        if censor_columns is None:
            censor_columns = {}

        # spacedeltas is a dict of the form {name: (index, deltas)}
        # where deltas is an array of delta strings
        # dates are end dates for the aggregators
        self.spacedeltas = spacedeltas
        self.dates = dates
        self.date_column = date_column
        self.max_date_column = max_date_column
        self.censor_columns = censor_columns

        AggregationBase.__init__(
            self,
            insert_args=['date'],
            aggregator_args=aggregator_args,
            concat_args=concat_args,
            prefix=prefix,
            parallel=parallel,
            inputs=inputs)

    @property
    def indexes(self):
        """Mapping from spacedelta name to its index (first tuple element)."""
        result = {}
        for name, value in self.spacedeltas.items():
            result[name] = value[0]
        return result

    @property
    def arguments(self):
        """
        One argument dict per (date, name, delta) combination, iterating
        dates outermost, then names in sorted order, then each name's deltas.
        """
        names = sorted(self.spacedeltas.keys())
        return [{'date': date, 'delta': delta, 'index': name}
                for date in self.dates
                for name in names
                for delta in self.spacedeltas[name][1]]

    @property
    def _parallel_kwargs(self):
        """One kwargs dict per date: all spacedeltas with that single date."""
        per_date = []
        for date in self.dates:
            per_date.append({'spacedeltas': self.spacedeltas, 'dates': [date]})
        return per_date

    def join(self, left):
        """Join to left exactly as AggregationBase.join does."""
        # A check that left contains no unaggregated dates is disabled
        # because it doesn't work with lag:
        # TODO: fix by moving Aggregation.join() code to AggregationJoin.run()
        # difference = set(pd.to_datetime(left.date.unique()))\
        #         .difference(pd.to_datetime(self.dates))
        # if len(difference) > 0:
        #     raise ValueError('Left contains unaggregated dates: %s' % difference)
        return AggregationBase.join(self, left)

    def get_aggregator(self, date, delta):
        """Return an Aggregator over get_data(date, delta) with get_aggregates(date, delta)."""
        subset = self.get_data(date, delta)
        aggregates = self.get_aggregates(date, delta)
        return Aggregator(subset, aggregates)

    def get_data(self, date, delta):
        """
        Select the rows of the first input's result for the given date and
        delta via data.date_select, then apply data.date_censor to a copy.
        """
        source = self.inputs[0].result
        selected = data.date_select(source, self.date_column, date, delta,
                                    self.max_date_column)
        return data.date_censor(selected.copy(), self.censor_columns, date)

    def get_aggregates(self, date, delta):
        """Return the aggregates for the given date and delta; subclasses must override."""
        raise NotImplementedError