drain package

Submodules

drain.aggregate module

Classes that facilitate transformation and aggregation of dataframes.

This module provides wrappers that simplify grouping and aggregating dataframes by index, and then performing arithmetic on the aggregated dataframes.

Examples:

The following code would take a Pandas dataframe df, and produce a dataframe res_df that is indexed by name. The result would have a column count that gives the number of rows per name; a column that gives the sum of score per name, divided by the sum of arrests per name; and finally two columns that give the sum of score per name, and the sum of squared score per name:

aggregates = [Count(),
              Proportion('score','arrests'),
              Aggregate(['score', lambda x: x.score**2],'sum')
              ]
res_df = Aggregator(df, aggregates).aggregate('name')

Note how we specify all definitions before performing the actual aggregation, and how Aggregator.aggregate() then takes an index to group by.
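To make the arithmetic concrete, here is a plain-Python sketch of the quantities the example above describes. It is not drain's implementation: the sample rows are made up, and the output names other than count are illustrative assumptions rather than the names drain would generate.

```python
from collections import defaultdict

# Hypothetical input rows standing in for the dataframe df.
rows = [
    {"name": "a", "score": 1.0, "arrests": 2},
    {"name": "a", "score": 3.0, "arrests": 2},
    {"name": "b", "score": 2.0, "arrests": 1},
]


def aggregate_by_name(rows):
    """Group rows by name and compute the four quantities from the example."""
    groups = defaultdict(list)
    for row in rows:
        groups[row["name"]].append(row)

    result = {}
    for name, members in groups.items():
        score_sum = sum(r["score"] for r in members)
        arrests_sum = sum(r["arrests"] for r in members)
        result[name] = {
            "count": len(members),                          # rows per name
            "score_per_arrests": score_sum / arrests_sum,   # illustrative name
            "score_sum": score_sum,
            "score_squared_sum": sum(r["score"] ** 2 for r in members),  # illustrative name
        }
    return result


res = aggregate_by_name(rows)
```

Note that the proportion divides the two per-group sums; it is not the mean of row-wise ratios.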

Aggregator also caches individual transformations of columns to reduce redundant calculations.

The classes that end users interface with are Aggregate, Fraction, Count, and Proportion (all of which specify outcome columns and row-wise aggregation functions), and Aggregator (which takes an input dataframe and an index by which rows are grouped).

class drain.aggregate.Aggregate(column_def, agg_func, name=None, fname=None, astype=None)[source]

Bases: drain.aggregate.ColumnIdentity

A highly convenient wrapper around ColumnReductions.

Example::
Aggregate(['arrests','income','age'], ['min','max','mean'])

This would create 3x3 columns: arrests_min, arrests_max, arrests_mean, and so on.
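The naming pattern can be sketched as the product of columns and aggregation functions. This only illustrates the column_function naming shown above; the ordering of the generated columns is an assumption.

```python
from itertools import product

columns = ["arrests", "income", "age"]
functions = ["min", "max", "mean"]

# One output name per (column, function) pair; ordering is assumed.
names = ["{}_{}".format(column, func) for column, func in product(columns, functions)]
```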

class drain.aggregate.Aggregator(df, column_functions)[source]

Bases: object

Binds column functions to a dataframe and allows for aggregation by a given index.

aggregate(index)[source]

Performs a groupby of the unique Columns by index, as constructed from self.df.

Args:
index (str, or pd.Index): Index or column name of self.df.
Returns:
pd.DataFrame: A dataframe, aggregated by index, that contains the result
of the various ColumnFunctions, and named accordingly.
get_reduced(column_reductions)[source]

This function gets called by ColumnFunction._apply(). After a ColumnFunction has been passed to Aggregator’s constructor, the ColumnFunction can use this function to request the populated, aggregated columns that correspond to its ColumnReductions.

Args:
column_reductions (list[ColumnReduction])
Returns:
pd.DataFrame: A dataframe, where the column names are ColumnReductions.
class drain.aggregate.ColumnFunction(column_reductions, names)[source]

Bases: object

Abstract base class for functions on reduced Columns; names the outcomes.

Having obtained Columns that have been created and aggregated along rows with the same index (ColumnReductions, in other words), we might want to perform arithmetic on these ColumnReductions, such as element-wise division of one by the other. These transformations are handled by ColumnFunctions.

Note: ColumnFunction guarantees an apply_and_name(aggregator). This is the
lowest-level function in this module that actually returns ‘populated’ DataFrames (as provided by the aggregator). Children of this class thus include functions on pairs of populated dataframes, such as division and addition.
apply_and_name(aggregator)[source]

Fetches the row-aggregated input columns for this ColumnFunction.

Args:
aggregator (Aggregator)
Returns:
pd.DataFrame: The dataframe has columns with names self.names
that were created by this ColumnFunction, and is indexed by the index that was passed to aggregator.aggregate(index).
class drain.aggregate.ColumnIdentity(column_reductions, names)[source]

Bases: drain.aggregate.ColumnFunction

The simplest non-abstract ColumnFunction.

class drain.aggregate.ColumnReduction(column, agg_func)[source]

Bases: object

Wraps and hashes a Column together with a function that aggregates across rows.

class drain.aggregate.Count(definition=None, name=None, prop=None, prop_only=False, prop_name=None, astype=None, prop_astype=None)[source]

Bases: drain.aggregate.Fraction

Define various counts, sums, and proportions of Columns.

Examples::

Count() # count the number of rows per grouped index
Count('Arrests') # sum column 'Arrests' by grouped index
Count('Arrests', prop=True) # as above, but also add a column that is normalized, i.e. where each group's sum is divided by the size of that group
Count('Arrests', prop=True, prop_only=True) # as above, but exclude the raw sum
Count('Arrests', prop=lambda x: x.score**2) # create a column with the sum of Arrests per grouped index, and also create a column with the sum of Arrests per grouped index divided by the sum of score**2 per grouped index
Count(['Arrests','Stops']) # create both Count('Arrests') and Count('Stops')

By default names are set similar to this example: ‘count’, ‘Arrests_count’, ‘Arrests_prop’, ‘Arrests_prop_score’, etc.

class drain.aggregate.Fraction(numerator, denominator, name='{numerator}_per_{denominator}', include_numerator=False, include_denominator=False, include_fraction=True)[source]

Bases: drain.aggregate.ColumnFunction

Divides all pairs of column reductions from two column functions.

Example::
Fraction(Aggregate(['arrests','score'], 'sum'), Aggregate('score','mean'))

This would create two columns: arrests_sum_per_score_mean, and score_sum_per_score_mean.
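A plain-Python sketch of the "all pairs" division may help. It assumes each column reduction is already available as a list of per-group values in a shared group order; the function name fraction_columns and the sample values are hypothetical, and only the default name template comes from the signature above.

```python
from itertools import product


def fraction_columns(numerators, denominators,
                     name="{numerator}_per_{denominator}"):
    """Divide every numerator column by every denominator column.

    Both arguments map a column name to a list of per-group values that
    share the same group order.
    """
    out = {}
    pairs = product(numerators.items(), denominators.items())
    for (n_name, n_values), (d_name, d_values) in pairs:
        key = name.format(numerator=n_name, denominator=d_name)
        out[key] = [n / d for n, d in zip(n_values, d_values)]
    return out


# Made-up aggregated values for two groups.
numerators = {"arrests_sum": [4.0, 9.0], "score_sum": [2.0, 6.0]}
denominators = {"score_mean": [1.0, 3.0]}
fractions = fraction_columns(numerators, denominators)
```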

class drain.aggregate.Proportion(definition, denom_def=True, name=None, denom_name=None, astype=None, denom_astype=None)[source]

Bases: drain.aggregate.Count

Convenience wrapper for Count.

Example::
Proportion('Arrests','Inspections')

Creates a column ‘Arrests_prop_Inspections’, which divides the sum of ‘Arrests’ per group by the sum of ‘Inspections’ per group.

drain.aggregate.aggregate_counts(l)[source]
drain.aggregate.aggregate_list(l)[source]
drain.aggregate.aggregate_set(l)[source]
drain.aggregate.date_max(d)[source]
drain.aggregate.date_min(d)[source]

groupby()['date_column'].aggregate('min') appears to return a float; this function converts it back to a timestamp.

drain.aggregate.days(date1, date2)[source]

Returns a lambda that determines the number of days between the two dates. The dates can be strings (column names) or actual dates, e.g. Aggregate(days('date', today), ['min','max'], 'days_since'). TODO: should there be a Days(AggregateBase) for this? Handle naming.

drain.aggregation module

class drain.aggregation.AggregationBase(insert_args, aggregator_args, concat_args, parallel=False, prefix=None, inputs=None)[source]

Bases: drain.step.Step

AggregationBase uses aggregate.Aggregator to aggregate data. It can include aggregations over multiple indexes and multiple data transformations (e.g. subsets). The combinations can be run in parallel and can be returned disjoint or concatenated. Finally the results may be pivoted and joined to other datasets.

args_prefix(args)[source]
argument_names
fillna_value(df, left, **concat_args)[source]

This method gives subclasses the opportunity to define how join() fills missing values. Return value must be compatible with DataFrame.fillna() value argument. Examples:

  • return 0: replace missing values with zero
  • return df.mean(): replace missing values with column mean

This default implementation fills counts with zero. TODO: identify counts more robustly instead of relying on column name

Typically fill other fields with mean but can’t do that during the join
because that would leak information across a train/test split
get_aggregator(**kwargs)[source]

Given the arguments, return an aggregator

This method exists to allow subclasses to use Aggregator objects efficiently, i.e. only apply AggregateSeries once per set of Aggregates. If the set of Aggregates depends on some or none of the arguments the subclass need not recreate Aggregators

get_concat_result()[source]
join(left)[source]
load()[source]

Load this step’s result from its dump directory

run(*args, **kwargs)[source]
select(df, args, inplace=False)[source]

After joining, selects a subset of arguments.

df: the result of a call to self.join(left)
args: a collection of arguments to select, as accepted by drain.util.list_expand:

  • a tuple corresponding to concat_args,
    e.g. [('District', '12h'), ('District', '24h')]
  • a dict to be expanded into the above,
    e.g. {'District': ['12h', '24h']}
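The relationship between the two accepted forms can be sketched as follows. The helper expand_selection is hypothetical and only illustrates how the dict form maps onto the tuple form; it is not drain.util.list_expand.

```python
def expand_selection(args):
    """Turn a dict of key -> values into a list of (key, value) tuples.

    A collection of tuples is returned unchanged (as a list).
    """
    if isinstance(args, dict):
        return [(key, value) for key, values in args.items() for value in values]
    return list(args)


expanded = expand_selection({"District": ["12h", "24h"]})
```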
class drain.aggregation.AggregationJoin(inputs, **kwargs)[source]

Bases: drain.step.Step

The first input is left and the second input is the aggregation. If the left step returned a dict, use MapResults to clarify, e.g.:

mapping=[{'aux': None}]
run(aggregations, left)[source]
class drain.aggregation.SimpleAggregation(inputs, indexes, prefix=None, parallel=False)[source]

Bases: drain.aggregation.AggregationBase

A simple AggregationBase subclass with a single aggregator. The only argument is the index. An implementation need only define an aggregates attribute; see test_aggregation.SimpleCrimeAggregation for an example.

arguments
get_aggregator(**kwargs)[source]

Given the arguments, return an aggregator

This method exists to allow subclasses to use Aggregator objects efficiently, i.e. only apply AggregateSeries once per set of Aggregates. If the set of Aggregates depends on some or none of the arguments the subclass need not recreate Aggregators

class drain.aggregation.SpacetimeAggregation(spacedeltas, dates, date_column, parallel=False, max_date_column=None, censor_columns=None, aggregator_args=None, concat_args=None, inputs=None, prefix=None)[source]

Bases: drain.aggregation.AggregationBase

SpacetimeAggregation is an Aggregation over space and time. Specifically, the index is a spatial index, and additional date and delta arguments select a subset of the data to aggregate. We assume that the index and deltas are independent of the date, so every date is aggregated to all spacedeltas.

By default the aggregator_args are date and delta (i.e. independent of aggregation index). To change that, pass aggregator_args=['date', 'delta', 'index'] and override get_aggregator to accept an index argument.

Note that dates should be datetime.datetime, not numpy.datetime64, for yaml serialization and to work with dateutil.relativedelta. However, since pandas automatically turns a datetime column in the index into a datetime64 DatetimeIndex, the left dataframe passed to join() should use datetime64!

See test_aggregation.SpacetimeCrimeAggregation for an example.

arguments
get_aggregates(date, delta)[source]
get_aggregator(date, delta)[source]

Given the arguments, return an aggregator

This method exists to allow subclasses to use Aggregator objects efficiently, i.e. only apply AggregateSeries once per set of Aggregates. If the set of Aggregates depends on some or none of the arguments the subclass need not recreate Aggregators

get_data(date, delta)[source]
indexes
join(left)[source]
spacedeltas = None
spacedeltas is a dict of the form {name: (index, deltas)}
where deltas is an array of delta strings

dates are end dates for the aggregators

class drain.aggregation.SpacetimeAggregationJoin(inputs, lag=None, **kwargs)[source]

Bases: drain.aggregation.AggregationJoin

Specify a temporal lag between the aggregations and left. Useful for simulating a delay in receipt of aggregation data sources.

run(aggregations, left)[source]

drain.data module

class drain.data.ClassificationData(inputs=None, dependencies=None, **kwargs)[source]

Bases: drain.step.Step

run()[source]
class drain.data.Column(definition, astype=None)[source]

Bases: object

Defines a new or existing column that can be calculated from a dataframe.

Column accepts as definition a string that refers to an existing column, or a lambda function that returns a pd.Series, or a constant, in which case it creates a pd.Series of that constant.

Columns are hashed based on their definition and type.

apply(df)[source]

Takes a pd.DataFrame and returns the newly defined column, i.e. a pd.Series that has the same index as df.

class drain.data.CreateDatabase(inputs=None, dependencies=None, **kwargs)[source]

Bases: drain.step.Step

run()[source]
class drain.data.CreateEngine(inputs=None, dependencies=None, **kwargs)[source]

Bases: drain.step.Step

run()[source]
class drain.data.FromSQL(query=None, to_str=None, table=None, tables=None, inputs=None, auto_parse_dates=True, **read_sql_kwargs)[source]

Bases: drain.step.Step

run(engine)[source]
class drain.data.Merge(inputs=None, dependencies=None, **kwargs)[source]

Bases: drain.step.Step

run(*dfs)[source]
class drain.data.Normalize(inputs=None, dependencies=None, **kwargs)[source]

Bases: drain.step.Step

run(X, train=None)[source]
class drain.data.Revise(sql, id_column, max_date_column, min_date_column, date_column, date, from_sql_args=None, source_id_column=None, **kwargs)[source]

Bases: drain.step.Step

run(source, revised)[source]
class drain.data.ToHDF(objects_to_ascii=False, **kwargs)[source]

Bases: drain.step.Step

Write DataFrames to an HDF store. Pass put_arguments (format, mode, data_columns, etc.) to init. Pass DataFrames by name via inputs.

dump()[source]
load()[source]

Load this step’s result from its dump directory

run(**kwargs)[source]
class drain.data.ToSQL(table_name, **kwargs)[source]

Bases: drain.step.Step

Step for util.PgSQLDatabase.to_sql().

Inputs:
  • df: the DataFrame to import
  • db: an instance of PgSQLDatabase; defaults to CreateDatabase()

TODO: once drain Steps have outputs, include psql/schema/name
run(df, db)[source]
drain.data.binarize(df, category_classes, all_classes=True, drop=True, astype=None, inplace=True, min_freq=None)[source]

Binarize specified categoricals. Works inplace!

Args:
  • df: the DataFrame whose columns to binarize
  • category_classes: either a dict of (column : [class1, class2, …]) pairs
    or a collection of column names, in which case classes are given using df[column].unique()
  • all_classes: when False, the last class is skipped
  • drop: when True, the original categorical columns are dropped
  • astype: a type for the resulting binaries, e.g. np.float32.
    When None, use the default (bool).
  • inplace: whether to modify the DataFrame inplace
Returns:
the DataFrame with binarized columns
drain.data.binarize_clusters(df, column, n_clusters, train=None)[source]
drain.data.binarize_sets(df, columns, cast=False, drop=True, min_freq=None)[source]

Create dummies for the elements of a set-valued column. Operates in place.

Args:
  • df: data frame
  • columns: either a dictionary of column: values pairs or a collection of columns
  • cast: whether or not to cast values to set
  • drop: whether or not to drop the binarized columns

TODO: make interface same as binarize(). merge the two?

drain.data.counts_to_dicts(df, column)[source]

Convert (values, counts) as returned by aggregate.aggregate_counts() to dicts. Makes expand_counts much faster.

drain.data.date_censor(df, date_columns, date)[source]

date_columns: a dictionary of date_column: [dependent_column1, …] pairs. Censors the dependent columns when the date column is before the given end_date, then censors the date column itself.

drain.data.date_censor_sql(date_column, today, column=None)[source]

If today is None, then no censoring is applied. Otherwise each column is replaced with:

CASE WHEN {date_column} < '{today}' THEN {column} ELSE null END
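A minimal sketch of building that expression as a string, under stated assumptions: the helper name censor_expression is hypothetical, and treating column=None as "censor the date column itself" is a guess at the default, not confirmed by the description above.

```python
def censor_expression(date_column, today, column=None):
    """Build the censoring SQL expression described above.

    Assumption: when column is None, the date column itself is used.
    """
    if column is None:
        column = date_column
    if today is None:
        return column  # no censoring requested
    return "CASE WHEN {d} < '{t}' THEN {c} ELSE null END".format(
        d=date_column, t=today, c=column)


uncensored = censor_expression("inspection_date", None, "result")
censored = censor_expression("inspection_date", "2015-01-01", "result")
```

String formatting of this kind is only appropriate for trusted, code-supplied identifiers and dates; user-supplied values should go through query parameters instead.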
drain.data.date_select(df, date_column, date, delta, max_date_column=None)[source]

Given a series, an end date, and a number of days, return the subset in the date range. If delta is None then there is no starting date. If max_date_column is specified then look for rows where the interval [date_column, max_date_column] intersects [date-delta, date+delta).
drain.data.exclude_regexes(strings, exclude, include=None)[source]
drain.data.expand_counts(df, column, values=None)[source]

expand a column containing value:count dictionaries

drain.data.expand_dates(df, columns=[])[source]

generate year, month, day features from specified date features

drain.data.impute(X, value=None, train=None, dropna=True, inplace=True)[source]

Performs mean imputation on a pandas dataframe.

Args:
  • train: an optional training mask with which to compute the mean
  • value: instead of computing the mean, use this as the value argument to fillna
  • dropna: whether to drop all null columns
  • inplace: whether to perform the imputation inplace
Returns:
the imputed DataFrame

drain.data.infinite_columns(df)[source]
drain.data.non_numeric_columns(df)[source]
drain.data.normalize(X, train=None)[source]
drain.data.null_columns(df, train=None)[source]
drain.data.parse_delta(s)[source]

Parse a string to a delta. ‘all’ is represented by None.

drain.data.prefix_columns(df, prefix, ignore=[])[source]
drain.data.revise_helper(query)[source]

given sql containing a “CREATE TABLE {table_name} AS ({query})” returns table_name, query

drain.data.revise_sql(query, id_column, output_table, max_date_column, min_date_column, date_column, date, source_id_column=None)[source]

Given an expensive query that aggregates temporal data, revise the results to censor before a particular date.

drain.data.select_features(df, exclude, include=None, inplace=False)[source]
drain.data.select_regexes(strings, regexes)[source]

Select the subset of strings matching a regex. Treats strings as a set.

drain.data.train_test_subset(df, train, test, drop=False)[source]

Narrows df to train | test, then narrows train and test to that subset.

drain.dedupe module

drain.dedupe.components_to_df(components, id_func=None)[source]

Convert components to a join table with columns id1, id2.

Args:
  • components: A collection of components, each of which is a set of vertex ids. If a dictionary, then the key is the id for the component. Otherwise, the component id is determined by applying id_func to the component.
  • id_func: If components is a dictionary, this should be None. Otherwise, this is a callable that, given a set of vertices, determines the id. If components is not a dict and id_func is None, it defaults to min.
Returns:
A dataframe representing the one-to-many relationship between component names (id1) and their members (id2).
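The id rules above can be sketched without pandas by producing (id1, id2) rows as tuples. The function name components_to_pairs is hypothetical, and returning sorted tuples rather than a dataframe is a simplification for illustration.

```python
def components_to_pairs(components, id_func=None):
    """Return sorted (id1, id2) rows linking each component id to its members."""
    if isinstance(components, dict):
        # Dictionary keys are the component ids; id_func is not used.
        items = components.items()
    else:
        if id_func is None:
            id_func = min  # default id: the smallest vertex in the component
        items = ((id_func(component), component) for component in components)
    return sorted((cid, member) for cid, members in items for member in members)


pairs = components_to_pairs([{3, 1, 2}, {5}])
named_pairs = components_to_pairs({"x": {1, 2}})
```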
drain.dedupe.follow(id, edges, directed=False, _visited=None)[source]

Follow a graph to find the nodes connected to a given node.

Args:
  • id: the id of the starting node
  • edges: a pandas DataFrame of edges. Each row is an edge with two columns containing the ids of the vertices.
  • directed: If True, edges are directed from first column to second column. Otherwise edges are undirected.
  • _visited: used internally for recursion
Returns:
the set of all nodes connected to the starting node.
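The traversal can be sketched iteratively over a plain list of edge pairs. This is an illustration rather than drain's recursive implementation; the name follow_edges is hypothetical, and including the starting node in the result is an assumption.

```python
def follow_edges(start, edges, directed=False):
    """Return the set of nodes reachable from start (start included)."""
    visited = {start}
    stack = [start]
    while stack:
        node = stack.pop()
        for first, second in edges:
            candidates = []
            if first == node:
                candidates.append(second)
            if second == node and not directed:
                # Undirected edges can also be walked backwards.
                candidates.append(first)
            for nxt in candidates:
                if nxt not in visited:
                    visited.add(nxt)
                    stack.append(nxt)
    return visited


edges = [(1, 2), (3, 2), (4, 5)]
undirected_result = follow_edges(1, edges)
directed_result = follow_edges(1, edges, directed=True)
```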

drain.dedupe.get_components(edges, vertices=None)[source]

Return connected components from the graph determined by the edges matrix.

Args:
  • edges: DataFrame of (undirected) edges.
  • vertices: set of vertices in graph. Defaults to union of all vertices in edges.
Returns:
set of connected components, each of which is a set of vertices.
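A minimal sketch of computing connected components with a depth-first traversal, using plain (u, v) pairs in place of the edges DataFrame. The function name is hypothetical, and components are returned as frozensets so that they can be members of a set.

```python
def connected_components(edges, vertices=None):
    """Return the connected components of an undirected graph.

    edges is an iterable of (u, v) pairs; vertices defaults to every
    vertex that appears in an edge.
    """
    neighbors = {}
    for u, v in edges:
        neighbors.setdefault(u, set()).add(v)
        neighbors.setdefault(v, set()).add(u)
    if vertices is None:
        vertices = set(neighbors)

    components = set()
    unvisited = set(vertices)
    while unvisited:
        start = unvisited.pop()
        component = {start}
        stack = [start]
        while stack:  # iterative depth-first traversal
            node = stack.pop()
            for other in neighbors.get(node, ()):
                if other not in component:
                    component.add(other)
                    stack.append(other)
        unvisited -= component
        components.add(frozenset(component))
    return components


components = connected_components([(1, 2), (2, 3), (4, 5)],
                                  vertices={1, 2, 3, 4, 5, 6})
```

Passing vertices explicitly is what lets isolated nodes (6 in this example) show up as singleton components.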
drain.dedupe.insert_singletons(source_table, dest_table, id_column, engine)[source]

drain.drake module

drain.drake.get_drake_data(steps)[source]

Returns: a dictionary of outputs mapped to inputs. Note that an output is either a target or a leaf node in the step tree.
drain.drake.get_inputs(step, target)[source]

Traverse the tree of input parents, returning all steps which are targets or not targets (depending on the argument target). Stop traversing at parent targets.

drain.drake.get_inputs_helper(step, ignore, target)[source]

Recursion helper used by get_inputs()

drain.drake.is_step_filename(filename)[source]
drain.drake.is_target_filename(filename)[source]
drain.drake.to_drake_step(inputs, output)[source]
Args:
  • inputs: collection of input Steps
  • output: output Step
Returns:
a string of the drake step for the given inputs and output

drain.drake.to_drakefile(steps, preview=True, debug=False, input_drakefile=None, bindir=None)[source]
Args:
  • steps: collection of drain.step.Step objects for which to generate a drakefile
  • preview: boolean; when False, will create directories for output steps. When True, do not touch the filesystem.
  • debug: run python with ‘-m pdb’
  • input_drakefile: path to drakefile to include
  • bindir: path to drake binaries, defaults to ../bin/
Returns:
a string representation of the drakefile

drain.explore module

drain.metrics module

drain.metrics.baseline(y_true, y_score=None)[source]
Number of positive labels divided by number of labels,
or zero if there are no labels
drain.metrics.count(y_true, y_score=None, countna=False)[source]

Counts the number of examples. If countna is False then only count labeled examples, i.e. those with y_true not NaN

drain.metrics.count_series(y_true, y_score, countna=False)[source]

Returns series whose i-th entry is the number of examples in the top i

drain.metrics.precision(y_true, y_score, k=None, return_bounds=False)[source]
If return_bounds is False then returns precision on the
labeled examples in the top k.
If return_bounds is True then returns a tuple containing:
  • precision on the labeled examples in the top k
  • number of labeled examples in the top k
  • lower bound of precision in the top k, assuming all
    unlabeled examples are False
  • upper bound of precision in the top k, assuming all
    unlabeled examples are True
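The bounds can be illustrated with a small plain-Python sketch. It is not drain's implementation: unlabeled examples are represented by None instead of NaN, and the function name precision_at_k is hypothetical.

```python
def precision_at_k(y_true, y_score, k=None, return_bounds=False):
    """Precision among the labeled examples in the top k by score.

    y_true holds True, False, or None (unlabeled).
    """
    ranked = sorted(zip(y_score, y_true), key=lambda pair: pair[0], reverse=True)
    top = [label for _, label in ranked[:k]]  # k=None keeps every example
    labeled = [label for label in top if label is not None]
    positives = sum(1 for label in labeled if label)
    n_labeled = len(labeled)
    precision = positives / n_labeled if n_labeled else float("nan")
    if not return_bounds:
        return precision

    n_top = len(top)
    n_unlabeled = n_top - n_labeled
    # Lower bound: every unlabeled example is False.
    lower = positives / n_top if n_top else float("nan")
    # Upper bound: every unlabeled example is True.
    upper = (positives + n_unlabeled) / n_top if n_top else float("nan")
    return precision, n_labeled, lower, upper


y_true = [True, None, False, True]
y_score = [0.9, 0.8, 0.7, 0.1]
bounds = precision_at_k(y_true, y_score, k=3, return_bounds=True)
```

In this example the top 3 contain one positive, one negative, and one unlabeled example, so the precision on labeled examples is 1/2 while the bounds are 1/3 and 2/3.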
drain.metrics.precision_series(y_true, y_score, k=None)[source]

Returns series of length k whose i-th entry is the precision in the top i. TODO: extrapolate here

drain.metrics.recall(y_true, y_score, k=None, value=True)[source]

Returns recall (number of positive examples) in the top k. If value is False then counts the number of negative examples. TODO: add prop argument to return recall proportion instead of count

drain.metrics.recall_series(y_true, y_score, k=None, value=True)[source]

Returns series of length k whose i-th entry is the recall in the top i

drain.metrics.roc_auc(y_true, y_score)[source]

Returns area under the ROC curve

drain.model module

class drain.model.Fit(inputs, return_estimator=True, return_feature_importances=False)[source]

Bases: drain.model.FitPredict

class drain.model.FitPredict(inputs, return_estimator=False, return_feature_importances=True, return_predictions=True, prefit=False, predict_train=False)[source]

Bases: drain.step.Step

Step which can fit a scikit-learn estimator and make predictions.

dump()[source]
load()[source]

Load this step’s result from its dump directory

run(estimator, X, y=None, train=None, test=None, aux=None, sample_weight=None, feature_importances=None)[source]
class drain.model.InverseProbabilityWeights(inputs=None, dependencies=None, **kwargs)[source]

Bases: drain.step.Step

run(y, train=None, **kwargs)[source]
class drain.model.LogisticRegression[source]

Bases: object

fit(X, y, **kwargs)[source]
predict_proba(X)[source]
class drain.model.Predict(inputs, return_estimator=False, return_feature_importances=False)[source]

Bases: drain.model.FitPredict

class drain.model.PredictProduct(inputs=None, dependencies=None, **kwargs)[source]

Bases: drain.step.Step

run(**kwargs)[source]
drain.model.apply_forest(run)[source]
drain.model.baseline(predict_step, **kwargs)
drain.model.count(predict_step, **kwargs)
drain.model.count_series(predict_step, **kwargs)
drain.model.feature_importance(estimator, X)[source]
drain.model.forests(**kwargs)[source]
drain.model.function(predict_step, **kwargs)
drain.model.lift(predict_step, **kwargs)[source]
drain.model.lift_series(predict_step, **kwargs)[source]
drain.model.logits(**kwargs)[source]
drain.model.make_metric(function)[source]
drain.model.overlap(self, other, **kwargs)[source]
drain.model.perturb(estimator, X, bins, columns=None)[source]

Predict on perturbations of a feature vector.

  • estimator: a fitted sklearn estimator
  • index: the index of the example to perturb
  • bins: a dictionary of column:bins arrays
  • columns: list of columns if bins doesn’t cover all columns

TODO: make this work when index is multiple rows

drain.model.precision(predict_step, **kwargs)
drain.model.precision_series(predict_step, **kwargs)
drain.model.proximity(run, ix, k)[source]
drain.model.rank(self, **kwargs)[source]
drain.model.recall(predict_step, prop=True, **kwargs)[source]
drain.model.recall_series(predict_step, prop=True, **kwargs)[source]
drain.model.roc_auc(predict_step, **kwargs)
drain.model.similarity(self, other, **kwargs)[source]
drain.model.svms(**kwargs)[source]
drain.model.to_float(predict_step, **kwargs)
drain.model.true_score(y, outcome='true', score='score', **subset_args)[source]
drain.model.y_score(estimator, X)[source]

Score examples from a new matrix X.

Args:
  • estimator: an sklearn estimator object
  • X: design matrix with the same features that the estimator was trained on
Returns:
a vector of scores of the same length as X

Note that estimator.predict_proba is preferred, but when unavailable (e.g. SVM without probability calibration) decision_function is used.
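The fallback described in the note can be sketched with duck typing. The two stub classes below are made-up stand-ins for fitted estimators, and taking the second predict_proba column as the positive-class score is an assumption about a binary classifier.

```python
def score_examples(estimator, X):
    """Prefer predict_proba; fall back to decision_function."""
    if hasattr(estimator, "predict_proba"):
        # Assumption: binary classifier, positive class in the second column.
        return [row[1] for row in estimator.predict_proba(X)]
    return list(estimator.decision_function(X))


class ProbabilityStub:
    """Hypothetical estimator exposing predict_proba."""

    def predict_proba(self, X):
        return [[1 - x, x] for x in X]


class MarginStub:
    """Hypothetical estimator exposing only decision_function."""

    def decision_function(self, X):
        return [2 * x - 1 for x in X]


proba_scores = score_examples(ProbabilityStub(), [0.25, 0.75])
margin_scores = score_examples(MarginStub(), [0.25, 0.75])
```

The two kinds of score live on different scales (probabilities versus margins), so they are suitable for ranking examples within one model but not for comparing across models.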

drain.model.y_subset(y, query=None, aux=None, subset=None, dropna=False, outcome='true', k=None, p=None, ascending=False, score='score', p_of='notnull')[source]

Subset a model “y” dataframe.

Args:
  • query: operates on y, or aux if present
  • subset: takes a dataframe or index thereof and subsets to that
  • dropna: means drop missing outcomes
  • p_of: specifies what the proportion is relative to:
    ‘notnull’ means proportion is relative to labeled count,
    ‘true’ means proportion is relative to positive count,
    ‘all’ means proportion is relative to total count
Returns:
top k (count) or p (proportion) if specified

drain.serialize module

drain.serialize.configure()[source]

Configures the YAML parser for Step serialization and deserialization. Called in drain/__init__.py.

drain.serialize.load(filename)[source]

Load step from yaml file.

Args:
  • filename: a target or step.yaml filename
drain.serialize.step_multi_constructor(loader, tag_suffix, node)[source]
drain.serialize.step_multi_representer(dumper, data)[source]

drain.step module

class drain.step.Arguments(args=None, kwargs=None)[source]

Bases: object

A simple wrapper for positional and keyword arguments

class drain.step.Call(_base, _method_name=None, inputs=None, **kwargs)[source]

Bases: drain.step.Step

run(*args, **update_kwargs)[source]
class drain.step.GetItem(step, key=None)[source]

Bases: drain.step.Step

Given a step that returns a dict, this Step grabs a single value from it.

run(*args, **kwargs)[source]
class drain.step.MapResults(inputs, mapping)[source]

Bases: drain.step.Step

This step maps the results of its inputs into a new form of arguments and keyword arguments. It is a useful connector between steps.

DEFAULT = 1
run(*args, **kwargs)[source]
class drain.step.Step(inputs=None, dependencies=None, **kwargs)[source]

Bases: object

dump()[source]
execute(inputs=None, output=None, load_targets=False)[source]

Run this step, recursively running or loading inputs. Used in bin/run_step.py, which is run by drake.

Args:
  • inputs: collection of steps that should be loaded
  • output: step that should be dumped after it is run
  • load_targets (boolean): load all steps which are targets. This argument is not used by run_step.py because target does not get serialized. But it can be useful for running steps directly.
get_arguments(**include)[source]

Return a shallow copy of self._kwargs. Passing {key}=False will pop the {key} from the dict, e.g. get_arguments(inputs=False) returns all keywords except inputs.
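The copy-and-pop behaviour can be sketched as a free function. It takes the stored keyword dictionary explicitly instead of reading self._kwargs, which is a simplification for illustration.

```python
def get_arguments(kwargs, **include):
    """Return a shallow copy of kwargs without the keys passed as key=False."""
    arguments = dict(kwargs)  # shallow copy; the original dict is untouched
    for key, keep in include.items():
        if not keep:
            arguments.pop(key, None)
    return arguments


stored = {"inputs": ["step_a"], "alpha": 0.5}
without_inputs = get_arguments(stored, inputs=False)
```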

get_input(value, _search=None)[source]

Searches the tree for a step.

Args:
  • value: The value to search for. If value is a string then the search looks for a step of that name. If the value is a type, it looks for a step of that type.
Returns:
The first step found via a depth-first search.

get_inputs(_visited=None)[source]

Returns: the set of all input steps

load()[source]

Load this step’s result from its dump directory

run()[source]
setup_dump()[source]

Set up dump, creating directories and writing a step.yaml file containing a yaml dump of this step. The layout is:

{drain.PATH}/{self._digest}/
    step.yaml
    dump/
drain.step.load(steps, reload=False)[source]

Safely load steps in place, excluding those that fail.

Args:
  • steps: the steps to load
drain.step.merge_results(inputs, arguments=None)[source]
Merges results to form arguments to run(). The handling depends on the type of each result:
  • dictionary: dictionaries get merged and passed as keyword arguments
  • list: lists get concatenated to positional arguments
  • Arguments: kwargs gets merged and args gets appended
  • else: concatenated and passed as positional arguments
Args:
  • inputs: the inputs whose results to merge
  • arguments: an optional existing Arguments object to merge into
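The merging rules listed above can be sketched as follows. This is a simplified stand-in: the Arguments class here is a minimal container written for the example, and the function takes already-computed results directly instead of input steps.

```python
class Arguments:
    """Minimal stand-in for a container of positional and keyword arguments."""

    def __init__(self, args=None, kwargs=None):
        self.args = list(args) if args is not None else []
        self.kwargs = dict(kwargs) if kwargs is not None else {}


def merge_results(results, arguments=None):
    """Merge a sequence of results into one Arguments object."""
    if arguments is None:
        arguments = Arguments()
    for result in results:
        if isinstance(result, dict):
            arguments.kwargs.update(result)        # keyword arguments
        elif isinstance(result, list):
            arguments.args.extend(result)          # several positional arguments
        elif isinstance(result, Arguments):
            arguments.kwargs.update(result.kwargs)
            arguments.args.extend(result.args)
        else:
            arguments.args.append(result)          # a single positional argument
    return arguments


merged = merge_results([{"X": 1}, [2, 3], Arguments([4], {"y": 5}), 6])
```

The merged object could then be unpacked into a call such as run(*merged.args, **merged.kwargs).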

drain.util module

class drain.util.PgSQLDatabase(engine, schema=None, meta=None)[source]

Bases: pandas.io.sql.SQLDatabase

read_sql(query, raise_on_error=True, **kwargs)[source]

Read SQL query into a DataFrame.

sql : string
SQL query to be executed.
index_col : string, optional, default: None
Column name to use as index for the returned DataFrame object.
coerce_float : boolean, default True
Attempt to convert values of non-string, non-numeric objects (like decimal.Decimal) to floating point, useful for SQL result sets.
params : list, tuple or dict, optional, default: None
List of parameters to pass to execute method. The syntax used to pass parameters is database driver dependent. Check your database driver documentation for which of the five syntax styles, described in PEP 249’s paramstyle, is supported. Eg. for psycopg2, uses %(name)s so use params={'name' : 'value'}
parse_dates : list or dict, default: None
  • List of column names to parse as dates.
  • Dict of {column_name: format string} where format string is strftime compatible in case of parsing string times, or is one of (D, s, ns, ms, us) in case of parsing integer timestamps.
  • Dict of {column_name: arg dict}, where the arg dict corresponds to the keyword arguments of pandas.to_datetime() Especially useful with databases without native Datetime support, such as SQLite.
chunksize : int, default None
If specified, return an iterator where chunksize is the number of rows to include in each chunk.

Returns: DataFrame

See also: read_sql_table (read SQL database table into a DataFrame), read_sql

read_table(name, schema=None)[source]

Read SQL database table into a DataFrame.

table_name : string
Name of SQL table in database.
index_col : string, optional, default: None
Column to set as index.
coerce_float : boolean, default True
Attempts to convert values of non-string, non-numeric objects (like decimal.Decimal) to floating point. This can result in loss of precision.
parse_dates : list or dict, default: None
  • List of column names to parse as dates.
  • Dict of {column_name: format string} where format string is strftime compatible in case of parsing string times, or is one of (D, s, ns, ms, us) in case of parsing integer timestamps.
  • Dict of {column_name: arg}, where the arg corresponds to the keyword arguments of pandas.to_datetime(). Especially useful with databases without native Datetime support, such as SQLite.
columns : list, default: None
List of column names to select from SQL table.
schema : string, default None
Name of SQL schema in database to query (if database flavor supports this). If specified, this overwrites the default schema of the SQL database object.
chunksize : int, default None
If specified, return an iterator where chunksize is the number of rows to include in each chunk.

Returns: DataFrame

See also: pandas.read_sql_table, SQLDatabase.read_query

to_sql(frame, name, if_exists='fail', index=True, index_label=None, schema=None, chunksize=None, dtype=None, pk=None, prefixes=None, raise_on_error=True)[source]

Write records stored in a DataFrame to a SQL database.

frame : DataFrame
name : string
Name of SQL table
if_exists : {‘fail’, ‘replace’, ‘append’}, default ‘fail’
  • fail: If table exists, do nothing.
  • replace: If table exists, drop it, recreate it, and insert data.
  • append: If table exists, insert data. Create if does not exist.
index : boolean, default True
Write DataFrame index as a column
index_label : string or sequence, default None
Column label for index column(s). If None is given (default) and index is True, then the index names are used. A sequence should be given if the DataFrame uses MultiIndex.
schema : string, default None
Name of SQL schema in database to write to (if database flavor supports this). If specified, this overwrites the default schema of the SQLDatabase object.
chunksize : int, default None
If not None, then rows will be written in batches of this size at a time. If None, all rows will be written at once.
dtype : dict of column name to SQL type, default None
Optionally specifies the datatype for columns. The SQL type should be a SQLAlchemy type.

pk: name of column(s) to set as primary keys

drain.util.conditional_join(left, right, left_on, right_on, condition, lsuffix='_left', rsuffix='_right')[source]
drain.util.create_db()[source]
drain.util.create_engine()[source]
drain.util.cross_join(left, right, lsuffix='_left', rsuffix='_right')[source]
drain.util.date_ceil(month, day)[source]
drain.util.date_floor(month, day)[source]
drain.util.date_to_days(date)[source]

Number of days since epoch
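As an illustration of what "days since epoch" means, here is a minimal sketch. It is not drain's implementation: the name days_since_epoch is hypothetical, and it assumes the epoch is the Unix epoch (1970-01-01), which the docstring does not state.

```python
from datetime import date, datetime

# Assumption: "epoch" means the Unix epoch; the docstring does not say so.
EPOCH = date(1970, 1, 1)


def days_since_epoch(d):
    """Return the number of whole days between the assumed epoch and d."""
    if isinstance(d, datetime):
        d = d.date()  # drop the time-of-day part
    return (d - EPOCH).days


print(days_since_epoch(date(1970, 1, 2)))  # 1
print(days_since_epoch(date(2000, 1, 1)))  # 10957
```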

drain.util.dict_diff(dicts)[source]

Subset dictionaries to keys which map to multiple values
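The behavior described above can be sketched in plain Python. This is a hypothetical re-implementation named dict_diff_sketch, not the library's code; it assumes that a key missing from some of the dictionaries counts as differing.

```python
def dict_diff_sketch(dicts):
    """Keep only the keys whose values are not identical across all dicts."""
    missing = object()  # sentinel for "key absent" (assumed to count as a difference)
    all_keys = set().union(*(d.keys() for d in dicts))
    diff_keys = set()
    for key in all_keys:
        values = [d.get(key, missing) for d in dicts]
        if any(v != values[0] for v in values[1:]):
            diff_keys.add(key)
    return [{k: v for k, v in d.items() if k in diff_keys} for d in dicts]


runs = [{'model': 'rf', 'n': 10, 'seed': 0},
        {'model': 'rf', 'n': 20, 'seed': 0}]
print(dict_diff_sketch(runs))  # [{'n': 10}, {'n': 20}]
```

This kind of subsetting is handy for labeling a set of configurations by only the settings that distinguish them.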

drain.util.dict_expand(d, prefix=None)[source]

Recursively expand subdictionaries, returning a flat dictionary keyed by tuples of the nested keys, e.g. dict_expand({1:{2:3}, 4:5}) == {(1,2):3, 4:5}
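To make the documented example concrete, here is a minimal sketch that reproduces it. The function dict_expand_sketch is hypothetical and only mirrors the documented input/output pair; how the real function treats keys that are already tuples is not specified here.

```python
def dict_expand_sketch(d, prefix=None):
    """Flatten nested dicts; nested keys become tuples, top-level keys stay bare."""
    result = {}
    for key, value in d.items():
        if prefix is None:
            path = key                  # top-level keys are kept as-is
        elif isinstance(prefix, tuple):
            path = prefix + (key,)      # extend an existing key path
        else:
            path = (prefix, key)        # second level: start a tuple
        if isinstance(value, dict):
            result.update(dict_expand_sketch(value, prefix=path))
        else:
            result[path] = value
    return result


print(dict_expand_sketch({1: {2: 3}, 4: 5}))  # {(1, 2): 3, 4: 5}
```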

drain.util.dict_filter_none(d)[source]

Filter None values from a dict.

drain.util.dict_merge(*dict_args)[source]

Given any number of dicts, shallow copy and merge into a new dict, precedence goes to key value pairs in latter dicts.
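The merge semantics described above can be sketched as follows. dict_merge_sketch is a hypothetical stand-in written for illustration, not the library's source.

```python
def dict_merge_sketch(*dict_args):
    """Merge dicts into a new dict; later dicts win on duplicate keys."""
    result = {}
    for d in dict_args:
        result.update(d)  # shallow: values are shared, not copied
    return result


defaults = {'n_estimators': 100, 'max_depth': 3}
overrides = {'max_depth': 5}
print(dict_merge_sketch(defaults, overrides))  # {'n_estimators': 100, 'max_depth': 5}
print(defaults)                                # {'n_estimators': 100, 'max_depth': 3}
```

Because the result is a new dict, the inputs are left unmodified, although nested mutable values are still shared with them.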

drain.util.dict_product(*d, **kwargs)[source]

Cartesian product of a dict whose values are lists.

Args:
d: dictionary to take product of. Multiple dictionaries will first be merged by dict_merge.
kwargs: additional kwargs for convenience

Returns:
a list of dictionaries with the same keys as d and kwargs
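
A minimal sketch of such a product, for illustration only. dict_product_sketch is a hypothetical name, and the sketch assumes every value is already a list; how the real function handles scalar values is not documented here.

```python
from itertools import product


def dict_product_sketch(*dicts, **kwargs):
    """Return one dict per combination of the list-valued entries."""
    merged = {}
    for d in dicts:
        merged.update(d)   # later dicts take precedence
    merged.update(kwargs)
    keys = list(merged)
    combos = product(*(merged[k] for k in keys))
    return [dict(zip(keys, combo)) for combo in combos]


grid = dict_product_sketch({'depth': [1, 2]}, criterion=['gini', 'entropy'])
print(len(grid))  # 4
print(grid[0])    # {'depth': 1, 'criterion': 'gini'}
```

This is the usual way to enumerate a parameter grid: each returned dictionary is one point of the grid.
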
drain.util.dict_subset(d, keys)[source]
drain.util.dict_update_union(d1, d2)[source]

Update a set-valued dictionary: when a key already exists, take the union of the sets.
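One way to read this is sketched below. dict_update_union_sketch is hypothetical; it assumes d1 is modified in place and that nothing is returned, neither of which the docstring confirms.

```python
def dict_update_union_sketch(d1, d2):
    """Merge set-valued d2 into d1 in place, unioning sets on shared keys."""
    for key, values in d2.items():
        if key in d1:
            d1[key] = d1[key] | set(values)  # key exists: union the sets
        else:
            d1[key] = set(values)            # new key: store a copy


tags = {'a': {1, 2}}
dict_update_union_sketch(tags, {'a': {2, 3}, 'b': {4}})
print(tags == {'a': {1, 2, 3}, 'b': {4}})  # True
```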

drain.util.drop_collinear(df, tol=0.1, verbose=True)[source]
drain.util.drop_constant_column_levels(df)[source]

Drop the levels of a multi-level column dataframe which are constant. Operates in place.

drain.util.eqattr(object1, object2, attr)[source]
drain.util.execute_sql(sql, engine)[source]
drain.util.get_attr(name)[source]

get a class or function by name

drain.util.get_collection_values(a)[source]
drain.util.get_collinear(df, tol=0.1, verbose=False)[source]
drain.util.get_subdirs(directory)[source]

Returns: a list of subdirectories of the given directory

drain.util.indent(s, n_spaces=2, initial=True)[source]

Indent all new lines.

Args:
n_spaces: number of spaces to use for indentation
initial: whether or not to start with an indent
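
The effect of the two arguments can be illustrated with a small sketch. indent_sketch is a hypothetical re-implementation based only on the description above.

```python
def indent_sketch(s, n_spaces=2, initial=True):
    """Indent every line after a newline; optionally indent the first line too."""
    pad = ' ' * n_spaces
    indented = s.replace('\n', '\n' + pad)
    return pad + indented if initial else indented


print(repr(indent_sketch('a\nb')))                             # '  a\n  b'
print(repr(indent_sketch('a\nb', n_spaces=4, initial=False)))  # 'a\n    b'
```
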
drain.util.init_object(name, **kwargs)[source]
drain.util.intersect(sets)[source]
drain.util.is_instance_collection(c, cls)[source]

Args:
c: any object
cls: a class or a list/tuple of classes

Returns: True if c is a non-empty collection of objects, each of which is an instance of one of the specified classes.

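A sketch of a check with these semantics follows. is_instance_collection_sketch is hypothetical, and treating strings and bytes as non-collections is an assumption of this sketch, not documented behavior.

```python
def is_instance_collection_sketch(c, cls):
    """True if c is a non-empty collection whose items are all instances of cls."""
    if isinstance(cls, list):
        cls = tuple(cls)  # isinstance accepts a tuple of classes, not a list
    if isinstance(c, (str, bytes)):
        return False      # assumption: strings are not treated as collections
    try:
        items = list(c)   # note: this consumes one-shot iterators
    except TypeError:
        return False      # c is not iterable at all
    return len(items) > 0 and all(isinstance(item, cls) for item in items)


print(is_instance_collection_sketch([1, 2, 3], int))        # True
print(is_instance_collection_sketch([], int))               # False
print(is_instance_collection_sketch([1, 'a'], [int, str]))  # True
print(is_instance_collection_sketch(5, int))                # False
```
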
drain.util.list_expand(d, prefix=None)[source]

Recursively expand dictionaries into a list of tuples, one per leaf value, e.g. list_expand({1:{2:[3,4]}, 5:[6]}) == [(1,2,3), (1,2,4), (5,6)]
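The documented example can be reproduced with a short recursive sketch. list_expand_sketch is hypothetical; it uses an empty tuple as the default prefix rather than None, and it assumes every leaf value is a list.

```python
def list_expand_sketch(d, prefix=()):
    """Return one tuple per leaf item: the key path followed by the item."""
    rows = []
    for key, value in d.items():
        path = prefix + (key,)
        if isinstance(value, dict):
            rows.extend(list_expand_sketch(value, path))  # descend into the subdict
        else:
            rows.extend(path + (item,) for item in value)
    return rows


print(list_expand_sketch({1: {2: [3, 4]}, 5: [6]}))  # [(1, 2, 3), (1, 2, 4), (5, 6)]
```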

drain.util.list_filter_none(l)[source]

Filter None values from a list.

drain.util.make_list(a)[source]
drain.util.make_tuple(a)[source]
drain.util.mode(series)[source]

pandas mode is “empty if nothing has 2+ occurrences.” This method always returns something: nan if the series is empty or all nan; otherwise the most frequent value, breaking ties arbitrarily.
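
The "always returns something" contract can be illustrated without pandas. The sketch below is hypothetical (mode_sketch is not part of drain) and works on any plain iterable rather than on a pandas Series.

```python
import math
from collections import Counter


def mode_sketch(values):
    """Most frequent non-nan value, or nan when there is none."""
    present = [v for v in values
               if not (isinstance(v, float) and math.isnan(v))]
    if not present:
        return float('nan')  # empty or all-nan input
    # most_common(1) returns a single winner even when several values tie
    return Counter(present).most_common(1)[0][0]


print(mode_sketch([1, 2, 2, 3]))  # 2
print(mode_sketch([]))            # nan
```
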
drain.util.mtime(path)[source]
drain.util.nunique(iterable)[source]
drain.util.parse_dates(df, inplace=True, *args, **kwargs)[source]

Parse all datetime.date and datetime.datetime columns

drain.util.randdates(start, end, size)[source]
drain.util.randtimedelta(low, high, size)[source]
drain.util.read_file(*args, **kwargs)[source]
drain.util.set_dtypes(df, dtypes)[source]
drain.util.timestamp(year, month, day)[source]

Convenient constructor for pandas Timestamp

drain.util.to_float(*args)[source]

Cast numpy arrays to float32. If there’s more than one, return an array.

drain.util.touch(path)[source]
drain.util.union(sets)[source]

Module contents