import re
import os
from . import util
import logging
from copy import deepcopy
import pandas as pd
from six import string_types
import numpy as np
from numpy import random
from dateutil.relativedelta import relativedelta
import collections
from sklearn import datasets
from sklearn.utils.validation import _assert_all_finite
from .step import Step, MapResults
class Column(object):
    """Defines a new or existing column that can be calculated from a dataframe.

    Column accepts as ``definition`` a string that refers to an
    existing column, or a lambda function that returns a pd.Series,
    or a constant, in which case it creates a pd.Series of that constant.

    Columns are hashed and compared based on their definition and type.
    """
    def __init__(self, definition, astype=None):
        """Args:
            definition ({str, function, constant}): Specifies a
                new column (that is, a pd.Series) from a dataframe.
                It can be a function, an existing column name, or a
                constant (that will be replicated along rows).
            astype (Pandas dtype): A Pandas datatype to which the
                resulting pd.Series will be converted to.
        """
        self.definition = definition
        self.astype = astype

    def apply(self, df):
        """Takes a pd.DataFrame and returns the newly defined column, i.e.
        a pd.Series that has the same index as `df`.

        Raises:
            ValueError: if ``definition`` is a string that is not a column
                of ``df``.
        """
        if callable(self.definition):
            r = self.definition(df)
        elif self.definition in df.columns:
            r = df[self.definition]
        elif not isinstance(self.definition, string_types):
            r = pd.Series(self.definition, index=df.index)
        else:
            raise ValueError("Invalid column definition: %s" % str(self.definition))
        return r.astype(self.astype) if self.astype else r

    def _key(self):
        # The values that identify a Column for hashing and equality.
        return (self.definition, self.astype)

    def __hash__(self):
        return hash(self._key())

    def __eq__(self, other):
        # Compare the defining values rather than their hashes: distinct
        # values can share a hash (e.g. hash(-1) == hash(-2) in CPython), and
        # a Column must not equal an arbitrary tuple with the same hash.
        if not isinstance(other, Column):
            return NotImplemented
        return self._key() == other._key()

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__.
        result = self.__eq__(other)
        return result if result is NotImplemented else not result
class ClassificationData(Step):
    """Generate a synthetic classification dataset with a random train/test split.

    The step's arguments (excluding inputs and dependencies) are passed to
    ``sklearn.datasets.make_classification``.
    """
    def run(self):
        """Returns a dict with keys 'X', 'y', 'train' and 'test'.

        'train' is a boolean pd.Series marking len(X) // 2 rows chosen at
        random; 'test' is its complement.
        """
        X, y = datasets.make_classification(
            **self.get_arguments(inputs=False, dependencies=False))
        X, y = pd.DataFrame(X), pd.Series(y)
        n_rows = len(X)
        train = np.zeros(n_rows, dtype=bool)
        # Sample without replacement so exactly half the rows are marked;
        # with replacement, repeated draws shrink the training set.
        train[random.choice(n_rows, n_rows // 2, replace=False)] = True
        train = pd.Series(train)
        return {'X': X, 'y': y, 'train': train, 'test': ~train}
class CreateEngine(Step):
    """Step whose result is the engine returned by ``util.create_engine()``."""
    def run(self):
        # A bare `return` in a class body is a SyntaxError; like the other
        # Steps in this module, the work belongs in run().
        return util.create_engine()
class CreateDatabase(Step):
    """Step whose result is the database returned by ``util.create_db()``."""
    def run(self):
        # A bare `return` in a class body is a SyntaxError; like the other
        # Steps in this module, the work belongs in run().
        return util.create_db()
class FromSQL(Step):
    """Step that reads the result of a SQL query into a pd.DataFrame."""
    def __init__(self, query=None, to_str=None, table=None,
                 tables=None, inputs=None, auto_parse_dates=True,
                 **read_sql_kwargs):
        """
        Args:
            query: SQL to run. When None, ``table`` must be given and the
                query becomes ``SELECT * FROM {table}``.
            to_str: names of result columns to cast to str (names missing
                from the result are skipped).
            table: the table to select from when ``query`` is None.
            tables: tables the query reads. When $SQL_DIR is set, they become
                file dependencies under it (dots become path separators).
            inputs: input steps; defaults to ``[CreateEngine()]``.
            auto_parse_dates: whether to run ``util.parse_dates`` on the result.
            **read_sql_kwargs: passed to ``pd.read_sql``.
        Raises:
            ValueError: if neither ``query`` nor ``table`` is given.
        """
        if query is None:
            if table is None:
                raise ValueError("Must specify query or table")
            query = "SELECT * FROM %s" % table
            tables = [table]

        if to_str is None:
            to_str = []

        # Route the default engine through Step.__init__, which receives
        # `inputs` (ToSQL reads self.inputs right after it). Assigning
        # self.inputs before that call risks it being replaced by inputs=None.
        if inputs is None:
            inputs = [CreateEngine()]

        Step.__init__(self, query=query,
                      to_str=to_str,
                      inputs=inputs,
                      auto_parse_dates=auto_parse_dates,
                      read_sql_kwargs=read_sql_kwargs)

        if tables is not None and 'SQL_DIR' in os.environ:
            self.dependencies = [os.path.join(
                os.environ['SQL_DIR'], t.replace('.', '/'))
                for t in tables]

    def run(self, engine):
        """Run the query on ``engine`` and post-process the resulting DataFrame."""
        df = pd.read_sql(self.query, engine, **self.read_sql_kwargs)
        for column in self.to_str:
            if column in df.columns:
                df[column] = df[column].astype(str)
        if self.auto_parse_dates:
            util.parse_dates(df, errors='coerce', inplace=True)
        return df
class ToSQL(Step):
    """
    Step wrapping util.PgSQLDatabase.to_sql().

    inputs:
        df: the DataFrame to import
        db: a PgSQLDatabase instance; when only df is supplied, a
            CreateDatabase() step (mapped to 'db') is appended
    TODO: once drain Steps have outputs, include psql/schema/name
    """
    def __init__(self, table_name, **kwargs):
        """
        Args:
            table_name: the target table. (A workaround: ``name`` is
                currently a special Step kwarg.)
            **kwargs: forwarded to Step.__init__.
        TODO: use name once refactor/init is merged
        """
        Step.__init__(self, table_name=table_name, **kwargs)
        if len(self.inputs) == 1:
            db_input = MapResults([CreateDatabase()], 'db')
            self.inputs = self.inputs + [db_input]

    def run(self, df, db):
        """Write ``df`` via db.to_sql(), passing this step's arguments.

        ``table_name`` is renamed to ``name`` for the call.
        """
        to_sql_args = self.get_arguments(inputs=False)
        to_sql_args['name'] = to_sql_args.pop('table_name')
        db.to_sql(df, **to_sql_args)
class Merge(Step):
    """Step that merges its input DataFrames left to right.

    The step's own arguments (excluding inputs) are forwarded to every
    ``DataFrame.merge`` call (for example ``on=`` or ``how=``).
    """
    def run(self, *dfs):
        """Returns dfs[0].merge(dfs[1], ...).merge(dfs[2], ...)...

        Raises:
            IndexError: if no DataFrames are given.
        """
        # The merge arguments do not change between merges: fetch them once.
        merge_args = self.get_arguments(inputs=False)
        df = dfs[0]
        for other in dfs[1:]:
            df = df.merge(other, **merge_args)
        return df
class ToHDF(Step):
    """
    Write DataFrames to an HDF store.

    Pass put_args (a dict of key -> keyword arguments for HDFStore.put, such
    as format or data_columns) to init; pass DataFrames by name via inputs.
    """
    def __init__(self, objects_to_ascii=False, **kwargs):
        Step.__init__(self, objects_to_ascii=objects_to_ascii, **kwargs)
        self._target = True

    def run(self, **kwargs):
        """Write each keyword DataFrame to result.h5 under its keyword name.

        Returns the open pd.HDFStore. If a write fails, the store is closed
        before the exception propagates.
        """
        # put_args does not depend on the key: look it up once.
        put_args = self.get_arguments().get('put_args', {})
        store = pd.HDFStore(os.path.join(self._dump_dirname, 'result.h5'))
        try:
            for key, df in kwargs.items():
                if self.objects_to_ascii:
                    for c, dtype in df.dtypes.items():
                        if dtype == object:
                            # NOTE: this modifies the input DataFrame in place.
                            df[c] = df[c].str.encode("ascii", "ignore")
                logging.info('Writing %s %s' % (key, str(df.shape)))
                args = put_args.get(key, {})
                # NOTE(review): recent pandas HDFStore.put() may not accept
                # `mode`; verify against the pinned pandas version.
                store.put(key, df, mode='w', **deepcopy(args))
        except Exception:
            store.close()
            raise
        return store

    # NOTE(review): the headers of the two methods below were missing (their
    # bodies sat unreachable after `return store`); reconstructed as what are
    # presumably Step's dump/load hooks -- confirm names/signatures on Step.
    def dump(self):
        # run() has already written result.h5; nothing more to dump.
        return

    def load(self):
        self.result = pd.HDFStore(os.path.join(self._dump_dirname, 'result.h5'), mode='r')
def prefix_columns(df, prefix, ignore=None):
    """Prepend ``prefix`` to every column name of ``df`` except those in ``ignore``.

    Modifies ``df`` in place and returns None.

    Args:
        df: the DataFrame whose columns are renamed.
        prefix: the string prepended to each column name.
        ignore: optional collection of column names to leave unchanged.
    """
    # None default instead of a shared mutable [] default.
    if ignore is None:
        ignore = []
    df.columns = [c if c in ignore else prefix + c for c in df.columns]
def expand_dates(df, columns=None):
    """
    Generate year, month, day features from the specified date columns.

    Args:
        df: the input DataFrame (not modified).
        columns: names of the date columns to expand; names not in ``df``
            are ignored.
    Returns:
        A new DataFrame with the remaining columns of ``df`` in their
        original order, followed by ``{column}_year``, ``{column}_month`` and
        ``{column}_day`` for each expanded column.
    """
    if columns is None:
        columns = []
    columns = df.columns.intersection(columns)
    # Preserve the original column order; building the column list from a
    # set made the output order arbitrary.
    df2 = df.reindex(columns=[c for c in df.columns if c not in columns])
    for column in columns:
        dates = df[column]
        df2[column + '_year'] = dates.apply(lambda x: x.year)
        df2[column + '_month'] = dates.apply(lambda x: x.month)
        df2[column + '_day'] = dates.apply(lambda x: x.day)
    return df2
def binarize(df, category_classes, all_classes=True, drop=True,
             astype=None, inplace=True, min_freq=None):
    """
    Binarize specified categoricals: one indicator column per class.

    Args:
        - df: the DataFrame whose columns to binarize
        - category_classes: either a dict of (column : [class1, class2, ...]) pairs
            or a collection of column names, in which case classes are
            given using df[column].unique()
        - all_classes: when False, the last class is skipped
        - drop: when True, the original categorical columns are dropped
        - astype: a type for the resulting binaries, e.g. np.float32.
            When None, use the default (bool).
        - inplace: whether to modify the DataFrame in place (the default);
            when False, ``df`` is left untouched
        - min_freq: when set, classes occurring fewer than this many times
            are skipped
    Returns:
        the DataFrame with binarized columns named ``{column}_{class}``
        (spaces in the class replaced by underscores)
    """
    if isinstance(category_classes, dict):
        columns = list(category_classes.keys())
    else:
        # dict.fromkeys de-duplicates while keeping the caller's order, so the
        # output column order is deterministic (a set's order is not).
        columns = list(dict.fromkeys(category_classes))
        category_classes = {column: df[column].unique() for column in columns}

    if inplace:
        df_new = df
    elif drop:
        df_new = df.drop(columns, axis=1)
    else:
        # Honour drop=False when not working in place: keep the originals.
        df_new = df.copy()

    for category in columns:
        classes = category_classes[category]
        n_classes = len(classes) if all_classes else len(classes) - 1
        for i in range(n_classes):
            indicator = df[category] == classes[i]
            if not min_freq or indicator.sum() >= min_freq:
                if astype is not None:
                    indicator = indicator.astype(astype)
                name = '%s_%s' % (category, str(classes[i]).replace(' ', '_'))
                df_new[name] = indicator

    if drop and inplace:
        df_new.drop(columns, axis=1, inplace=True)
    return df_new
def binarize_sets(df, columns, cast=False, drop=True, min_freq=None):
    """
    Create dummies for the elements of a set-valued column. Operates in place.

    Args:
        df: data frame
        columns: either a dict of column: values pairs (values being a
            collection of elements, or a dict of element: name-suffix) or a
            collection of columns, in which case the elements are the union
            of each column's sets.
        cast: whether or not to cast values to set
        drop: whether or not to drop the binarized columns
        min_freq: when set, skip elements present in fewer than this many rows
    Returns:
        ``df`` with one boolean ``{column}_{name}`` column per element; rows
        whose set is null get False.
    TODO: make interface same as binarize(). merge the two?
    """
    for column in columns:
        d = df[column].dropna()  # avoid nulls
        if cast:
            d = d.apply(set)
        values = columns[column] if isinstance(columns, dict) else util.union(d)
        for value in values:
            name = values[value] if isinstance(values, dict) else str(value)
            column_name = column + '_' + name.replace(' ', '_')
            series = d.apply(lambda c: value in c)
            # `series` has no nulls, so fillna() on it was a no-op; the rows
            # dropped above would become NaN when aligned on assignment.
            # Fill them with False explicitly instead.
            if not series.index.equals(df.index):
                series = series.reindex(df.index, fill_value=False)
            if not min_freq or series.sum() >= min_freq:
                df[column_name] = series
    if drop:
        # list(columns) will return keys if columns was dict
        df.drop(list(columns), axis=1, inplace=True)
    return df
def counts_to_dicts(df, column):
    """
    Convert (values, counts) entries, as returned by
    aggregate.aggregate_counts(), into {value: count} dicts.
    This makes expand_counts much faster.

    Returns a pd.Series of dicts for only those rows of ``df`` whose entry
    is non-null and has at least one value.
    """
    def _has_counts(entry):
        return pd.notnull(entry) and len(entry[0]) > 0

    # rows that have counts and are not null
    mask = df[column].apply(_has_counts)
    return df.loc[mask, column].apply(lambda entry: dict(zip(*entry)))
def expand_counts(df, column, values=None):
    """
    Expand a column of (values, counts) entries into one count column per value.

    Operates in place: adds ``{column}_{name}`` columns, then drops ``column``.

    Args:
        df: the DataFrame to modify.
        column: the column of (values, counts) entries (see counts_to_dicts).
        values: the values to expand, either a collection of values or a dict
            of value: name-suffix. Defaults to every value in the column.
    """
    d = counts_to_dicts(df, column)
    if len(d) > 0:
        if values is None:
            # np.concatenate() cannot join dict_keys views on Python 3.
            # Collect the keys in first-seen order, which is also deterministic.
            values = list(dict.fromkeys(k for counts in d for k in counts))
        for value in values:
            name = values[value] if isinstance(values, dict) else str(value)
            df[column + '_' + name.replace(' ', '_')] = \
                d.apply(lambda c: c.get(value, 0))
    df.drop(column, axis=1, inplace=True)
def binarize_clusters(df, column, n_clusters, train=None):
    """
    Cluster a column with k-means and binarize the cluster assignment.

    Operates in place on ``df``. It adds ``{column}_cluster_{center}``
    indicator columns, where ``center`` is the integer-cast first coordinate
    of each cluster center; rows where ``column`` is null match no cluster.

    Args:
        df: the DataFrame to modify.
        column: the column to cluster.
        n_clusters: the number of k-means clusters.
        train: optional boolean mask selecting the rows used to fit k-means.
    Returns:
        ``df``
    """
    from sklearn.cluster import KMeans

    series = pd.DataFrame(df[column].dropna())
    fit_data = series
    if train is not None:
        if isinstance(train, pd.Series):
            # Align the mask to the non-null rows explicitly rather than
            # relying on pandas' implicit boolean-key reindexing.
            train = train.loc[series.index]
        fit_data = series[train]
    kmeans = KMeans(n_clusters)
    kmeans.fit(fit_data)
    clusters = kmeans.cluster_centers_[:, 0].astype(int)
    labels = kmeans.predict(series)
    df[column + '_cluster'] = pd.Series(clusters[labels], index=series.index)
    # use all_classes to handle nulls
    binarize(df, {column + '_cluster': clusters}, all_classes=True)
    return df
def train_test_subset(df, train, test, drop=False):
    """
    Restrict ``df`` to the rows in ``train | test``, then restrict the
    ``train`` and ``test`` masks to the remaining rows.

    Args:
        df: the DataFrame to narrow.
        train, test: boolean masks indexed by ``df``'s labels.
        drop: when True, drop the other rows from ``df`` in place;
            otherwise filter into a new DataFrame.
    Returns:
        (df, train, test), all sharing the narrowed index.
    """
    keep = train | test
    if not drop:
        df = df[keep]
    else:
        df.drop(df.index[~keep], inplace=True)
    kept = df.index
    return df, train.loc[kept], test.loc[kept]
def normalize(X, train=None):
    """
    Standardize the columns of ``X`` to zero mean and unit population variance.

    The mean and the standard deviation (ddof=0) come from ``X[train]`` when a
    training mask is given, otherwise from all of ``X``. Zero deviations are
    replaced by 1, so constant columns are only centred.
    """
    fit_rows = X if train is None else X[train]
    scale = fit_rows.std(ddof=0)
    scale = scale.mask(scale == 0, 1)
    center = fit_rows.mean()
    return (X - center) / scale
class Normalize(Step):
    """Step wrapper around normalize()."""
    def run(self, X, train=None):
        """Return ``X`` standardized using statistics of the ``train`` rows."""
        standardized = normalize(X, train=train)
        return standardized
def impute(X, value=None, train=None, dropna=True, inplace=True):
    """
    Performs mean imputation on a pandas dataframe.

    Args:
        train: an optional training mask with which to compute the mean
        value: instead of computing the mean, use this as the value argument
            to fillna (a scalar, a dict or a pd.Series)
        dropna: whether to drop columns whose per-column fill value is null
            (for the computed mean, e.g. columns with no non-null values)
        inplace: whether to perform the imputation inplace
    Returns: the imputed DataFrame
    Raises:
        ValueError: if both ``value`` and ``train`` are given.
    """
    if value is None:
        Xfit = X[train] if train is not None else X
        value = Xfit.mean()
    elif train is not None:
        raise ValueError("Cannot pass both train and value arguments")

    # Only a per-column pd.Series can identify null columns; a scalar or dict
    # value has no .index/.isnull()/.dropna() and used to crash here.
    if isinstance(value, pd.Series):
        if dropna:
            all_null = value.index[value.isnull()]
            if len(all_null) > 0:
                logging.info('Dropping null columns: \n\t%s' % all_null)
                if inplace:
                    X.drop(all_null, axis=1, inplace=True)
                else:
                    X = X.drop(all_null, axis=1, inplace=False)
        value = value.dropna()

    if inplace:
        X.fillna(value, inplace=True)
    else:
        X = X.fillna(value, inplace=False)
    return X
def select_regexes(strings, regexes):
    """
    Select the subset of ``strings`` that fully match any of ``regexes``.

    Treats ``strings`` as a set.

    Args:
        strings: an iterable of strings.
        regexes: an iterable of regular expressions, or a single pattern
            string. Each pattern must match a whole string.
    Returns:
        the set of matching strings
    Raises:
        ValueError: if ``regexes`` is not iterable.
    """
    remaining = set(strings)
    if isinstance(regexes, str):
        # A lone pattern would otherwise be iterated character by character.
        regexes = [regexes]
    # Test iterability directly; collections.Iterable no longer exists on
    # Python 3.10+, and the old check tested `strings` (always a set here).
    try:
        regexes = iter(regexes)
    except TypeError:
        raise ValueError("exclude should be iterable")
    select = set()
    for r in regexes:
        # Group the pattern so the anchors apply to all of it: '^a|b$' would
        # accept anything starting with 'a' or ending with 'b'.
        pattern = re.compile('^(?:' + r + ')$')
        matched = set(filter(pattern.search, remaining))
        remaining -= matched
        select |= matched
    return select
def exclude_regexes(strings, exclude, include=None):
    """
    Return ``strings`` without those matching an ``exclude`` regex, plus
    those matching an ``include`` regex (see select_regexes).

    Args:
        strings: an iterable of strings (may be a one-shot iterator).
        exclude: regexes whose matches are removed.
        include: optional regexes whose matches are kept regardless.
    Returns:
        a set of strings
    """
    # Materialize once: a generator would otherwise be exhausted by the first
    # select_regexes() call, leaving nothing for the later uses.
    strings = set(strings)
    excluded = select_regexes(strings, exclude)
    included = select_regexes(strings, include) if include is not None else set()
    return strings.difference(excluded).union(included)
def select_features(df, exclude, include=None, inplace=False):
    """
    Drop the columns of ``df`` that match an ``exclude`` regex, unless they
    also match an ``include`` regex (see exclude_regexes).

    Returns ``df`` itself when ``inplace`` is True, otherwise a new DataFrame.
    """
    keep = exclude_regexes(strings=df.columns, exclude=exclude, include=include)
    to_drop = df.columns.difference(keep)
    result = df.drop(to_drop, axis=1, inplace=inplace)
    if inplace:
        return df
    return result
def null_columns(df, train=None):
    """Return the Index of columns of ``df`` that contain a null value.

    When ``train`` is given, only the ``df[train]`` rows are inspected.
    """
    subset = df if train is None else df[train]
    has_null = subset.isnull().any()
    return has_null.index[has_null.values]
def infinite_columns(df):
    """
    List the columns of ``df`` that sklearn's ``_assert_all_finite`` rejects
    (it raises ValueError for them).

    NOTE(review): with sklearn's defaults this presumably also flags NaN
    values, not only +/-inf -- verify against the installed sklearn.
    """
    def _is_finite(values):
        try:
            _assert_all_finite(values)
        except ValueError:
            return False
        return True

    return [c for c in df.columns if not _is_finite(df[c])]
def non_numeric_columns(df):
    """
    List the columns of ``df`` that cannot be cast to float.

    A column counts as non-numeric when ``astype(float)`` raises ValueError
    (e.g. unparsable strings) or TypeError (e.g. dicts or other non-scalar
    objects, and, depending on the pandas version, datetime columns).
    """
    columns = []
    for c in df.columns:
        try:
            df[c].astype(float)
        except (ValueError, TypeError):
            columns.append(c)
    return columns
def date_censor_sql(date_column, today, column=None):
    """
    Build SQL that censors ``column`` (default: ``date_column``) as of ``today``.

    When today is None no censoring is applied and the column expression is
    returned unchanged; otherwise the result is
        (CASE WHEN {date_column} < '{today}' THEN {column} ELSE null END)
    """
    target = date_column if column is None else column
    if today is None:
        return target
    template = "(CASE WHEN {date_column} < '{today}' THEN {column} ELSE null END)"
    return template.format(date_column=date_column, today=today, column=target)
# Matches a "CREATE TABLE {name} AS ({query});" statement; revise_helper
# applies it with re.I, so the keywords are case-insensitive.
# Group 1 is the table name, group 2 is the query whose result is the table.
# NOTE(review): group 2 is [^;]*, so a query containing ';' will not match.
extract_sql_regex = r'CREATE\s+TABLE\s+([^(\s]*)\s+AS\s*\(([^;]*)\);'
def revise_helper(query):
    """
    Given sql containing a "CREATE TABLE {table_name} AS ({query});" statement,
    return (table_name, query).

    Raises:
        ValueError: if no such statement is found.
    """
    match = re.search(extract_sql_regex, query, re.DOTALL | re.I)
    if match is None:
        # Fail with a clear message instead of AttributeError on None.
        raise ValueError('Could not find "CREATE TABLE ... AS (...);" in sql')
    return match.group(1), match.group(2)
def revise_sql(query, id_column, output_table, max_date_column,
               min_date_column, date_column, date, source_id_column=None):
    """
    Given an expensive query that aggregates temporal data,
    revise the results to censor before a particular date.

    The query's placeholder condition ``1=1`` is replaced by a filter. The
    filter keeps source rows whose ids have output rows straddling ``date``
    (``min_date_column < date <= max_date_column``) and whose
    ``date_column`` is before ``date``.
    NOTE(review): if the query has no ``1=1``, it is left unrevised.

    Args:
        query: the aggregating SQL (the body of its CREATE TABLE ... AS).
        id_column: an id column name, or a list of names, in the output.
        output_table: the table produced by ``query``.
        max_date_column, min_date_column: date-range columns in the output.
        date_column: the date column in the source tables.
        date: the date to revise at.
        source_id_column: id column(s) in the source; defaults to id_column.
    Returns:
        the revised SQL string
    """
    def _join(columns):
        # str has __iter__ on Python 3, so test for strings explicitly:
        # a single name like 'id' must not be joined into 'i, d'.
        if isinstance(columns, str) or not hasattr(columns, '__iter__'):
            return columns
        return str.join(', ', columns)

    if source_id_column is None:
        source_id_column = id_column
    id_column = _join(id_column)
    source_id_column = _join(source_id_column)

    sql_vars = dict(query=query, id_column=id_column, output_table=output_table,
                    max_date_column=max_date_column, min_date_column=min_date_column,
                    date_column=date_column, date=date, source_id_column=source_id_column)
    sql_vars['ids_query'] = """
    SELECT {id_column} FROM {output_table}
    WHERE {max_date_column} >= '{date}' AND {min_date_column} < '{date}'""".format(**sql_vars)

    sql_vars['revised_query'] = query.replace(
        '1=1',
        "(({source_id_column}) in (select * from ids_query) and {date_column} < '{date}')"
        .format(**sql_vars))

    new_query = """
    with ids_query as ({ids_query})
    select * from ({revised_query}) t
    """.format(**sql_vars)
    return new_query
class Revise(Step):
    """Step that revises a "CREATE TABLE ... AS (...)" query to a given date."""
    def __init__(self, sql, id_column, max_date_column, min_date_column,
                 date_column, date, from_sql_args=None, source_id_column=None, **kwargs):
        """
        Revise a query to the specified date.

        sql: a path to a file or a string containing sql
        id_column: the entity id column(s) linking the query result with its source tables
        max_date_column: the maximum date column name for an entry in the result
        min_date_column: the minimum date column name for an entry in the result
        date_column: name of the date column in the source
        date: the date to revise at
        from_sql_args: dictionary of keyword arguments to pass input FromSQL steps,
            e.g. target=True, parse_dates
        source_id_column: id column(s) in the source; defaults to id_column
        """
        Step.__init__(self, sql=sql, id_column=id_column,
                      max_date_column=max_date_column, min_date_column=min_date_column,
                      date_column=date_column, date=date, source_id_column=source_id_column,
                      from_sql_args=from_sql_args, **kwargs)

        if os.path.exists(sql):
            self.dependencies = [os.path.abspath(sql)]
            sql = util.read_file(sql)

        table, query = revise_helper(sql)
        revised_sql = revise_sql(
            query=query, id_column=id_column, output_table=table,
            max_date_column=max_date_column, min_date_column=min_date_column,
            date_column=date_column, date=date, source_id_column=source_id_column)

        sql_args = {} if from_sql_args is None else from_sql_args
        source_step = FromSQL(table=table, **sql_args)
        # by depending on table, the revised query gets the right dependencies
        revised_step = FromSQL(revised_sql, tables=[table], **sql_args)
        self.inputs = [MapResults([source_step, revised_step],
                                  mapping=['source', 'revised'])]

    def run(self, source, revised):
        """Concatenate source rows lying entirely before ``date`` with the revised rows."""
        before_date = ((source[self.min_date_column] < self.date) &
                       (source[self.max_date_column] < self.date))
        # revised might have bad dtypes because it's small and pandas type
        # inference isn't great; if so, convert to the source dtypes
        if not revised.dtypes.equals(source.dtypes):
            revised = revised.astype(source.dtypes.to_dict())
        return pd.concat((source[before_date], revised), copy=False)
def date_select(df, date_column, date, delta, max_date_column=None):
    """
    Return the rows of ``df`` in the date window [date - delta, date).

    Args:
        df: the DataFrame to filter.
        date_column: the event date column (the interval start when
            ``max_date_column`` is given).
        date: the (exclusive) end of the window.
        delta: a delta string for parse_delta, e.g. '6m'; 'all' means
            there is no start date.
        max_date_column: if specified, keep rows whose interval
            [date_column, max_date_column] intersects [date - delta, date).
    Returns:
        the filtered DataFrame
    """
    delta = parse_delta(delta)
    # parse_delta returns None for 'all'. Test for None explicitly: a zero
    # relativedelta (e.g. '0d') is falsy and would otherwise be treated like
    # 'all' (no start date) instead of as an empty window.
    has_start = delta is not None
    if has_start:
        start_date = date - delta
    if not max_date_column:
        df = df.query("%s < '%s'" % (date_column, date))
        if has_start:
            df = df.query("%s >= '%s'" % (date_column, start_date))
    else:
        # event not entirely after
        df = df.query("not ({min} >= '{end}' and {max} >= '{end}')".format(
            min=date_column, max=max_date_column, end=date))
        if has_start:
            # event not entirely before
            df = df.query("not ({min} < '{start}' and {max} < '{start}')".format(
                min=date_column, max=max_date_column, start=start_date))
    return df
def date_censor(df, date_columns, date):
    """
    Censor, in place, values dated on or after ``date``.

    Args:
        df: the DataFrame to modify.
        date_columns: a dict of date_column: [dependent_column1, ...] pairs.
        date: the censoring date.
    For each date column, the dependent columns are set to null on rows
    where the date column is not before ``date`` (including rows where it
    is null); the date column itself is then censored the same way.
    Returns:
        ``df``
    """
    for date_col, dependents in date_columns.items():
        for dependent in dependents:
            df[dependent] = df[dependent].where(df[date_col] < date)
        df[date_col] = df[date_col].where(df[date_col] < date)
    return df
# Unit suffixes accepted by parse_delta, mapped to relativedelta keywords.
delta_chars = {
    'y': 'years', 'm': 'months', 'w': 'weeks', 'd': 'days', 'h': 'hours',
    'M': 'minutes', 's': 'seconds', 'u': 'microseconds'
}
# '<integer><unit>', e.g. '6m' or '2w'. The unit alternatives are built from
# delta_chars so the two cannot drift apart (a hand-written list left out
# 'w', so weeks could not be parsed).
delta_regex = re.compile('^([0-9]+)(%s)$' % '|'.join(sorted(delta_chars)))
def parse_delta(s):
    """
    Parse a delta string such as '6m' into a relativedelta.

    The string is an integer followed by a unit character accepted by
    delta_regex (see delta_chars). 'all' is represented by None.

    Raises:
        ValueError: if ``s`` is neither 'all' nor a valid delta string.
    """
    if s == 'all':
        return None
    match = delta_regex.match(s)
    if match is None:
        raise ValueError('Invalid delta string: %s' % s)
    amount, unit = match.groups()
    return relativedelta(**{delta_chars[unit]: int(amount)})