scikit-learn's Pipeline and Friends, Part II: Efficient Fitting
I gave a talk on my post, scikit-learn's Pipeline and Friends, at work last week. The presentation was rough, but not horrible. It felt fast. And at the end I decided to go off script and fumbled.
Afterwards, I hesitated to share my blog post, fearing I'd be ridiculed. But I reminded myself that this is a journey of learning, and feedback can be useful even if it's negative. So I shared. It's been over a week, I've yet to receive any negative (or positive) feedback, and I'm okay with that. What I am glad for are the questions I got at the end of my talk, one of which is the topic of this post.
At their request to remain anonymous, I'll write out the (edited) question without citing its author:
Imagine the "Age" estimator is computationally expensive (maybe $1e7$ passengers instead of $1e3$). We fit that first. Now I want to play around with my imputation on "Fare" and see how that affects my overall survival classifier accuracy. Hopefully every tweak I make to the "Fare" imputer isn't going to retrigger the "Age" estimator?
Thank you for your question, anon! Let me start by setting everything up to how it was left at the end of part I.
Setup
import re
import warnings
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import KNNImputer, SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.pipeline import FeatureUnion, Pipeline
from sklearn.preprocessing import FunctionTransformer, MinMaxScaler, OneHotEncoder, TargetEncoder
warnings.filterwarnings(action="ignore", module="sklearn")
train = pd.read_csv("train.csv", index_col="PassengerId")
test = pd.read_csv("test.csv", index_col="PassengerId")
X = train.drop(columns="Survived")
y = train.Survived
X_train, X_val, y_train, y_val = train_test_split(X, y, random_state=0, stratify=y)
cont_pipe = Pipeline(
    steps=[
        ("scale", MinMaxScaler()),
        ("impute", SimpleImputer()),
    ],
)
cat_ord_cols = ["Sex", "Pclass", "Embarked", "SibSp", "Parch"]
cat_ord = FeatureUnion(
    transformer_list=[
        ("ohe", OneHotEncoder(drop="first", sparse_output=False, max_categories=5)),
        ("tgt", TargetEncoder(random_state=0)),
    ],
)
def get_title(
    text: str,
    title_pattern: str = r"Mrs?|Miss|Master",
) -> str | None:
    """Get a passenger's title if present.

    If more than one title is found, return the title
    with the fewest characters.
    If no title is found, return None.

    The default title_pattern will detect:
    - Mr
    - Mrs
    - Miss
    - Master
    """
    possible_titles: set[str] = set(re.findall(pattern=title_pattern, string=text))
    title: list[str] = sorted(possible_titles, key=len)
    if title:
        return title.pop(0)
get_title_vec = np.vectorize(get_title)
title_func = FunctionTransformer(func=get_title_vec)
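As a quick sanity check (the sample names are my own examples, styled after the Titanic "Last, Title. First" format), the vectorized helper picks out titles elementwise:
get_title_vec(np.array(["Braund, Mr. Owen Harris", "Heikkinen, Miss. Laina"]))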
title_pipe = Pipeline(
    steps=[
        ("title_func", title_func),
        ("ohe", OneHotEncoder(drop=["None"], sparse_output=False)),
    ],
)
age_title_trf = ColumnTransformer(
    transformers=[
        ("title_pipe", title_pipe, ["Name"]),
        ("age", "passthrough", ["Age"]),
    ],
    remainder="drop",
)
age_pipe = Pipeline(
    steps=[
        ("age_title_trf", age_title_trf),
        ("impute_knn", KNNImputer()),
    ],
)
col_trf = ColumnTransformer(
    transformers=[
        ("fare", cont_pipe, ["Fare"]),
        ("age", age_pipe, ["Age", "Name"]),
        ("cat_ord", cat_ord, cat_ord_cols),
    ],
    remainder="drop",
)
pipe = Pipeline(
    steps=[
        ("col_trf", col_trf),
        ("clf", LogisticRegression(random_state=0)),
    ],
)
pipe.fit(X_train, y_train)
pipe.score(X_val, y_val)
0.8071748878923767
pipe
Pipeline(steps=[('col_trf', ColumnTransformer(transformers=[('fare', Pipeline(steps=[('scale', MinMaxScaler()), ('impute', SimpleImputer())]), ['Fare']), ('age', Pipeline(steps=[('age_title_trf', ColumnTransformer(transformers=[('title_pipe', Pipeline(steps=[('title_func', FunctionTransformer(func=<numpy.vectorize object at 0x00000233EC509390>)), ('ohe', OneHotEncoder(drop=['None'], sparse_output=False))]), ['Name']), ('age', 'passthrough', ['Age'])])), ('impute_knn', KNNImputer())]), ['Age', 'Name']), ('cat_ord', FeatureUnion(transformer_list=[('ohe', OneHotEncoder(drop='first', max_categories=5, sparse_output=False)), ('tgt', TargetEncoder(random_state=0))]), ['Sex', 'Pclass', 'Embarked', 'SibSp', 'Parch'])])), ('clf', LogisticRegression(random_state=0))])
Components
As I mentioned in the previous post, our pipe keeps everything in one composable object. This means we can swap different components in and out, as well as add or remove steps.
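For instance (a quick sketch of my own, not from the original post), we can clone the pipe and swap in a different final estimator, leaving the original fitted pipe untouched:
from sklearn.base import clone
from sklearn.ensemble import RandomForestClassifier

# clone returns an unfitted copy with the same parameters,
# so the original fitted pipe is left as-is.
alt_pipe = clone(pipe).set_params(clf=RandomForestClassifier(random_state=0))
alt_pipe.fit(X_train, y_train)
alt_pipe.score(X_val, y_val)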
When we fit a Pipeline, it's the equivalent of calling fit on each estimator in turn, transforming the input, and passing it on to the next step. But do we have to fit our pipe if certain components are already fit? Technically, no. You could fit each individual component separately and then piece the pipe together. In most cases I wouldn't recommend this, but as anon pointed out, there are times when we don't want to refit an expensive estimator.
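To make that equivalence concrete, here's a minimal sketch of the manual version (illustrative only, since running it refits col_trf on the same data):
# Roughly what pipe.fit(X_train, y_train) does: fit_transform each step,
# then fit the final estimator on the transformed output.
# (Without caching, Pipeline fits its steps in place rather than on clones.)
Xt = col_trf.fit_transform(X_train, y_train)
manual_clf = LogisticRegression(random_state=0).fit(Xt, y_train)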
How would we go about doing this? Let's take our pipe apart and see what we can do.
Steps
Our pipe has two steps: col_trf followed by clf.
[*pipe.named_steps]
['col_trf', 'clf']
The question calls out the "Age" imputer and the "Fare" imputer, both of which are in the col_trf step.
[*pipe.named_steps.col_trf.named_transformers_]
['fare', 'age', 'cat_ord', 'remainder']
Before we start dissecting, we need to define two groups: what needs to be refit versus what doesn't. We want to tinker with the "Fare" imputer, therefore it belongs in the refit group. We don't want to modify anything else. So we drill down and pop out the "Fare" part of the col_trf, right?
Not so fast. We have to remind ourselves that everything after the col_trf will need to be refit, as its inputs could change. Let's begin by separating the col_trf from the clf.
col_trf, clf = pipe.named_steps.values()
Transformers
A Pipeline is made of steps. A ColumnTransformer is made of transformers. As our pipe has already been fit, all of its underlying steps have been fit, including col_trf. We can check this using scikit-learn's check_is_fitted.
from sklearn.utils.validation import check_is_fitted
# check_is_fitted will return None if the estimator is fit.
check_is_fitted(col_trf) is None
True
To access the individual fitted transformers, we use the named_transformers_ (or transformers_) attribute rather than transformers.
[*col_trf.named_transformers_]
['fare', 'age', 'cat_ord', 'remainder']
We can check that each has been fitted.
# Note that we don't treat the "remainder" as a transformer.
# Also note that the columns aren't included in
# the named_transformers_ output (unlike transformers_).
[
    (name, check_is_fitted(trf) is None)
    for name, trf in col_trf.named_transformers_.items()
    if name != "remainder"
]
[('fare', True), ('age', True), ('cat_ord', True)]
Separation
col_trf.transformers_ is a list. That means we can pop elements out of it. To separate the "Fare" imputer from the rest of col_trf, we'll pop out the zeroth element.
NOTE: The named_transformers_ attribute is read-only. This means using pop on it will not remove a transformer from the col_trf.

NOTE 2: The HTML representation of col_trf will not update after popping the "Fare" imputer out.
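A quick check of the first note (my own illustration): named_transformers_ builds a fresh Bunch on every access, so mutating the returned object leaves col_trf itself untouched.
# Popping from the returned Bunch only changes that snapshot, not col_trf.
snapshot = col_trf.named_transformers_
snapshot.pop("cat_ord")
[*col_trf.named_transformers_]  # 'cat_ord' is still present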
fare_name, fare_pipe, fare_cols = col_trf.transformers_.pop(0)
# "fare" was sucessfully removed from col_trf
[*col_trf.named_transformers_]
['age', 'cat_ord', 'remainder']
Tweak
Let's play around with the fare_pipe. The goal is to show that we can fit a new transformer on "Fare", then add it back to the col_trf. What does the fare_pipe currently look like?
fare_pipe
Pipeline(steps=[('scale', MinMaxScaler()), ('impute', SimpleImputer())])
Suppose we want to impute first, then scale with StandardScaler instead of MinMaxScaler.
from sklearn.preprocessing import StandardScaler

new_fare_pipe = Pipeline(
    steps=[
        ("impute", SimpleImputer()),
        ("scale", StandardScaler()),
    ],
)
new_fare_pipe
Pipeline(steps=[('impute', SimpleImputer()), ('scale', StandardScaler())])
We now fit our new_fare_pipe.
new_fare_pipe.fit(X_train[["Fare"]]);
Then add it back into our col_trf.
# Note that we add a tuple of (name, transformer, list of columns).
col_trf.transformers_.insert(0, (fare_name, new_fare_pipe, fare_cols))
# "fare" has been inserted into col_trf
[*col_trf.named_transformers_]
['fare', 'age', 'cat_ord', 'remainder']
We can see that the new_fare_pipe is in col_trf.
col_trf.named_transformers_.fare
Pipeline(steps=[('impute', SimpleImputer()), ('scale', StandardScaler())])
And we see that all the transformers in col_trf are fitted.
[
    (name, check_is_fitted(trf) is None)
    for name, trf in col_trf.named_transformers_.items()
    if name != "remainder"
]
[('fare', True), ('age', True), ('cat_ord', True)]
Together
As I mentioned earlier, the clf step in pipe will still need to be fit. To do this without refitting our "expensive" col_trf, we'll need to transform the data with col_trf and then fit the clf.
# Transform data before fitting clf
X_train_trf = col_trf.transform(X_train)
X_val_trf = col_trf.transform(X_val)
clf.fit(X_train_trf, y_train)
clf.score(X_val_trf, y_val)
0.8071748878923767
This isn't a great solution, especially if you want to tune a specific part of your col_trf or pipe. A similar question was asked on Stack Overflow. And while I like one of the solutions, it requires a bit more work than we might want. What else can we do?
Cache
Digging a bit further, I came across this question, and the solution really highlighted the simplicity of the scikit-learn API. Pulling straight from the docs, we can create a temporary directory to hold our transformer configurations. Then whenever we modify one (or more) components, anything that doesn't change will be reloaded from the cache.
Let's make a dummy FunctionTransformer that delays training to see if the caching speeds anything up.
from time import sleep

def sleep_identity(x: np.ndarray) -> np.ndarray:
    print("Sleeping...")
    sleep(2)
    print("Awake!")
    return x

sleep_trf = FunctionTransformer(func=sleep_identity)
# Make a temporary directory to hold our cached configs/weights/etc.
from tempfile import mkdtemp
cachedir = mkdtemp()
# We define a new pipe to test with unfit transformers.
# All transformers are the same except the sleep_trf at step 1.
cont_pipe = Pipeline(
    steps=[
        ("scale", MinMaxScaler()),
        ("impute", SimpleImputer()),
    ],
)
cat_ord_cols = ["Sex", "Pclass", "Embarked", "SibSp", "Parch"]
cat_ord = FeatureUnion(
    transformer_list=[
        ("ohe", OneHotEncoder(drop="first", sparse_output=False, max_categories=5)),
        ("tgt", TargetEncoder(random_state=0)),
    ],
)
title_pipe = Pipeline(
    steps=[
        ("title_func", title_func),
        ("ohe", OneHotEncoder(drop=["None"], sparse_output=False)),
    ],
)
age_title_trf = ColumnTransformer(
    transformers=[
        ("title_pipe", title_pipe, ["Name"]),
        ("age", "passthrough", ["Age"]),
    ],
    remainder="drop",
)
age_pipe = Pipeline(
    steps=[
        ("age_title_trf", age_title_trf),
        ("impute_knn", KNNImputer()),
    ],
)
col_trf = ColumnTransformer(
    transformers=[
        ("fare", cont_pipe, ["Fare"]),
        ("age", age_pipe, ["Age", "Name"]),
        ("cat_ord", cat_ord, cat_ord_cols),
    ],
    remainder="drop",
)
unfit_pipe = Pipeline(
    steps=[
        ("sleep", sleep_trf),  # New!!!
        ("col_trf", col_trf),
        ("clf", LogisticRegression(random_state=0)),
    ],
    memory=cachedir,  # Set the memory to our temporary directory.
    verbose=True,  # Set verbose to True so we can see the processing logs.
)
unfit_pipe.fit(X_train, y_train);
Sleeping...
Awake!
[Pipeline] ............. (step 1 of 3) Processing sleep, total=   2.0s
[Pipeline] ........... (step 2 of 3) Processing col_trf, total=   0.0s
[Pipeline] ............... (step 3 of 3) Processing clf, total=   0.1s
From the verbose output we can see that it took about $2.1$ seconds to fit the unfit_pipe. What happens if we try to fit again without changing anything?
unfit_pipe.fit(X_train, y_train);
[Pipeline] ............... (step 3 of 3) Processing clf, total= 0.1s
$0.1$ seconds! The first two steps were skipped because nothing was changed, so the results were loaded from the cache.
Caching is really helpful when tuning a Pipeline, for example when grid-searching over the final estimator's hyperparameters.
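Here's a minimal sketch of that idea (the grid over C is my own example, not from the original post). Only the final step's parameters vary between candidates, so the sleep and col_trf steps can be served from the cache once each fold has been processed:
from sklearn.model_selection import GridSearchCV

# Tune only the classifier; the cached transformer steps are reused
# across candidates that share the same fold data.
search = GridSearchCV(
    estimator=unfit_pipe,
    param_grid={"clf__C": [0.1, 1.0, 10.0]},
    cv=3,
)
search.fit(X_train, y_train)
search.best_params_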
It also turns out that we can drop in a new, unfit transformer, e.g. unfit_fare_pipe, and still use cached results from the other, previously fit transformers.
# Make a new, unfit fare pipeline and replace the one in unfit_pipe.
unfit_fare_pipe = Pipeline(
    steps=[
        ("impute", SimpleImputer()),
        ("scale", StandardScaler()),
    ],
)
unfit_pipe.named_steps.col_trf.transformers_[0] = (fare_name, unfit_fare_pipe, fare_cols)
# Checking if unfit_fare_pipe is fitted will result in a NotFittedError.
from sklearn.exceptions import NotFittedError

try:
    print(check_is_fitted(unfit_pipe.named_steps.col_trf.named_transformers_.fare) is None)
except NotFittedError:
    print(False)
False
unfit_pipe.fit(X_train, y_train);
[Pipeline] ............... (step 3 of 3) Processing clf, total= 0.1s
# The unfit_fare_pipe is now fitted.
check_is_fitted(unfit_pipe.named_steps.col_trf.named_transformers_.fare) is None
True
# Test that the updated unfit_pipe can score the unseen validation data.
unfit_pipe.score(X_val, y_val)
Sleeping...
Awake!
0.8071748878923767
Conclusion
When constructing a Pipeline with potentially expensive steps, it may be wise to cache the fitted results. This allows us to save resources and avoid recomputing or refitting transformers and estimators that we aren't changing.
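As a parting recap, the whole pattern fits in one small sketch (using the objects defined above):
from tempfile import mkdtemp

# Point `memory` at a directory and the Pipeline caches its fitted steps;
# on later fits, unchanged steps are reloaded instead of recomputed.
cached_pipe = Pipeline(
    steps=[
        ("col_trf", col_trf),
        ("clf", LogisticRegression(random_state=0)),
    ],
    memory=mkdtemp(),
)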