drain package¶
Submodules¶
drain.aggregate module¶
Classes that facilitate transformation and aggregation of dataframes.
This module provides wrappers that simplify grouping and aggregating dataframes by index, and subsequently performing arithmetic on the aggregated dataframes.
Examples:
The following code would take a Pandas dataframe df, and produce a dataframe res_df that is indexed by name. The result would have a column count that gives the number of rows per name; a column that gives the sum of score per name, divided by the sum of arrests per name; and finally two columns that give the sum of score per name, and the sum of squared score per name:
    aggregates = [
        Count(),
        Proportion('score', 'arrests'),
        Aggregate(['score', lambda x: x.score**2], 'sum')
    ]
    res_df = Aggregator(df, aggregates).aggregate('name')
Note how we specify all definitions before performing the actual aggregation, and how Aggregator.aggregate() then takes an index to group by.
Aggregator also caches individual transformations of columns to reduce redundant calculations.
Classes that end users interface with are Aggregate, Fraction, Count, and Proportion (all of which specify outcome columns and row-wise aggregation functions), and Aggregator (which takes an input dataframe and an index by which rows are grouped).
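To make the result of the example above concrete, the following plain-Python sketch computes the same four quantities per name without drain or pandas. The sample rows and the output key names are made up for illustration; the column names that drain actually produces may differ.

```python
from collections import defaultdict

# Hypothetical input rows standing in for the dataframe `df`.
rows = [
    {"name": "a", "score": 1.0, "arrests": 2},
    {"name": "a", "score": 3.0, "arrests": 2},
    {"name": "b", "score": 2.0, "arrests": 4},
]

# Group the rows by the index column, here `name`.
groups = defaultdict(list)
for row in rows:
    groups[row["name"]].append(row)

# Reduce each group, then combine the reductions (e.g. divide two sums).
result = {}
for name, members in groups.items():
    score_sum = sum(r["score"] for r in members)
    arrests_sum = sum(r["arrests"] for r in members)
    result[name] = {
        "count": len(members),
        "score_per_arrests": score_sum / arrests_sum,
        "score_sum": score_sum,
        "score_squared_sum": sum(r["score"] ** 2 for r in members),
    }
```

The two stages mirror the split the module describes: row-wise reductions first, then arithmetic on the reduced columns.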
-
class
drain.aggregate.Aggregate(column_def, agg_func, name=None, fname=None, astype=None)[source]¶ Bases: drain.aggregate.ColumnIdentity
A highly convenient wrapper around ColumnReductions.
- Example::
- Aggregate(['arrests', 'income', 'age'], ['min', 'max', 'mean'])
This would create 3x3 columns: arrests_min, arrests_max, arrests_mean, and so on.
-
class
drain.aggregate.Aggregator(df, column_functions)[source]¶ Bases: object
Binds column functions to a dataframe and allows for aggregation by a given index.
-
aggregate(index)[source]¶ Performs a groupby of the unique Columns by index, as constructed from self.df.
- Args:
- index (str, or pd.Index): Index or column name of self.df.
- Returns:
- pd.DataFrame: A dataframe, aggregated by index, that contains the result
- of the various ColumnFunctions, and named accordingly.
-
get_reduced(column_reductions)[source]¶ This function gets called by ColumnFunction._apply(). After a ColumnFunction has been passed to Aggregator’s constructor, the ColumnFunction can use this function to request the populated, aggregated columns that correspond to its ColumnReductions.
- Args:
- column_reductions (list[ColumnReduction])
- Returns:
- pd.DataFrame: A dataframe, where the column names are ColumnReductions.
-
-
class
drain.aggregate.ColumnFunction(column_reductions, names)[source]¶ Bases: object
Abstract base class for functions on reduced Columns; names the outcomes.
Having obtained Columns that have been created and aggregated along rows, with the same index (ColumnReductions, in other words), we might want to perform arithmetic on these ColumnReductions, such as element-wise division of one by the other. These transformations are handled by ColumnFunctions.
- Note: ColumnFunction guarantees an apply_and_name(aggregator). This is the
- lowest-level function in this module that actually returns 'populated' DataFrames (as provided by the aggregator). Children of this class thus include functions on pairs of populated dataframes, such as division and addition.
-
apply_and_name(aggregator)[source]¶ Fetches the row-aggregated input columns for this ColumnFunction.
- Args:
- aggregator (Aggregator)
- Returns:
- pd.DataFrame: The dataframe has columns with names self.names
- that were created by this ColumnFunction, and is indexed by the index that was passed to aggregator.aggregate(index).
-
class
drain.aggregate.ColumnIdentity(column_reductions, names)[source]¶ Bases: drain.aggregate.ColumnFunction
The simplest non-abstract ColumnFunction.
-
class
drain.aggregate.ColumnReduction(column, agg_func)[source]¶ Bases: object
Wraps and hashes a Column together with a function that aggregates across rows.
-
class
drain.aggregate.Count(definition=None, name=None, prop=None, prop_only=False, prop_name=None, astype=None, prop_astype=None)[source]¶ Bases: drain.aggregate.Fraction
Define various counts, sums, and proportions of Columns.
- Examples::
    Count()  # count the number of rows per grouped index
    Count('Arrests')  # sum column 'Arrests' by grouped index
    Count('Arrests', prop=True)  # as above, but also add a column that is normalized, i.e. where each group's sum is divided by the size of that group
    Count('Arrests', prop=True, prop_only=True)  # as above, but exclude the raw sum
    Count('Arrests', prop=lambda x: x.score**2)  # create a column with the sum of Arrests per grouped index, and also create a column with the sum of Arrests per grouped index divided by the sum of score**2 per grouped index
    Count(['Arrests', 'Stops'])  # create both Count('Arrests') and Count('Stops')
By default, names are set similarly to this example: 'count', 'Arrests_count', 'Arrests_prop', 'Arrests_prop_score', etc.
-
class
drain.aggregate.Fraction(numerator, denominator, name='{numerator}_per_{denominator}', include_numerator=False, include_denominator=False, include_fraction=True)[source]¶ Bases: drain.aggregate.ColumnFunction
Divides all pairs of column reductions from two column functions.
- Example::
- Fraction(Aggregate(['arrests', 'score'], 'sum'), Aggregate('score', 'mean'))
This would create two columns: arrests_sum_per_score_mean, and score_sum_per_score_mean.
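The naming in this example can be reproduced with the default name template from the signature. The snippet below is only a sketch of the naming step, assuming the numerator and denominator names are the ones quoted in the example; it does not perform any division and is not drain's implementation.

```python
from itertools import product

# Default template taken from the Fraction signature.
name_template = "{numerator}_per_{denominator}"

# Column names assumed to come from the two Aggregate definitions above.
numerators = ["arrests_sum", "score_sum"]
denominators = ["score_mean"]

# One output name per (numerator, denominator) pair.
fraction_names = [
    name_template.format(numerator=n, denominator=d)
    for n, d in product(numerators, denominators)
]
```

With two numerators and one denominator the pairing yields the two names listed in the example.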
-
class
drain.aggregate.Proportion(definition, denom_def=True, name=None, denom_name=None, astype=None, denom_astype=None)[source]¶ Bases: drain.aggregate.Count
Convenience wrapper for Count.
- Example::
- Proportion('Arrests', 'Inspections')
Creates a column 'Arrests_prop_Inspections', which divides the sum of 'Arrests' per group by the sum of 'Inspections' per group.
-
drain.aggregate.date_min(d)[source]¶ groupby()['date_column'].aggregate('min') can return a float; this function converts it back to a timestamp.
-
drain.aggregate.days(date1, date2)[source]¶ Returns a lambda that determines the number of days between the two dates. The dates can be strings (column names) or actual dates, e.g. Aggregate(days('date', today), ['min', 'max'], 'days_since'). TODO: should there be a Days(AggregateBase) for this? Handle naming.
drain.aggregation module¶
-
class
drain.aggregation.AggregationBase(insert_args, aggregator_args, concat_args, parallel=False, prefix=None, inputs=None)[source]¶ Bases: drain.step.Step
AggregationBase uses aggregate.Aggregator to aggregate data. It can include aggregations over multiple indexes and multiple data transformations (e.g. subsets). The combinations can be run in parallel and can be returned disjoint or concatenated. Finally the results may be pivoted and joined to other datasets.
-
argument_names¶
-
fillna_value(df, left, **concat_args)[source]¶ This method gives subclasses the opportunity to define how join() fills missing values. The return value must be compatible with the value argument of DataFrame.fillna(). Examples:
- return 0: replace missing values with zero
- return df.mean(): replace missing values with the column mean
This default implementation fills counts with zero. TODO: identify counts more robustly instead of relying on the column name.
- Other fields would typically be filled with the mean, but that can't be done during the join
- because it would leak information across a train/test split.
-
get_aggregator(**kwargs)[source]¶ Given the arguments, return an aggregator
This method exists to allow subclasses to use Aggregator objects efficiently, i.e. to apply AggregateSeries only once per set of Aggregates. If the set of Aggregates depends on some or none of the arguments, the subclass need not recreate Aggregators.
-
select(df, args, inplace=False)[source]¶ After joining, selects a subset of arguments.
df: the result of a call to self.join(left)
args: a collection of arguments to select, as accepted by drain.util.list_expand:
- tuples corresponding to concat_args,
- e.g. [('District', '12h'), ('District', '24h')]
- a dict to be expanded into the above,
- e.g. {'District': ['12h', '24h']}
-
-
class
drain.aggregation.AggregationJoin(inputs, **kwargs)[source]¶ Bases: drain.step.Step
The first input is left and the second input is the aggregation. If the left step returned a dict, use MapResults to clarify, e.g.:
mapping=[{'aux': None}]
-
class
drain.aggregation.SimpleAggregation(inputs, indexes, prefix=None, parallel=False)[source]¶ Bases: drain.aggregation.AggregationBase
A simple AggregationBase subclass with a single aggregator. The only argument is the index. An implementation need only define an aggregates attribute; see test_aggregation.SimpleCrimeAggregation for an example.
-
arguments¶
-
get_aggregator(**kwargs)[source]¶ Given the arguments, return an aggregator
This method exists to allow subclasses to use Aggregator objects efficiently, i.e. to apply AggregateSeries only once per set of Aggregates. If the set of Aggregates depends on some or none of the arguments, the subclass need not recreate Aggregators.
-
-
class
drain.aggregation.SpacetimeAggregation(spacedeltas, dates, date_column, parallel=False, max_date_column=None, censor_columns=None, aggregator_args=None, concat_args=None, inputs=None, prefix=None)[source]¶ Bases: drain.aggregation.AggregationBase
SpacetimeAggregation is an Aggregation over space and time. Specifically, the index is a spatial index, and additional date and delta arguments select a subset of the data to aggregate.
- We assume that the index and deltas are independent of the date,
- so every date is aggregated to all spacedeltas.
By default the aggregator_args are date and delta (i.e. independent of aggregation index). To change that, pass aggregator_args=['date', 'delta', 'index'] and override get_aggregator to accept an index argument.
- Note that dates should be datetime.datetime, not numpy.datetime64, for yaml serialization
- and to work with dateutil.relativedelta.
- However, since pandas automatically turns a datetime column in the index into a datetime64
- DatetimeIndex, the left dataframe passed to join() should use datetime64!
See test_aggregation.SpacetimeCrimeAggregation for an example.
-
arguments¶
-
get_aggregator(date, delta)[source]¶ Given the arguments, return an aggregator
This method exists to allow subclasses to use Aggregator objects efficiently, i.e. to apply AggregateSeries only once per set of Aggregates. If the set of Aggregates depends on some or none of the arguments, the subclass need not recreate Aggregators.
-
indexes¶
-
spacedeltas= None¶
- spacedeltas is a dict of the form {name: (index, deltas)},
- where deltas is an array of delta strings.
dates are end dates for the aggregators.
-
class
drain.aggregation.SpacetimeAggregationJoin(inputs, lag=None, **kwargs)[source]¶ Bases: drain.aggregation.AggregationJoin
Specify a temporal lag between the aggregations and left. Useful for simulating a delay in receipt of aggregation data sources.
drain.data module¶
-
class
drain.data.ClassificationData(inputs=None, dependencies=None, **kwargs)[source]¶ Bases:
drain.step.Step
-
class
drain.data.Column(definition, astype=None)[source]¶ Bases: object
Defines a new or existing column that can be calculated from a dataframe.
Column accepts as definition a string that refers to an existing column, or a lambda function that returns a pd.Series, or a constant, in which case it creates a pd.Series of that constant.
Columns are hashed based on their definition and type.
-
class
drain.data.CreateDatabase(inputs=None, dependencies=None, **kwargs)[source]¶ Bases:
drain.step.Step
-
class
drain.data.CreateEngine(inputs=None, dependencies=None, **kwargs)[source]¶ Bases:
drain.step.Step
-
class
drain.data.FromSQL(query=None, to_str=None, table=None, tables=None, inputs=None, auto_parse_dates=True, **read_sql_kwargs)[source]¶ Bases:
drain.step.Step
-
class
drain.data.Merge(inputs=None, dependencies=None, **kwargs)[source]¶ Bases:
drain.step.Step
-
class
drain.data.Normalize(inputs=None, dependencies=None, **kwargs)[source]¶ Bases:
drain.step.Step
-
class
drain.data.Revise(sql, id_column, max_date_column, min_date_column, date_column, date, from_sql_args=None, source_id_column=None, **kwargs)[source]¶ Bases:
drain.step.Step
-
class
drain.data.ToHDF(objects_to_ascii=False, **kwargs)[source]¶ Bases: drain.step.Step
Write DataFrames to an HDF store. Pass put_arguments (format, mode, data_columns, etc.) to init. Pass DataFrames by name via inputs.
-
class
drain.data.ToSQL(table_name, **kwargs)[source]¶ Bases: drain.step.Step
Step for util.PgSQLDatabase.to_sql(). Inputs:
df is the DataFrame to import
db is an instance of PgSQLDatabase; defaults to CreateDatabase()
- TODO: once drain Steps have outputs,
- include psql/schema/name
-
drain.data.binarize(df, category_classes, all_classes=True, drop=True, astype=None, inplace=True, min_freq=None)[source]¶ Binarize specified categoricals. Works inplace!
- Args:
- df: the DataFrame whose columns to binarize
- category_classes: either a dict of (column : [class1, class2, …]) pairs
- or a collection of column names, in which case classes are given using df[column].unique()
- all_classes: when False, the last class is skipped
- drop: when True, the original categorical columns are dropped
- astype: a type for the resulting binaries, e.g. np.float32.
- When None, use the default (bool).
- inplace: whether to modify the DataFrame inplace
- Returns:
- the DataFrame with binarized columns
-
drain.data.binarize_sets(df, columns, cast=False, drop=True, min_freq=None)[source]¶ Create dummies for the elements of a set-valued column. Operates in place. Args:
df: data frame
columns: either a dictionary of column: values pairs or a collection of columns.
cast: whether or not to cast values to set
drop: whether or not to drop the binarized columns
TODO: make interface same as binarize(). Merge the two?
-
drain.data.counts_to_dicts(df, column)[source]¶ Convert (values, counts) as returned by aggregate.aggregate_counts() to dicts. This makes expand_counts much faster.
-
drain.data.date_censor(df, date_columns, date)[source]¶ date_columns is a dictionary of date_column: [dependent_column1, …] pairs. Censor the dependent columns when the date column is before the given end_date, then censor the date column itself.
-
drain.data.date_censor_sql(date_column, today, column=None)[source]¶ If today is None, then no censoring is applied; otherwise each column is replaced with:
CASE WHEN {date_column} < '{today}' THEN {column} ELSE null END
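The template above can be filled in with ordinary string formatting. The function below is a hypothetical sketch rather than drain's code; in particular, it assumes that when column is omitted the date column itself is the one being censored.

```python
def censor_sql_sketch(date_column, today, column=None):
    """Build a censoring SQL expression following the documented template."""
    # Assumption: with no explicit column, censor the date column itself.
    if column is None:
        column = date_column
    # No reference date means no censoring: use the column unchanged.
    if today is None:
        return column
    template = "CASE WHEN {date_column} < '{today}' THEN {column} ELSE null END"
    return template.format(date_column=date_column, today=today, column=column)
```

For instance, censor_sql_sketch('closed_date', '2015-01-01', 'outcome') keeps outcome only for rows whose closed_date precedes the given date.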
-
drain.data.date_select(df, date_column, date, delta, max_date_column=None)[source]¶ Given a series, an end date, and a number of days, return the subset in the date range. If delta is None then there is no starting date. If max_date_column is specified then look for rows where the interval
[date_column, max_date_column] intersects [date-delta, date+delta)
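As an illustration of the simple case without max_date_column, the sketch below filters a list of dates using the standard library. It assumes a half-open window [end - days, end), which the description does not spell out, and it works on plain lists rather than dataframes.

```python
from datetime import date, timedelta

def date_select_sketch(dates, end, days=None):
    """Keep dates in [end - days, end); days=None means no lower bound."""
    start = None if days is None else end - timedelta(days=days)
    return [d for d in dates if d < end and (start is None or d >= start)]

# Hypothetical sample data.
sample_dates = [date(2015, 1, 1), date(2015, 1, 10), date(2015, 1, 20)]
last_week = date_select_sketch(sample_dates, date(2015, 1, 15), days=7)
all_before = date_select_sketch(sample_dates, date(2015, 1, 15))
```

Whether the end date itself is included is a design choice; excluding it avoids using information from the date being predicted.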
-
drain.data.expand_counts(df, column, values=None)[source]¶ expand a column containing value:count dictionaries
-
drain.data.expand_dates(df, columns=[])[source]¶ generate year, month, day features from specified date features
-
drain.data.impute(X, value=None, train=None, dropna=True, inplace=True)[source]¶ Performs mean imputation on a pandas dataframe. Args:
train: an optional training mask with which to compute the mean
value: instead of computing the mean, use this as the value argument to fillna
dropna: whether to drop all null columns
inplace: whether to perform the imputation inplace
Returns: the imputed DataFrame
-
drain.data.revise_helper(query)[source]¶ Given SQL containing a "CREATE TABLE {table_name} AS ({query})" statement, returns table_name, query.
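One way to split such a statement is a regular expression. This is a hypothetical sketch of the idea, not the parsing that drain performs; it assumes the statement has exactly the documented shape, optionally followed by a semicolon.

```python
import re

# Matches: CREATE TABLE <name> AS (<query>) with an optional trailing semicolon.
CREATE_TABLE_AS = re.compile(
    r"\s*CREATE\s+TABLE\s+(\S+)\s+AS\s*\((.*)\)\s*;?\s*$",
    re.IGNORECASE | re.DOTALL,
)

def split_create_table(sql):
    """Return (table_name, query) from a CREATE TABLE ... AS (...) statement."""
    match = CREATE_TABLE_AS.match(sql)
    if match is None:
        raise ValueError("expected CREATE TABLE {table_name} AS ({query})")
    return match.group(1), match.group(2).strip()
```

The greedy inner group keeps nested parentheses such as max(date) inside the extracted query.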
-
drain.data.revise_sql(query, id_column, output_table, max_date_column, min_date_column, date_column, date, source_id_column=None)[source]¶ Given an expensive query that aggregates temporal data, revise the results to censor before a particular date.
drain.dedupe module¶
-
drain.dedupe.components_to_df(components, id_func=None)[source]¶ Convert components to a join table with columns id1, id2 Args:
- components: A collection of components, each of which is a set of vertex ids.
- If a dictionary, then the key is the id for the component. Otherwise, the component id is determined by applying id_func to the component.
- id_func: If components is a dictionary, this should be None. Otherwise,
- this is a callable that, given a set of vertices, determines the id. If components is not a dict and id_func is None, it defaults to min.
- Returns: A dataframe representing the one-to-many relationship between
- component names (id1) and their members (id2).
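The mapping from components to (id1, id2) pairs can be sketched without pandas. The function below returns a sorted list of tuples instead of a dataframe; its name is hypothetical, and only the dict handling and the min default come from the description above.

```python
def components_to_pairs(components, id_func=None):
    """Return sorted (component_id, member_id) pairs."""
    if isinstance(components, dict):
        # Keys are the component ids; id_func is not needed.
        items = components.items()
    else:
        if id_func is None:
            id_func = min  # documented default
        items = ((id_func(c), c) for c in components)
    return sorted((id1, id2) for id1, members in items for id2 in members)
```

Each pair is one row of the one-to-many join table between component ids and members.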
-
drain.dedupe.follow(id, edges, directed=False, _visited=None)[source]¶ Follow a graph to find the nodes connected to a given node. Args:
id: the id of the starting node
edges: a pandas DataFrame of edges. Each row is an edge with two columns containing the ids of the vertices.
- directed: If True, edges are directed from first column to second column.
- Otherwise edges are undirected.
_visited: used internally for recursion
Returns: the set of all nodes connected to the starting node.
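The effect of the directed flag can be seen in a small traversal sketch. It takes edges as a list of pairs rather than a DataFrame, uses an explicit stack instead of recursion, and assumes that the starting node counts as connected to itself; none of this is taken from drain's source.

```python
def follow_sketch(start, edges, directed=False):
    """Return the set of nodes reachable from start over the given edges."""
    visited, stack = set(), [start]
    while stack:
        node = stack.pop()
        if node in visited:
            continue
        visited.add(node)
        for first, second in edges:
            if first == node:
                stack.append(second)  # follow the edge forwards
            elif not directed and second == node:
                stack.append(first)  # undirected: also follow it backwards
    return visited

sample_edges = [(1, 2), (3, 2)]
```

With these edges, node 3 is reachable from node 1 only when the edges are treated as undirected.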
-
drain.dedupe.get_components(edges, vertices=None)[source]¶ Return connected components from the graph determined by the edges matrix. Args:
edges: DataFrame of (undirected) edges.
vertices: set of vertices in graph. Defaults to union of all vertices in edges.
- Returns:
- set of connected components, each of which is a set of vertices.
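A connected-components computation of this kind can be sketched with an adjacency map and a depth-first search. The version below is an illustration under simplifying assumptions (edges as a list of pairs, components returned as frozensets so that they can be members of a set), not drain's implementation.

```python
from collections import defaultdict

def connected_components(edges, vertices=None):
    """Return a set of frozensets, one per connected component."""
    neighbors = defaultdict(set)
    for a, b in edges:
        neighbors[a].add(b)
        neighbors[b].add(a)
    if vertices is None:
        # Default: every vertex that appears in some edge.
        vertices = set(neighbors)
    seen, components = set(), set()
    for start in vertices:
        if start in seen:
            continue
        component, stack = set(), [start]
        while stack:
            v = stack.pop()
            if v not in component:
                component.add(v)
                stack.extend(neighbors[v] - component)
        seen |= component
        components.add(frozenset(component))
    return components
```

Passing vertices explicitly lets isolated vertices show up as singleton components.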
drain.drake module¶
-
drain.drake.get_drake_data(steps)[source]¶ Returns: a dictionary of outputs mapped to inputs. Note that an output is either a target or a leaf node in the step tree.
-
drain.drake.get_inputs(step, target)[source]¶ Traverse the tree of input parents, returning all steps which are targets or not targets (depending on the argument target). Stop traversing at parent targets.
-
drain.drake.to_drake_step(inputs, output)[source]¶ - Args:
- inputs: collection of input Steps
- output: output Step
Returns: a string of the drake step for the given inputs and output
-
drain.drake.to_drakefile(steps, preview=True, debug=False, input_drakefile=None, bindir=None)[source]¶ - Args:
- steps: collection of drain.step.Step objects for which to
- generate a drakefile
- preview: boolean, when False will create directories for output
- steps. When True do not touch filesystem.
- debug: run python with '-m pdb'
- input_drakefile: path to drakefile to include
- bindir: path to drake binaries, defaults to ../bin/
- Returns:
- a string representation of the drakefile
drain.explore module¶
drain.metrics module¶
-
drain.metrics.baseline(y_true, y_score=None)[source]¶ - Number of positive labels divided by number of labels,
- or zero if there are no labels
-
drain.metrics.count(y_true, y_score=None, countna=False)[source]¶ Counts the number of examples. If countna is False then only count labeled examples, i.e. those with y_true not NaN
-
drain.metrics.count_series(y_true, y_score, countna=False)[source]¶ Returns series whose i-th entry is the number of examples in the top i
-
drain.metrics.precision(y_true, y_score, k=None, return_bounds=False)[source]¶ - If return_bounds is False then returns precision on the
- labeled examples in the top k.
- If return_bounds is True then returns a tuple containing:
- precision on the labeled examples in the top k
- number of labeled examples in the top k
- lower bound of precision in the top k, assuming all
- unlabeled examples are False
- upper bound of precision in the top k, assuming all
- unlabeled examples are True
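The four returned quantities can be worked through on a small example. The sketch below uses plain lists, represents unlabeled examples as None, and returns 0.0 when the top k contains no labeled examples; those conventions are assumptions made for illustration and may not match drain.

```python
def precision_bounds_sketch(y_true, y_score, k):
    """Return (precision, n_labeled, lower_bound, upper_bound) for the top k."""
    # Rank examples by score, highest first, and keep the labels of the top k.
    ranked = sorted(zip(y_score, y_true), key=lambda pair: pair[0], reverse=True)
    top = [label for _, label in ranked[:k]]
    labeled = [label for label in top if label is not None]
    positives = sum(1 for label in labeled if label)
    n_unlabeled = len(top) - len(labeled)
    precision = positives / len(labeled) if labeled else 0.0
    # Lower bound: every unlabeled example is False; upper bound: every one is True.
    lower = positives / len(top) if top else 0.0
    upper = (positives + n_unlabeled) / len(top) if top else 0.0
    return precision, len(labeled), lower, upper

bounds = precision_bounds_sketch(
    [True, None, False, True, False], [0.9, 0.8, 0.7, 0.6, 0.1], k=4
)
```

Here the top four contain two positives, one negative, and one unlabeled example, so the precision on labeled examples is 2/3 with bounds 0.5 and 0.75.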
-
drain.metrics.precision_series(y_true, y_score, k=None)[source]¶ Returns a series of length k whose i-th entry is the precision in the top i. TODO: extrapolate here
-
drain.metrics.recall(y_true, y_score, k=None, value=True)[source]¶ Returns recall (number of positive examples) in the top k. If value is False then counts the number of negative examples. TODO: add prop argument to return recall proportion instead of count
drain.model module¶
-
class
drain.model.Fit(inputs, return_estimator=True, return_feature_importances=False)[source]¶ Bases:
drain.model.FitPredict
-
class
drain.model.FitPredict(inputs, return_estimator=False, return_feature_importances=True, return_predictions=True, prefit=False, predict_train=False)[source]¶ Bases: drain.step.Step
Step which can fit a scikit-learn estimator and make predictions.
-
class
drain.model.InverseProbabilityWeights(inputs=None, dependencies=None, **kwargs)[source]¶ Bases:
drain.step.Step
-
class
drain.model.Predict(inputs, return_estimator=False, return_feature_importances=False)[source]¶ Bases:
drain.model.FitPredict
-
class
drain.model.PredictProduct(inputs=None, dependencies=None, **kwargs)[source]¶ Bases:
drain.step.Step
-
drain.model.baseline(predict_step, **kwargs)¶
-
drain.model.count(predict_step, **kwargs)¶
-
drain.model.count_series(predict_step, **kwargs)¶
-
drain.model.function(predict_step, **kwargs)¶
-
drain.model.perturb(estimator, X, bins, columns=None)[source]¶ Predict on perturbations of a feature vector.
estimator: a fitted sklearn estimator
index: the index of the example to perturb
bins: a dictionary of column:bins arrays
columns: list of columns if bins doesn't cover all columns
TODO: make this work when index is multiple rows
-
drain.model.precision(predict_step, **kwargs)¶
-
drain.model.precision_series(predict_step, **kwargs)¶
-
drain.model.roc_auc(predict_step, **kwargs)¶
-
drain.model.to_float(predict_step, **kwargs)¶
-
drain.model.y_score(estimator, X)[source]¶ Score examples from a new matrix X. Args:
estimator: an sklearn estimator object
X: design matrix with the same features that the estimator was trained on
Returns: a vector of scores of the same length as X
Note that estimator.predict_proba is preferred, but when unavailable (e.g. SVM without probability calibration) decision_function is used.
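The fallback described in the note can be sketched with duck typing. The two stub classes below are stand-ins invented for this example so that it runs without scikit-learn; the sketch also assumes a binary problem in which the second predict_proba column holds the positive-class probability.

```python
def score_sketch(estimator, X):
    """Prefer predict_proba; fall back to decision_function."""
    if hasattr(estimator, "predict_proba"):
        # Assumption: binary classifier, column 1 is P(positive).
        return [row[1] for row in estimator.predict_proba(X)]
    return list(estimator.decision_function(X))

class ProbaStub:
    """Hypothetical estimator exposing predict_proba only."""
    def predict_proba(self, X):
        return [[1 - x, x] for x in X]

class MarginStub:
    """Hypothetical estimator exposing decision_function only."""
    def decision_function(self, X):
        return [2 * x - 1 for x in X]

proba_scores = score_sketch(ProbaStub(), [0.25, 0.75])
margin_scores = score_sketch(MarginStub(), [0.25, 0.75])
```

Scores from decision_function are not probabilities, so they are suitable for ranking but not for thresholds expressed as probabilities.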
-
drain.model.y_subset(y, query=None, aux=None, subset=None, dropna=False, outcome='true', k=None, p=None, ascending=False, score='score', p_of='notnull')[source]¶ Subset a model "y" dataframe. Args:
query: operates on y, or aux if present
subset: takes a dataframe or index thereof and subsets to that
dropna: means drop missing outcomes
return: top k (count) or p (proportion) if specified
p_of: specifies what the proportion is relative to:
'notnull' means proportion is relative to labeled count
'true' means proportion is relative to positive count
'all' means proportion is relative to total count
drain.serialize module¶
-
drain.serialize.configure()[source]¶ Configures the YAML parser for Step serialization and deserialization. Called in drain/__init__.py.
drain.step module¶
-
class
drain.step.Arguments(args=None, kwargs=None)[source]¶ Bases: object
A simple wrapper for positional and keyword arguments
-
class
drain.step.Call(_base, _method_name=None, inputs=None, **kwargs)[source]¶ Bases:
drain.step.Step
-
class
drain.step.GetItem(step, key=None)[source]¶ Bases: drain.step.Step
Given a step that returns a dict, this Step grabs a single value from it.
-
class
drain.step.MapResults(inputs, mapping)[source]¶ Bases: drain.step.Step
This step maps the results of its inputs into a new form of arguments and keyword arguments. It is a useful connector between steps.
-
DEFAULT= 1¶
-
-
class
drain.step.Step(inputs=None, dependencies=None, **kwargs)[source]¶ Bases: object
-
execute(inputs=None, output=None, load_targets=False)[source]¶ Run this step, recursively running or loading inputs. Used in bin/run_step.py, which is run by drake. Args:
inputs: collection of steps that should be loaded
output: step that should be dumped after it is run
load_targets (boolean): load all steps which are targets.
This argument is not used by run_step.py because target does not get serialized. But it can be useful for running steps directly.
-
get_arguments(**include)[source]¶ Return a shallow copy of self._kwargs. Passing {key}=False will pop the {key} from the dict, e.g. get_arguments(inputs=False) returns all keywords except inputs.
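The copy-then-pop behaviour can be illustrated with a free function operating on a plain dict. This is a sketch only; the function name and the stored keyword arguments are hypothetical, and the real method reads self._kwargs.

```python
def get_arguments_sketch(kwargs, **include):
    """Shallow-copy kwargs and drop every key passed as key=False."""
    arguments = dict(kwargs)  # shallow copy; values are shared, not cloned
    for key, keep in include.items():
        if not keep:
            arguments.pop(key, None)
    return arguments

# Hypothetical keyword arguments of a step.
stored = {"inputs": ["left", "aggregation"], "lag": 3}
without_inputs = get_arguments_sketch(stored, inputs=False)
```

Because the copy is shallow, removing keys from the result leaves the stored dict untouched, while mutable values remain shared between the two.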
-
-
drain.step.load(steps, reload=False)[source]¶ Safely load steps in place, excluding those that fail. Args:
steps: the steps to load
-
drain.step.merge_results(inputs, arguments=None)[source]¶ - Merges results to form arguments to run(). Each result is handled according to its type:
- dictionary: dictionaries get merged and passed as keyword arguments
- list: lists get concatenated to positional arguments
- Arguments: kwargs gets merged and args gets appended
- else: concatenated and passed as positional arguments
- Args:
- inputs: the inputs whose results to merge
- arguments: an optional existing Arguments object to merge into
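The dispatch on result type can be sketched as follows. This simplified version takes the results directly, returns an (args, kwargs) pair, and leaves out the Arguments case; it is an illustration of the rules listed above rather than drain's code.

```python
def merge_results_sketch(results):
    """Merge results into positional and keyword arguments."""
    args, kwargs = [], {}
    for result in results:
        if isinstance(result, dict):
            kwargs.update(result)  # dictionaries become keyword arguments
        elif isinstance(result, list):
            args.extend(result)  # lists are concatenated
        else:
            args.append(result)  # anything else is a single positional argument
    return args, kwargs

merged = merge_results_sketch([{"X": 1}, [2, 3], 4, {"y": 5}])
```

In this sketch later dictionaries overwrite earlier ones on key collisions, a detail the description leaves open.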
drain.util module¶
-
class
drain.util.PgSQLDatabase(engine, schema=None, meta=None)[source]¶ Bases: pandas.io.sql.SQLDatabase
-
read_sql(query, raise_on_error=True, **kwargs)[source]¶ Read SQL query into a DataFrame.
- sql : string
- SQL query to be executed.
- index_col : string, optional, default: None
- Column name to use as index for the returned DataFrame object.
- coerce_float : boolean, default True
- Attempt to convert values of non-string, non-numeric objects (like decimal.Decimal) to floating point, useful for SQL result sets.
- params : list, tuple or dict, optional, default: None
- List of parameters to pass to execute method. The syntax used to pass parameters is database driver dependent. Check your database driver documentation for which of the five syntax styles, described in PEP 249’s paramstyle, is supported. Eg. for psycopg2, uses %(name)s so use params={‘name’ : ‘value’}
- parse_dates : list or dict, default: None
- List of column names to parse as dates.
- Dict of {column_name: format string} where format string is strftime compatible in case of parsing string times, or is one of (D, s, ns, ms, us) in case of parsing integer timestamps.
- Dict of {column_name: arg dict}, where the arg dict corresponds to the keyword arguments of pandas.to_datetime(). Especially useful with databases without native Datetime support, such as SQLite.
- chunksize : int, default None
- If specified, return an iterator where chunksize is the number of rows to include in each chunk.
Returns: DataFrame
See also: read_sql_table (Read SQL database table into a DataFrame), read_sql
-
read_table(name, schema=None)[source]¶ Read SQL database table into a DataFrame.
- table_name : string
- Name of SQL table in database.
- index_col : string, optional, default: None
- Column to set as index.
- coerce_float : boolean, default True
- Attempts to convert values of non-string, non-numeric objects (like decimal.Decimal) to floating point. This can result in loss of precision.
- parse_dates : list or dict, default: None
- List of column names to parse as dates.
- Dict of {column_name: format string} where format string is strftime compatible in case of parsing string times, or is one of (D, s, ns, ms, us) in case of parsing integer timestamps.
- Dict of {column_name: arg}, where the arg corresponds to the keyword arguments of pandas.to_datetime(). Especially useful with databases without native Datetime support, such as SQLite.
- columns : list, default: None
- List of column names to select from SQL table.
- schema : string, default None
- Name of SQL schema in database to query (if database flavor supports this). If specified, this overwrites the default schema of the SQL database object.
- chunksize : int, default None
- If specified, return an iterator where chunksize is the number of rows to include in each chunk.
Returns: DataFrame
See also: pandas.read_sql_table, SQLDatabase.read_query
-
to_sql(frame, name, if_exists='fail', index=True, index_label=None, schema=None, chunksize=None, dtype=None, pk=None, prefixes=None, raise_on_error=True)[source]¶ Write records stored in a DataFrame to a SQL database.
- frame : DataFrame
- name : string
- Name of SQL table
- if_exists : {'fail', 'replace', 'append'}, default 'fail'
- fail: If table exists, do nothing.
- replace: If table exists, drop it, recreate it, and insert data.
- append: If table exists, insert data. Create if does not exist.
- index : boolean, default True
- Write DataFrame index as a column
- index_label : string or sequence, default None
- Column label for index column(s). If None is given (default) and index is True, then the index names are used. A sequence should be given if the DataFrame uses MultiIndex.
- schema : string, default None
- Name of SQL schema in database to write to (if database flavor supports this). If specified, this overwrites the default schema of the SQLDatabase object.
- chunksize : int, default None
- If not None, then rows will be written in batches of this size at a time. If None, all rows will be written at once.
- dtype : dict of column name to SQL type, default None
- Optionally specifies the datatype for columns. The SQL type should be a SQLAlchemy type.
- pk : name of column(s) to set as primary keys
-
-
drain.util.conditional_join(left, right, left_on, right_on, condition, lsuffix='_left', rsuffix='_right')[source]¶
-
drain.util.dict_expand(d, prefix=None)[source]¶ Recursively expand subdictionaries, returning a dictionary, e.g. dict_expand({1:{2:3}, 4:5}) = {(1,2):3, 4:5}
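A recursive flattening that reproduces the documented example might look like the sketch below. It is written from the description alone, so details such as how tuple-valued top-level keys are treated may differ from drain.

```python
def dict_expand_sketch(d, prefix=None):
    """Flatten nested dicts; nested keys become tuples of the key path."""
    result = {}
    for key, value in d.items():
        if prefix is None:
            new_key = key  # top-level keys stay as they are
        elif isinstance(prefix, tuple):
            new_key = prefix + (key,)
        else:
            new_key = (prefix, key)
        if isinstance(value, dict):
            result.update(dict_expand_sketch(value, new_key))
        else:
            result[new_key] = value
    return result
```

Keys that sit at the top level are not wrapped in tuples, which matches the 4:5 entry in the documented example.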
-
drain.util.dict_merge(*dict_args)[source]¶ Given any number of dicts, shallow copy and merge them into a new dict; precedence goes to key-value pairs in later dicts.
-
drain.util.dict_product(*d, **kwargs)[source]¶ Cartesian product of a dict whose values are lists. Args:
- d: dictionary to take product of. multiple dictionaries will first
- be merged by dict_merge
kwargs: additional kwargs for convenience
- Returns:
- a list of dictionaries with the same keys as d and kwargs
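The merge-then-product behaviour of dict_merge and dict_product can be sketched with itertools.product. The function names carry a _sketch suffix to mark them as illustrations; this version assumes every value is a list, as the description states.

```python
from itertools import product

def dict_merge_sketch(*dict_args):
    """Merge dicts into a new dict; later dicts take precedence."""
    merged = {}
    for d in dict_args:
        merged.update(d)
    return merged

def dict_product_sketch(*d, **kwargs):
    """Return one dict per combination of the list-valued entries."""
    merged = dict_merge_sketch(*(d + (kwargs,)))
    keys = list(merged)
    return [
        dict(zip(keys, values))
        for values in product(*(merged[k] for k in keys))
    ]

grid = dict_product_sketch({"a": [1, 2]}, b=["x"])
```

This is the usual way to expand a parameter grid into a list of concrete parameter settings.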
-
drain.util.dict_update_union(d1, d2)[source]¶ Update a set-valued dictionary; when a key exists, union the sets.
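A minimal sketch of this update, assuming it modifies d1 in place (the description does not say whether a new dict is returned):

```python
def dict_update_union_sketch(d1, d2):
    """Update d1 with d2, taking the union of sets on shared keys."""
    for key, values in d2.items():
        if key in d1:
            d1[key] |= values  # in-place set union
        else:
            d1[key] = set(values)  # copy so d1 and d2 do not share a set

targets = {"a": {1}}
dict_update_union_sketch(targets, {"a": {2}, "b": {3}})
```

A plain dict.update would instead replace the set stored under 'a'.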
-
drain.util.drop_constant_column_levels(df)[source]¶ Drop the levels of a multi-level column dataframe which are constant. Operates in place.
-
drain.util.indent(s, n_spaces=2, initial=True)[source]¶ Indent all new lines. Args:
n_spaces: number of spaces to use for indentation
initial: whether or not to start with an indent
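The two arguments can be illustrated with a short sketch based on str.replace. It follows the description only and is not drain's code.

```python
def indent_sketch(s, n_spaces=2, initial=True):
    """Indent every line after a newline, and optionally the first line."""
    pad = " " * n_spaces
    indented = s.replace("\n", "\n" + pad)
    return pad + indented if initial else indented
```

Setting initial=False is useful when the first line continues an existing line, for example after a key in generated output.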
-
drain.util.is_instance_collection(c, cls)[source]¶ - Args:
- c: any object
- cls: a class or a list/tuple of classes
- Returns: True if c is a non-empty collection of objects, each of which
- is an instance of one of the specified classes.
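A sketch of this check follows. How non-iterable objects are handled is an assumption here (they are treated as not being collections), and strings are not special-cased.

```python
def is_instance_collection_sketch(c, cls):
    """True if c is a non-empty collection whose items are all instances of cls."""
    classes = tuple(cls) if isinstance(cls, (list, tuple)) else (cls,)
    try:
        items = list(c)
    except TypeError:
        return False  # assumption: non-iterables are not collections
    return len(items) > 0 and all(isinstance(item, classes) for item in items)
```

The explicit emptiness test matters because all() over an empty sequence is True.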
-
drain.util.list_expand(d, prefix=None)[source]¶ Recursively expand dictionaries into lists e.g. list_expand({1:{2:[3,4]}, 5:[6]}) == [(1,2,3), (1,2,4), (5,6)]
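The documented example can be reproduced with a short recursive sketch. It uses an empty tuple as the default prefix instead of None and assumes that every leaf is a list, so it should be read as an illustration rather than as drain's implementation.

```python
def list_expand_sketch(d, prefix=()):
    """Expand nested dicts of lists into a flat list of key-path tuples."""
    if isinstance(d, dict):
        result = []
        for key, value in d.items():
            result.extend(list_expand_sketch(value, prefix + (key,)))
        return result
    # Leaf: a list of values, each appended to the path so far.
    return [prefix + (item,) for item in d]
```

The same expansion turns {'District': ['12h', '24h']} into [('District', '12h'), ('District', '24h')].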
-
drain.util.mode(series)[source]¶ pandas mode is "empty if nothing has 2+ occurrences." This method always returns something (nan if the series is empty/nan), breaking ties arbitrarily.
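The always-returns-something behaviour can be sketched with collections.Counter on a plain list. Treating float nan as missing is an assumption made to mirror the description; the sketch does not use pandas.

```python
import math
from collections import Counter

def mode_sketch(values):
    """Return the most common non-nan value, or nan if there is none."""
    present = [
        v for v in values
        if not (isinstance(v, float) and math.isnan(v))
    ]
    if not present:
        return float("nan")
    # most_common breaks ties by first occurrence, which is arbitrary here.
    return Counter(present).most_common(1)[0][0]
```

Unlike the pandas behaviour quoted above, a value that occurs only once can still be returned.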
-
drain.util.parse_dates(df, inplace=True, *args, **kwargs)[source]¶ Parse all datetime.date and datetime.datetime columns