Phase 02 - Lesson 13

Pipelines de ML

This lesson includes a graded coding exercise that runs in your browser, unlocked with lifetime access.

Um modelo não é um produto. Um pipeline é. O pipeline inclui tudo, desde dados brutos até previsões implementadas, e cada etapa deve ser reproduzível.

Tipo: Construir Idioma: Python Pré-requisitos: Fase 2, Lição 12 (Ajuste de hiperparâmetros) Tempo: ~120 minutos

Objetivos de aprendizagem

  • Crie um pipeline de ML do zero que encadeie imputação, dimensionamento, codificação e treinamento de modelo em um único objeto reproduzível
  • Identificar cenários de vazamento de dados e explicar como os pipelines os evitam, ajustando transformadores apenas nos dados de treinamento
  • Construa um ColumnTransformer que aplique diferentes pré-processamentos a recursos numéricos e categóricos
  • Implementar a serialização de pipeline e demonstrar que o mesmo pipeline instalado produz resultados idênticos em treinamento e produção

O problema

Você tem um notebook que carrega dados, preenche valores ausentes com a mediana, dimensiona recursos, treina um modelo e imprime precisão. Funciona. Você envia.

Um mês depois, alguém retreina o modelo e obtém resultados diferentes. A mediana foi calculada no conjunto de dados completo, incluindo dados de teste (vazamento de dados). Os parâmetros de escala não foram salvos, portanto a inferência utiliza estatísticas diferentes. O código de engenharia de características foi copiado e colado entre o treinamento e a veiculação, e as cópias divergiram. Uma coluna categórica ganhou um novo valor na produção que o codificador nunca viu.

Estas não são hipotéticas. Esses são os motivos mais comuns pelos quais os sistemas de ML falham na produção. Os pipelines resolvem todos eles empacotando cada etapa de transformação em um objeto único, ordenado e reproduzível.

O Conceito

O que é um Pipeline

Um pipeline é uma sequência ordenada de transformações de dados seguida por um modelo. Cada etapa recebe a saída da etapa anterior como entrada. Todo o pipeline é ajustado uma vez nos dados de treinamento. No momento da inferência, o mesmo pipeline ajustado transforma novos dados e produz previsões.

flowchart LR
    A[Raw Data] --> B[Impute Missing Values]
    B --> C[Scale Numeric Features]
    C --> D[Encode Categoricals]
    D --> E[Train Model]
    E --> F[Prediction]

O pipeline garante:

  • As transformações são ajustadas apenas nos dados de treinamento (sem vazamento)
  • As mesmas transformações são aplicadas no tempo de inferência
  • O objeto inteiro pode ser serializado e implantado como um artefato
  • A validação cruzada aplica o pipeline por dobra, evitando vazamentos sutis

Vazamento de dados: o assassino silencioso

O vazamento de dados ocorre quando informações do conjunto de testes ou dados futuros contaminam o treinamento. Pipelines evitam as formas mais comuns.

Vazamento (errado):

X = df.drop("target", axis=1)
y = df["target"]

scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

X_train, X_test = X_scaled[:800], X_scaled[800:]
y_train, y_test = y[:800], y[800:]

O escalador viu dados de teste. A média e o desvio padrão incluem amostras de teste. Isso aumenta as estimativas de precisão.

Correto:

X_train, X_test = X[:800], X[800:]

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

Com um pipeline, você não precisa pensar nisso. O pipeline lida com isso automaticamente.

sklearn Pipeline

transformadores de cadeias Pipeline do sklearn e um estimador. Ele expõe .fit(), .predict() e .score() que aplicam todas as etapas em ordem.

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

pipe = Pipeline([
    ("scaler", StandardScaler()),
    ("model", LogisticRegression()),
])

pipe.fit(X_train, y_train)
predictions = pipe.predict(X_test)

Quando você liga para pipe.fit(X_train, y_train):

  1. Scaler chama fit_transform em X_train
  2. Chamadas de modelo fit no X_train dimensionado

Quando você liga para pipe.predict(X_test):

  1. O escalonador chama transform (não fit_transform) em X_test
  2. Chamadas de modelo predict no X_test escalonado

O escalador nunca vê os dados de teste durante o ajuste. Este é o ponto principal.

ColumnTransformer: Pipelines diferentes para colunas diferentes

Conjuntos de dados reais possuem colunas numéricas e categóricas que precisam de pré-processamento diferente. ColumnTransformer lida com isso.

from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer

numeric_pipe = Pipeline([
    ("impute", SimpleImputer(strategy="median")),
    ("scale", StandardScaler()),
])

categorical_pipe = Pipeline([
    ("impute", SimpleImputer(strategy="most_frequent")),
    ("encode", OneHotEncoder(handle_unknown="ignore")),
])

preprocessor = ColumnTransformer([
    ("num", numeric_pipe, ["age", "income", "score"]),
    ("cat", categorical_pipe, ["city", "gender", "plan"]),
])

full_pipeline = Pipeline([
    ("preprocess", preprocessor),
    ("model", GradientBoostingClassifier()),
])

O handle_unknown="ignore" em OneHotEncoder é crítico para a produção. Quando uma nova categoria aparece (uma cidade que o modelo nunca viu), ela produz um vetor zero em vez de falhar.

Acompanhamento de experimentos

Um pipeline torna o treinamento reproduzível, mas você também precisa acompanhar o que aconteceu nos experimentos: quais hiperparâmetros foram usados, qual versão do conjunto de dados, quais eram as métricas, qual código estava em execução.

MLflow é a solução de código aberto mais comum:

import mlflow

with mlflow.start_run():
    mlflow.log_param("max_depth", 5)
    mlflow.log_param("n_estimators", 100)
    mlflow.log_param("learning_rate", 0.1)

    pipe.fit(X_train, y_train)
    accuracy = pipe.score(X_test, y_test)

    mlflow.log_metric("accuracy", accuracy)
    mlflow.sklearn.log_model(pipe, "model")

Cada execução é registrada com parâmetros, métricas, artefatos e o modelo completo. Você pode comparar execuções, reproduzir qualquer experimento e implantar qualquer versão do modelo.

Weights & Biases (wandb) fornece a mesma funcionalidade com um painel hospedado:

import wandb

wandb.init(project="my-pipeline")
wandb.config.update({"max_depth": 5, "n_estimators": 100})

pipe.fit(X_train, y_train)
accuracy = pipe.score(X_test, y_test)

wandb.log({"accuracy": accuracy})

Controle de versão do modelo

Após o acompanhamento do experimento, você precisa gerenciar as versões do modelo. Qual modelo está em produção? Qual é a encenação? Qual foi o da semana passada?

O Registro de Modelo do MLflow fornece:

  • Rastreamento de versão: Cada modelo salvo recebe um número de versão
  • Transições de palco: "Encenação", "Produção", "Arquivado"
  • Fluxo de trabalho de aprovação: os modelos devem ser explicitamente promovidos para produção
  • Reversão: Volte para uma versão anterior instantaneamente

Versionamento de dados com DVC

O código é versionado com git. Os dados também devem ser versionados, mas o git não pode lidar com arquivos grandes. DVC (Data Version Control) resolve isso.

dvc init
dvc add data/training.csv
git add data/training.csv.dvc data/.gitignore
git commit -m "Track training data"
dvc push

O DVC armazena os dados reais em armazenamento remoto (S3, GCS, Azure) e mantém um pequeno arquivo .dvc no git que registra o hash. Quando você faz checkout de um commit do git, dvc checkout restaura os dados exatos que foram usados.

Isso significa que cada commit do git fixa o código e os dados. Reprodutibilidade total.

Experimentos reproduzíveis

Um experimento reproduzível requer quatro coisas:

  1. Sementes aleatórias fixas: Definir sementes para numpy, aleatório e estrutura (tocha, sklearn)
  2. Dependências fixadas: requisitos.txt ou poesia.lock com versões exatas
  3. Dados versionados: DVC ou similar
  4. Arquivos de configuração: Todos os hiperparâmetros em uma configuração, não codificados
import numpy as np
import random

def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    try:
        import torch
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.deterministic = True
    except ImportError:
        pass

Do Notebook à Produção Pipeline

flowchart TD
    A[Jupyter Notebook] --> B[Extract functions]
    B --> C[Build Pipeline object]
    C --> D[Add config file for hyperparameters]
    D --> E[Add experiment tracking]
    E --> F[Add data validation]
    F --> G[Add tests]
    G --> H[Package for deployment]

    style A fill:#fdd,stroke:#333
    style H fill:#dfd,stroke:#333

A progressão típica:

  1. Exploração de notebook: Experimentos rápidos, visualizações, ideias de recursos
  2. Extrair funções: Mover pré-processamento, engenharia de características e avaliação em módulos
  3. Construir Pipeline: Encadear transformações em um sklearn Pipeline ou classe personalizada
  4. Gerenciamento de configuração: Mova todos os hiperparâmetros para uma configuração YAML/JSON
  5. Acompanhamento de experimentos: Adicionar registro de MLflow ou wandb
  6. Validação de dados: Verifique esquema, distribuições e padrões de valores ausentes antes do treinamento
  7. Testes: Testes de unidade para transformadores, testes de integração para o pipeline completo
  8. Implantação: Serialize o pipeline, envolva-o em um API (FastAPI, Flask), conteinerize

Erros Pipeline Comuns

Erro Por que é ruim Correção
Ajustando os dados completos antes da divisão Vazamento de dados Use Pipeline com cross_val_score
Engenharia de características fora do pipeline Diferentes transformações em treinar vs servir Coloque todas as transformações no Pipeline
Não tratando categorias desconhecidas Queda de produção em novos valores OneHotEncoder(handle_unknown="ignorar")
Nomes de colunas codificados Quebra quando o esquema muda Use listas de nomes de colunas de config
Sem validação de dados Previsões silenciosamente erradas sobre dados ruins Adicionar verificações de esquema antes da previsão
Distorção de treinamento/serviço Modelo vê recursos diferentes na produção Um objeto Pipeline para ambos

Construa

O código em code/pipeline.py cria um pipeline de ML completo do zero:

Etapa 1: transformador personalizado

class CustomTransformer:
    def __init__(self):
        self.means = None
        self.stds = None

    def fit(self, X):
        self.means = np.mean(X, axis=0)
        self.stds = np.std(X, axis=0)
        self.stds[self.stds == 0] = 1.0
        return self

    def transform(self, X):
        return (X - self.means) / self.stds

    def fit_transform(self, X):
        return self.fit(X).transform(X)

Etapa 2: Pipeline do zero

class PipelineFromScratch:
    def __init__(self, steps):
        self.steps = steps

    def fit(self, X, y=None):
        X_current = X.copy()
        for name, step in self.steps[:-1]:
            X_current = step.fit_transform(X_current)
        name, model = self.steps[-1]
        model.fit(X_current, y)
        return self

    def predict(self, X):
        X_current = X.copy()
        for name, step in self.steps[:-1]:
            X_current = step.transform(X_current)
        name, model = self.steps[-1]
        return model.predict(X_current)

Etapa 3: validação cruzada com Pipeline

O código demonstra como a validação cruzada com um pipeline evita o vazamento de dados: o escalonador é ajustado separadamente nos dados de treinamento de cada dobra.

Etapa 4: Produção completa Pipeline com sklearn

Um pipeline completo com ColumnTransformer, vários caminhos de pré-processamento e um modelo, treinado com validação cruzada adequada e registro de experimentos.

Envie

Esta lição produz:

  • outputs/prompt-ml-pipeline.md – uma habilidade para construir e depurar pipelines de ML
  • code/pipeline.py -- um pipeline completo do zero por meio do sklearn

Exercícios

  1. Crie um pipeline que lide com um conjunto de dados com 3 colunas numéricas e 2 colunas categóricas. Use ColumnTransformer para aplicar imputação mediana + escala para números e imputação mais frequente + codificação one-hot para categóricos. Treine com validação cruzada de 5 vezes.

  2. Introduza deliberadamente o vazamento de dados: ajuste o escalonador no conjunto de dados completo antes de dividir. Compare a pontuação de validação cruzada (vazada) com a pontuação de validação cruzada do pipeline (limpa). Quão grande é a diferença?

  3. Serialize seu pipeline com joblib.dump. Carregue-o em um script separado e execute previsões. Verifique se as previsões são idênticas.

  4. Adicione um transformador personalizado ao pipeline que cria recursos polinomiais (grau 2) para as duas colunas numéricas mais importantes. Para onde deve ir no pipeline?

  5. Configure o rastreamento do MLflow para o pipeline. Execute 5 experimentos com hiperparâmetros diferentes. Use a UI do MLflow (mlflow ui) para comparar execuções e escolher o melhor modelo.

Termos-chave

Prazo O que as pessoas dizem O que isso realmente significa
Pipeline “Cadeia de transformações + modelo” Uma sequência ordenada de transformadores instalados e um modelo, aplicado como uma unidade para evitar fugas
Vazamento de dados “Informações de teste vazaram para treinamento” Usando informações de fora do conjunto de treinamento para construir o modelo, inflacionando as estimativas de desempenho
ColumnTransformer "Pré-processamento diferente por coluna" Aplica diferentes pipelines a diferentes subconjuntos de colunas, combinando resultados
Acompanhamento de experimentos "Registrando suas corridas" Registrando parâmetros, métricas, artefatos e versões de código para cada execução de treinamento
Fluxo ML "Rastrear e implantar modelos" Plataforma de código aberto para rastreamento de experimentos, registro de modelo e implantação
DVC "Git para dados" Sistema de controle de versão para grandes arquivos de dados, armazenando hashes em git e dados em armazenamento remoto
Cadastro de modelo "Catálogo de versões de modelo" Um sistema que rastreia versões de modelos com rótulos de palco (encenação, produção, arquivado)
Distorção de treinamento/serviço “Funcionou no notebook” Diferenças entre como os dados são processados ​​durante o treinamento e a inferência, causando erros silenciosos
Reprodutibilidade “Mesmo código, mesmo resultado” A capacidade de obter resultados idênticos a partir do mesmo código, dados e configuração

Leitura Adicional

0 lifetime access. Curriculum based on AI Engineering from Scratch by Rohit Ghumare (MIT, used under attribution).