Phase 02 - Lesson 13

Canalizaciones de machine learning

This lesson includes a graded coding exercise that runs in your browser, unlocked with lifetime access.

Un modelo no es un producto. Un oleoducto lo es. El proceso abarca desde datos sin procesar hasta predicciones implementadas, y cada paso debe ser reproducible.

Tipo: Construcción Idioma: Python Requisitos previos: Fase 2, Lección 12 (Ajuste de hiperparámetros) Tiempo: ~120 minutos

Objetivos de aprendizaje

  • Cree una canalización de machine learning desde cero que encadene la imputación, el escalado, la codificación y el entrenamiento de modelos en un único objeto reproducible.
  • Identificar escenarios de fuga de datos y explicar cómo las tuberías los previenen instalando transformadores solo en datos de entrenamiento.
  • Construya un ColumnTransformer que aplique un preprocesamiento diferente a características numéricas y categóricas
  • Implementar la serialización de tuberías y demostrar que la misma tubería instalada produce resultados idénticos en entrenamiento y producción.

El problema

Tiene un cuaderno que carga datos, completa los valores faltantes con la mediana, escala características, entrena un modelo e imprime precisión. Funciona. Lo envías.

Un mes después, alguien vuelve a entrenar el modelo y obtiene resultados diferentes. La mediana se calculó en el conjunto de datos completo, incluidos los datos de prueba (fuga de datos). Los parámetros de escala no se guardaron, por lo que la inferencia utiliza estadísticas diferentes. El código de ingeniería de características se copió y pegó entre el entrenamiento y la publicación, y las copias divergieron. Una columna categórica obtuvo en producción un nuevo valor que el codificador nunca había visto.

Estos no son hipotéticos. Son las razones más comunes por las que los sistemas ML fallan en producción. Los pipelines los resuelven todos empaquetando cada paso de transentrenamiento en un objeto único, ordenado y reproducible.

El concepto

Qué es un Pipeline

Una canalización es una secuencia ordenada de transformaciones de datos seguidas de un modelo. Cada paso toma como entrada la salida del paso anterior. Todo el proceso se ajusta una vez a los datos de entrenamiento. En el momento de la inferencia, el mismo proceso transforma nuevos datos y produce predicciones.

flowchart LR
    A[Raw Data] --> B[Impute Missing Values]
    B --> C[Scale Numeric Features]
    C --> D[Encode Categoricals]
    D --> E[Train Model]
    E --> F[Prediction]

El oleoducto garantiza:

  • Las transformaciones se ajustan solo a los datos de entrenamiento (sin fugas)
  • Las mismas transformaciones se aplican en el momento de la inferencia.
  • Todo el objeto se puede serializar e implementar como un solo artefacto.
  • La validación cruzada aplica la tubería por pliegue, evitando fugas sutiles

Fuga de datos: el asesino silencioso

La fuga de datos ocurre cuando la inentrenamiento del conjunto de pruebas o datos futuros contaminan el entrenamiento. Los oleoductos previenen las formas más comunes.

Con fugas (incorrecto):

X = df.drop("target", axis=1)
y = df["target"]

scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

X_train, X_test = X_scaled[:800], X_scaled[800:]
y_train, y_test = y[:800], y[800:]

El escalador vio datos de prueba. La media y la desviación estándar incluyen muestras de prueba. Esto infla las estimaciones de precisión.

Correcto:

X_train, X_test = X[:800], X[800:]

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

Con una tubería, no es necesario pensar en esto. La tubería lo maneja automáticamente.

aprender Pipeline

Transformadores de cadenas Pipeline de sklearn y un estimador. Expone .fit(), .predict() y .score() que aplican todos los pasos en orden.

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

pipe = Pipeline([
    ("scaler", StandardScaler()),
    ("model", LogisticRegression()),
])

pipe.fit(X_train, y_train)
predictions = pipe.predict(X_test)

Cuando llamas pipe.fit(X_train, y_train):

  1. Scaler llama a fit_transform en X_train
  2. Llamadas del modelo fit en el X_train escalado

Cuando llamas pipe.predict(X_test):

  1. Scaler llama a transform (no fit_transform) en X_test
  2. Llamadas de modelo predict en el X_test escalado

El escalador nunca ve los datos de la prueba durante la adaptación. Éste es el punto.

ColumnTransformer: Diferentes canalizaciones para diferentes columnas

Los conjuntos de datos reales tienen columnas numéricas y categóricas que necesitan un procesamiento previo diferente. ColumnTransformer maneja esto.

from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.impute import SimpleImputer

numeric_pipe = Pipeline([
    ("impute", SimpleImputer(strategy="median")),
    ("scale", StandardScaler()),
])

categorical_pipe = Pipeline([
    ("impute", SimpleImputer(strategy="most_frequent")),
    ("encode", OneHotEncoder(handle_unknown="ignore")),
])

preprocessor = ColumnTransformer([
    ("num", numeric_pipe, ["age", "income", "score"]),
    ("cat", categorical_pipe, ["city", "gender", "plan"]),
])

full_pipeline = Pipeline([
    ("preprocess", preprocessor),
    ("model", GradientBoostingClassifier()),
])

El handle_unknown="ignore" en OneHotEncoder es fundamental para la producción. Cuando aparece una nueva categoría (una ciudad que el modelo nunca ha visto), produce un vector cero en lugar de colapsar.

Seguimiento de experimentos

Una canalización hace que el entrenamiento sea reproducible, pero también es necesario realizar un seguimiento de lo que sucedió en los experimentos: qué hiperparámetros se utilizaron, qué versión del conjunto de datos, cuáles fueron las métricas, qué código se estaba ejecutando.

MLflow es la solución de código abierto más común:

import mlflow

with mlflow.start_run():
    mlflow.log_param("max_depth", 5)
    mlflow.log_param("n_estimators", 100)
    mlflow.log_param("learning_rate", 0.1)

    pipe.fit(X_train, y_train)
    accuracy = pipe.score(X_test, y_test)

    mlflow.log_metric("accuracy", accuracy)
    mlflow.sklearn.log_model(pipe, "model")

Cada ejecución se registra con parámetros, métricas, artefactos y el modelo completo. Puede comparar ejecuciones, reproducir cualquier experimento e implementar cualquier versión de modelo.

Weights & Biases (wandb) proporciona la misma funcionalidad con un panel alojado:

import wandb

wandb.init(project="my-pipeline")
wandb.config.update({"max_depth": 5, "n_estimators": 100})

pipe.fit(X_train, y_train)
accuracy = pipe.score(X_test, y_test)

wandb.log({"accuracy": accuracy})

Versionado del modelo

Después del seguimiento del experimento, debe administrar las versiones del modelo. ¿Qué modelo está en producción? ¿Cuál es la puesta en escena? ¿Cuál fue el de la semana pasada?

El Registro de modelos de MLflow proporciona:

  • Seguimiento de versiones: Cada modelo guardado obtiene un número de versión
  • Transiciones de escenario: "Puesta en escena", "Producción", "Archivado"
  • Flujo de trabajo de aprobación: Los modelos deben promocionarse explícitamente a producción.
  • Revertir: volver a una versión anterior al instante

Versionado de datos con DVC

El código está versionado con git. Los datos también deben tener versiones, pero git no puede manejar archivos grandes. DVC (Control de versiones de datos) resuelve esto.

dvc init
dvc add data/training.csv
git add data/training.csv.dvc data/.gitignore
git commit -m "Track training data"
dvc push

DVC almacena los datos reales en un almacenamiento remoto (S3, GCS, Azure) y mantiene un pequeño archivo .dvc en git que registra el hash. Cuando revisas una confirmación de git, dvc checkout restaura los datos exactos que se usaron.

Esto significa que cada confirmación de git fija tanto el código como los datos. Reproducibilidad total.

Experimentos reproducibles

Un experimento reproducible requiere cuatro cosas:

  1. Semillas aleatorias fijas: Establezca semillas para numpy, random y el marco (antorcha, sklearn)
  2. Dependencias fijadas: requisitos.txt o poesía.lock con versiones exactas
  3. Datos versionados: DVC o similar
  4. Archivos de configuración: Todos los hiperparámetros en una configuración, no codificados
import numpy as np
import random

def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    try:
        import torch
        torch.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.deterministic = True
    except ImportError:
        pass

Del portátil a la producción Pipeline

flowchart TD
    A[Jupyter Notebook] --> B[Extract functions]
    B --> C[Build Pipeline object]
    C --> D[Add config file for hyperparameters]
    D --> E[Add experiment tracking]
    E --> F[Add data validation]
    F --> G[Add tests]
    G --> H[Package for deployment]

    style A fill:#fdd,stroke:#333
    style H fill:#dfd,stroke:#333

La progresión típica:

  1. Exploración con cuaderno: Experimentos rápidos, visualizaciones e ideas de funciones.
  2. Extraer funciones: Mover el preprocesamiento, la ingeniería de características y la evaluación a módulos
  3. Compilación Pipeline: Transformaciones en cadena en un sklearn Pipeline o clase personalizada
  4. Gestión de configuración: Mueva todos los hiperparámetros a una configuración YAML/JSON
  5. Seguimiento de experimentos: Agregue registro de MLflow o wandb
  6. Validación de datos: Verifique el esquema, las distribuciones y los patrones de valores faltantes antes del entrenamiento.
  7. Pruebas: Pruebas unitarias para transformadores, pruebas de integración para el pipeline completo
  8. Implementación: Serializar la canalización, envolverla en un API (FastAPI, Flask), colocar en contenedores

Errores comunes Pipeline

Error Por qué es malo Arreglar
Ajustar los datos completos antes de dividirlos Fuga de datos Utilice Pipeline con cross_val_score
Ingeniería de características fuera de la tubería Diferentes transformaciones en tren vs servicio Coloque todas las transformaciones en Pipeline
No manejar categorías desconocidas Caída de la producción por nuevos valores OneHotEncoder(handle_unknown="ignorar")
Nombres de columnas codificados Se rompe cuando cambia el esquema Utilice listas de nombres de columnas de config
Sin validación de datos Predicciones silenciosamente erróneas sobre datos incorrectos Agregar comprobaciones de esquema antes de la predicción
Sesgo de entrenamiento/servicio El modelo ve diferentes características en prod Un objeto Pipeline para ambos

Constrúyelo

El código en code/pipeline.py crea una canalización de machine learning completa desde cero:

Paso 1: Transformador personalizado

class CustomTransformer:
    def __init__(self):
        self.means = None
        self.stds = None

    def fit(self, X):
        self.means = np.mean(X, axis=0)
        self.stds = np.std(X, axis=0)
        self.stds[self.stds == 0] = 1.0
        return self

    def transform(self, X):
        return (X - self.means) / self.stds

    def fit_transform(self, X):
        return self.fit(X).transform(X)

Paso 2: Pipeline desde cero

class PipelineFromScratch:
    def __init__(self, steps):
        self.steps = steps

    def fit(self, X, y=None):
        X_current = X.copy()
        for name, step in self.steps[:-1]:
            X_current = step.fit_transform(X_current)
        name, model = self.steps[-1]
        model.fit(X_current, y)
        return self

    def predict(self, X):
        X_current = X.copy()
        for name, step in self.steps[:-1]:
            X_current = step.transform(X_current)
        name, model = self.steps[-1]
        return model.predict(X_current)

Paso 3: Validación cruzada con Pipeline

El código demuestra cómo la validación cruzada con una canalización evita la fuga de datos: el escalador se ajusta por separado a los datos de entrenamiento de cada pliegue.

Paso 4: Producción completa Pipeline con sklearn

Una canalización completa con ColumnTransformer, múltiples rutas de preprocesamiento y un modelo, entrenado con validación cruzada y registro de experimentos adecuados.

Envíalo

Esta lección produce:

  • outputs/prompt-ml-pipeline.md: una habilidad para crear y depurar canalizaciones de ML
  • code/pipeline.py -- un proceso completo desde cero a través de sklearn

Ejercicios

  1. Cree una canalización que maneje un conjunto de datos con 3 columnas numéricas y 2 columnas categóricas. Utilice ColumnTransformer para aplicar imputación de mediana + escala a números y imputación más frecuente + codificación one-hot a categóricos. Entrene con validación cruzada quíntuple.

  2. Introduzca deliberadamente la fuga de datos: ajuste el escalador en el conjunto de datos completo antes de dividirlo. Compare la puntuación de validación cruzada (con fugas) con la puntuación de validación cruzada de la canalización (limpia). ¿Qué tan grande es la diferencia?

  3. Serializa tu canalización con joblib.dump. Cárguelo en un script separado y ejecute predicciones. Verifique que las predicciones sean idénticas.

  4. Agregue un transformador personalizado a la tubería que cree características polinómicas (grado 2) para las dos columnas numéricas más importantes. ¿A dónde debería ir en el proceso?

  5. Configure el seguimiento de MLflow para la canalización. Ejecute 5 experimentos con diferentes hiperparámetros. Utilice la interfaz de usuario de MLflow (mlflow ui) para comparar ejecuciones y elegir el mejor modelo.

Términos clave

Término Lo que dice la gente Lo que realmente significa
Pipeline "Cadena de transformaciones + modelo" Una secuencia ordenada de transformadores instalados y un modelo, aplicados como una sola unidad para evitar fugas
Fuga de datos "Se filtró inentrenamiento de la prueba en el entrenamiento" Utilizar inentrenamiento externa al conjunto de entrenamiento para construir el modelo, inflar las estimaciones de rendimiento
ColumnTransformer "Preprocesamiento diferente por columna" Aplica diferentes canalizaciones a diferentes subconjuntos de columnas, combinando resultados
Seguimiento de experimentos "Registrar tus carreras" Registro de parámetros, métricas, artefactos y versiones de código para cada ejecución de entrenamiento
flujo ml "Seguimiento e implementación de modelos" Plataforma de código abierto para seguimiento de experimentos, registro de modelos e implementación
DVC "Git para datos" Sistema de control de versiones para archivos de datos de gran tamaño, almacenamiento de hashes en git y datos en almacenamiento remoto
Registro de modelos "Catálogo de versiones de modelos" Un sistema que rastrea versiones de modelos con etiquetas de escenario (puesta en escena, producción, archivado)
Sesgo de entrenamiento/servicio "Funcionó en el cuaderno" Diferencias entre cómo se procesan los datos durante el entrenamiento versus la inferencia, lo que provoca errores silenciosos
Reproducibilidad "Mismo código, mismo resultado" La capacidad de obtener resultados idénticos con el mismo código, datos y configuración

Lectura adicional

0 lifetime access. Curriculum based on AI Engineering from Scratch by Rohit Ghumare (MIT, used under attribution).