Phase 02 - Lesson 13
Pipelines de ML
This lesson includes a graded coding exercise that runs in your browser, unlocked with lifetime access.
Um modelo não é um produto. Um pipeline é. O pipeline inclui tudo, desde dados brutos até previsões implementadas, e cada etapa deve ser reproduzível.
Tipo: Construir Idioma: Python Pré-requisitos: Fase 2, Lição 12 (Ajuste de hiperparâmetros) Tempo: ~120 minutos
Objetivos de aprendizagem
- Crie um pipeline de ML do zero que encadeie imputação, dimensionamento, codificação e treinamento de modelo em um único objeto reproduzível
- Identificar cenários de vazamento de dados e explicar como os pipelines os evitam, ajustando transformadores apenas nos dados de treinamento
- Construa um ColumnTransformer que aplique diferentes pré-processamentos a recursos numéricos e categóricos
- Implementar a serialização de pipeline e demonstrar que o mesmo pipeline instalado produz resultados idênticos em treinamento e produção
O problema
Você tem um notebook que carrega dados, preenche valores ausentes com a mediana, dimensiona recursos, treina um modelo e imprime precisão. Funciona. Você envia.
Um mês depois, alguém retreina o modelo e obtém resultados diferentes. A mediana foi calculada no conjunto de dados completo, incluindo dados de teste (vazamento de dados). Os parâmetros de escala não foram salvos, portanto a inferência utiliza estatísticas diferentes. O código de engenharia de características foi copiado e colado entre o treinamento e a veiculação, e as cópias divergiram. Uma coluna categórica ganhou um novo valor na produção que o codificador nunca viu.
Estas não são hipotéticas. Esses são os motivos mais comuns pelos quais os sistemas de ML falham na produção. Os pipelines resolvem todos eles empacotando cada etapa de transformação em um objeto único, ordenado e reproduzível.
O Conceito
O que é um Pipeline
Um pipeline é uma sequência ordenada de transformações de dados seguida por um modelo. Cada etapa recebe a saída da etapa anterior como entrada. Todo o pipeline é ajustado uma vez nos dados de treinamento. No momento da inferência, o mesmo pipeline ajustado transforma novos dados e produz previsões.
flowchart LR
A[Raw Data] --> B[Impute Missing Values]
B --> C[Scale Numeric Features]
C --> D[Encode Categoricals]
D --> E[Train Model]
E --> F[Prediction]
O pipeline garante:
- As transformações são ajustadas apenas nos dados de treinamento (sem vazamento)
- As mesmas transformações são aplicadas no tempo de inferência
- O objeto inteiro pode ser serializado e implantado como um artefato
- A validação cruzada aplica o pipeline por dobra, evitando vazamentos sutis
Vazamento de dados: o assassino silencioso
O vazamento de dados ocorre quando informações do conjunto de testes ou dados futuros contaminam o treinamento. Pipelines evitam as formas mais comuns.
Vazamento (errado):
X = df.drop("target", axis=1)
y = df["target"]
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
X_train, X_test = X_scaled[:800], X_scaled[800:]
y_train, y_test = y[:800], y[800:]
O escalador viu dados de teste. A média e o desvio padrão incluem amostras de teste. Isso aumenta as estimativas de precisão.
Correto:
X_train, X_test = X[:800], X[800:]
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
Com um pipeline, você não precisa pensar nisso. O pipeline lida com isso automaticamente.
sklearn Pipeline
transformadores de cadeias Pipeline do sklearn e um estimador. Ele expõe .fit(), .predict() e .score() que aplicam todas as etapas em ordem.
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
pipe = Pipeline([
("scaler", StandardScaler()),
("model", LogisticRegression()),
])
pipe.fit(X_train, y_train)
predictions = pipe.predict(X_test)
Quando você liga para pipe.fit(X_train, y_train):
- Scaler chama
fit_transformem X_train - Chamadas de modelo
fitno X_train dimensionado
Quando você liga para pipe.predict(X_test):
- O escalonador chama
transform(não fit_transform) em X_test - Chamadas de modelo
predictno X_test escalonado
O escalador nunca vê os dados de teste durante o ajuste. Este é o ponto principal.
ColumnTransformer: Pipelines diferentes para colunas diferentes
Conjuntos de dados reais possuem colunas numéricas e categóricas que precisam de pré-processamento diferente. ColumnTransformer lida com isso.
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer
numeric_pipe = Pipeline([
("impute", SimpleImputer(strategy="median")),
("scale", StandardScaler()),
])
categorical_pipe = Pipeline([
("impute", SimpleImputer(strategy="most_frequent")),
("encode", OneHotEncoder(handle_unknown="ignore")),
])
preprocessor = ColumnTransformer([
("num", numeric_pipe, ["age", "income", "score"]),
("cat", categorical_pipe, ["city", "gender", "plan"]),
])
full_pipeline = Pipeline([
("preprocess", preprocessor),
("model", GradientBoostingClassifier()),
])
O handle_unknown="ignore" em OneHotEncoder é crítico para a produção. Quando uma nova categoria aparece (uma cidade que o modelo nunca viu), ela produz um vetor zero em vez de falhar.
Acompanhamento de experimentos
Um pipeline torna o treinamento reproduzível, mas você também precisa acompanhar o que aconteceu nos experimentos: quais hiperparâmetros foram usados, qual versão do conjunto de dados, quais eram as métricas, qual código estava em execução.
MLflow é a solução de código aberto mais comum:
import mlflow
with mlflow.start_run():
mlflow.log_param("max_depth", 5)
mlflow.log_param("n_estimators", 100)
mlflow.log_param("learning_rate", 0.1)
pipe.fit(X_train, y_train)
accuracy = pipe.score(X_test, y_test)
mlflow.log_metric("accuracy", accuracy)
mlflow.sklearn.log_model(pipe, "model")
Cada execução é registrada com parâmetros, métricas, artefatos e o modelo completo. Você pode comparar execuções, reproduzir qualquer experimento e implantar qualquer versão do modelo.
Weights & Biases (wandb) fornece a mesma funcionalidade com um painel hospedado:
import wandb
wandb.init(project="my-pipeline")
wandb.config.update({"max_depth": 5, "n_estimators": 100})
pipe.fit(X_train, y_train)
accuracy = pipe.score(X_test, y_test)
wandb.log({"accuracy": accuracy})
Controle de versão do modelo
Após o acompanhamento do experimento, você precisa gerenciar as versões do modelo. Qual modelo está em produção? Qual é a encenação? Qual foi o da semana passada?
O Registro de Modelo do MLflow fornece:
- Rastreamento de versão: Cada modelo salvo recebe um número de versão
- Transições de palco: "Encenação", "Produção", "Arquivado"
- Fluxo de trabalho de aprovação: os modelos devem ser explicitamente promovidos para produção
- Reversão: Volte para uma versão anterior instantaneamente
Versionamento de dados com DVC
O código é versionado com git. Os dados também devem ser versionados, mas o git não pode lidar com arquivos grandes. DVC (Data Version Control) resolve isso.
dvc init
dvc add data/training.csv
git add data/training.csv.dvc data/.gitignore
git commit -m "Track training data"
dvc push
O DVC armazena os dados reais em armazenamento remoto (S3, GCS, Azure) e mantém um pequeno arquivo .dvc no git que registra o hash. Quando você faz checkout de um commit do git, dvc checkout restaura os dados exatos que foram usados.
Isso significa que cada commit do git fixa o código e os dados. Reprodutibilidade total.
Experimentos reproduzíveis
Um experimento reproduzível requer quatro coisas:
- Sementes aleatórias fixas: Definir sementes para numpy, aleatório e estrutura (tocha, sklearn)
- Dependências fixadas: requisitos.txt ou poesia.lock com versões exatas
- Dados versionados: DVC ou similar
- Arquivos de configuração: Todos os hiperparâmetros em uma configuração, não codificados
import numpy as np
import random
def set_seed(seed=42):
random.seed(seed)
np.random.seed(seed)
try:
import torch
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
except ImportError:
pass
Do Notebook à Produção Pipeline
flowchart TD
A[Jupyter Notebook] --> B[Extract functions]
B --> C[Build Pipeline object]
C --> D[Add config file for hyperparameters]
D --> E[Add experiment tracking]
E --> F[Add data validation]
F --> G[Add tests]
G --> H[Package for deployment]
style A fill:#fdd,stroke:#333
style H fill:#dfd,stroke:#333
A progressão típica:
- Exploração de notebook: Experimentos rápidos, visualizações, ideias de recursos
- Extrair funções: Mover pré-processamento, engenharia de características e avaliação em módulos
- Construir Pipeline: Encadear transformações em um sklearn Pipeline ou classe personalizada
- Gerenciamento de configuração: Mova todos os hiperparâmetros para uma configuração YAML/JSON
- Acompanhamento de experimentos: Adicionar registro de MLflow ou wandb
- Validação de dados: Verifique esquema, distribuições e padrões de valores ausentes antes do treinamento
- Testes: Testes de unidade para transformadores, testes de integração para o pipeline completo
- Implantação: Serialize o pipeline, envolva-o em um API (FastAPI, Flask), conteinerize
Erros Pipeline Comuns
| Erro | Por que é ruim | Correção |
|---|---|---|
| Ajustando os dados completos antes da divisão | Vazamento de dados | Use Pipeline com cross_val_score |
| Engenharia de características fora do pipeline | Diferentes transformações em treinar vs servir | Coloque todas as transformações no Pipeline |
| Não tratando categorias desconhecidas | Queda de produção em novos valores | OneHotEncoder(handle_unknown="ignorar") |
| Nomes de colunas codificados | Quebra quando o esquema muda | Use listas de nomes de colunas de config |
| Sem validação de dados | Previsões silenciosamente erradas sobre dados ruins | Adicionar verificações de esquema antes da previsão |
| Distorção de treinamento/serviço | Modelo vê recursos diferentes na produção | Um objeto Pipeline para ambos |
Construa
O código em code/pipeline.py cria um pipeline de ML completo do zero:
Etapa 1: transformador personalizado
class CustomTransformer:
def __init__(self):
self.means = None
self.stds = None
def fit(self, X):
self.means = np.mean(X, axis=0)
self.stds = np.std(X, axis=0)
self.stds[self.stds == 0] = 1.0
return self
def transform(self, X):
return (X - self.means) / self.stds
def fit_transform(self, X):
return self.fit(X).transform(X)
Etapa 2: Pipeline do zero
class PipelineFromScratch:
def __init__(self, steps):
self.steps = steps
def fit(self, X, y=None):
X_current = X.copy()
for name, step in self.steps[:-1]:
X_current = step.fit_transform(X_current)
name, model = self.steps[-1]
model.fit(X_current, y)
return self
def predict(self, X):
X_current = X.copy()
for name, step in self.steps[:-1]:
X_current = step.transform(X_current)
name, model = self.steps[-1]
return model.predict(X_current)
Etapa 3: validação cruzada com Pipeline
O código demonstra como a validação cruzada com um pipeline evita o vazamento de dados: o escalonador é ajustado separadamente nos dados de treinamento de cada dobra.
Etapa 4: Produção completa Pipeline com sklearn
Um pipeline completo com ColumnTransformer, vários caminhos de pré-processamento e um modelo, treinado com validação cruzada adequada e registro de experimentos.
Envie
Esta lição produz:
outputs/prompt-ml-pipeline.md– uma habilidade para construir e depurar pipelines de MLcode/pipeline.py-- um pipeline completo do zero por meio do sklearn
Exercícios
Crie um pipeline que lide com um conjunto de dados com 3 colunas numéricas e 2 colunas categóricas. Use
ColumnTransformerpara aplicar imputação mediana + escala para números e imputação mais frequente + codificação one-hot para categóricos. Treine com validação cruzada de 5 vezes.Introduza deliberadamente o vazamento de dados: ajuste o escalonador no conjunto de dados completo antes de dividir. Compare a pontuação de validação cruzada (vazada) com a pontuação de validação cruzada do pipeline (limpa). Quão grande é a diferença?
Serialize seu pipeline com
joblib.dump. Carregue-o em um script separado e execute previsões. Verifique se as previsões são idênticas.Adicione um transformador personalizado ao pipeline que cria recursos polinomiais (grau 2) para as duas colunas numéricas mais importantes. Para onde deve ir no pipeline?
Configure o rastreamento do MLflow para o pipeline. Execute 5 experimentos com hiperparâmetros diferentes. Use a UI do MLflow (
mlflow ui) para comparar execuções e escolher o melhor modelo.
Termos-chave
| Prazo | O que as pessoas dizem | O que isso realmente significa |
|---|---|---|
| Pipeline | “Cadeia de transformações + modelo” | Uma sequência ordenada de transformadores instalados e um modelo, aplicado como uma unidade para evitar fugas |
| Vazamento de dados | “Informações de teste vazaram para treinamento” | Usando informações de fora do conjunto de treinamento para construir o modelo, inflacionando as estimativas de desempenho |
| ColumnTransformer | "Pré-processamento diferente por coluna" | Aplica diferentes pipelines a diferentes subconjuntos de colunas, combinando resultados |
| Acompanhamento de experimentos | "Registrando suas corridas" | Registrando parâmetros, métricas, artefatos e versões de código para cada execução de treinamento |
| Fluxo ML | "Rastrear e implantar modelos" | Plataforma de código aberto para rastreamento de experimentos, registro de modelo e implantação |
| DVC | "Git para dados" | Sistema de controle de versão para grandes arquivos de dados, armazenando hashes em git e dados em armazenamento remoto |
| Cadastro de modelo | "Catálogo de versões de modelo" | Um sistema que rastreia versões de modelos com rótulos de palco (encenação, produção, arquivado) |
| Distorção de treinamento/serviço | “Funcionou no notebook” | Diferenças entre como os dados são processados durante o treinamento e a inferência, causando erros silenciosos |
| Reprodutibilidade | “Mesmo código, mesmo resultado” | A capacidade de obter resultados idênticos a partir do mesmo código, dados e configuração |
Leitura Adicional
- scikit-learn Pipeline documentos - a referência oficial do pipeline
- Documentação do MLflow - rastreamento de experimentos e registro de modelo
- documentação DVC - versionamento de dados
- Sculley et al., Hidden Technical Debt in Machine Learning Systems (2015) - o artigo seminal sobre a complexidade dos sistemas de ML
- Práticas recomendadas de ML do Google: regras de ML - conselhos práticos de ML de produção