Phase 04 - Lesson 16

Construye un pipeline de visión completo — Proyecto final

Un sistema de visión en producción es una cadena de modelos y reglas cosida con contratos de datos. Las piezas ya existen en esta fase; el proyecto final las conecta de extremo a extremo.

Tipo: Build Lenguajes: Python Requisitos previos: Fase 4, Lecciones 01-15 Tiempo: ~120 minutos

Objetivos de aprendizaje

  • Diseñar un pipeline de visión en producción que detecte objetos, los clasifique y emita JSON estructurado — con todas las rutas de fallo gestionadas
  • Conectar un detector (Mask R-CNN o YOLO), un clasificador (ConvNeXt-Tiny) y un contrato de datos (Pydantic) en un solo servicio
  • Hacer benchmark del pipeline de extremo a extremo e identificar el primer cuello de botella (normalmente el preprocesamiento, luego el detector)
  • Entregar un servicio FastAPI mínimo que acepte la carga de una imagen, ejecute el pipeline y devuelva detecciones con clasificaciones

El problema

Los modelos de visión individuales son útiles; los productos de visión son cadenas de ellos. Una auditoría de estantería de retail es un detector más un clasificador de productos más un pipeline de OCR de precios. La conducción autónoma es un detector 2D más un detector 3D más un segmentador más un rastreador más un planificador. Un pre-cribado médico es un segmentador más un clasificador de regiones más una interfaz para el clínico.

Conectar esas cadenas es la parte que separa un prototipo de ML de un producto. Cada interfaz entre modelos es un nuevo lugar para bugs. Cada transformación de coordenadas, cada normalización, cada redimensionamiento de máscara es una candidata a fallo silencioso. Un pipeline es tan fuerte como su interfaz más débil.

Este proyecto final monta el pipeline mínimo viable: detección + clasificación + salida estructurada + una capa de servicio. Todo lo demás de la Fase 4 encaja en este esqueleto: cambia Mask R-CNN por YOLOv8, agrega una cabeza de OCR, agrega una rama de segmentación, agrega un rastreador. La arquitectura es estable; las piezas son enchufables.

El concepto

El pipeline

flowchart LR
    REQ["Solicitud HTTP<br/>+ bytes de la imagen"] --> LOAD["Decodificar<br/>+ preprocesar"]
    LOAD --> DET["Detector<br/>(YOLO / Mask R-CNN)"]
    DET --> CROP["Recortar + redimensionar<br/>cada detección"]
    CROP --> CLS["Clasificador<br/>(ConvNeXt-Tiny)"]
    CLS --> AGG["Agregar<br/>detecciones + clases"]
    AGG --> SCHEMA["Validación<br/>Pydantic"]
    SCHEMA --> RESP["Respuesta JSON"]

    REQ -.->|error| RESP

    style DET fill:#fef3c7,stroke:#d97706
    style CLS fill:#dbeafe,stroke:#2563eb
    style SCHEMA fill:#dcfce7,stroke:#16a34a

Siete etapas. Las dos etapas de modelo son caras; las otras cinco etapas son donde viven los bugs.

Contratos de datos con Pydantic

Cada frontera de modelo se convierte en un objeto tipado. Esto transforma los fallos silenciosos en fallos ruidosos.

Detection(
    box: tuple[float, float, float, float],   # (x1, y1, x2, y2), absolute pixels
    score: float,                              # [0, 1]
    class_id: int,                             # from detector's label map
    mask: Optional[list[list[int]]],           # RLE-encoded if present
)

PipelineResult(
    image_id: str,
    detections: list[Detection],
    classifications: list[Classification],
    inference_ms: float,
)

Cuando un detector devuelve cajas en (cx, cy, w, h) en lugar de (x1, y1, x2, y2), la validación de Pydantic falla en la frontera y te enteras de inmediato, en vez de depurar un recorte downstream que silenciosamente devuelve regiones vacías.

A dónde se va la latencia

Tres verdades se cumplen en casi todo pipeline de visión:

  1. El preprocesamiento suele ser el bloque individual más grande. Decodificar JPEGs, convertir espacios de color, redimensionar — estas operaciones están limitadas por la CPU y son fáciles de olvidar.
  2. El detector domina el tiempo de GPU. El 70-90% del tiempo de GPU está en el forward pass de detección.
  3. El posprocesamiento (NMS, codificación/decodificación RLE) es barato en GPU, caro en CPU. Haz siempre el profiling con el objetivo real.

Conocer la distribución es lo que convierte la optimización en una lista priorizada.

Modos de fallo

  • Detecciones vacías — devuelve una lista vacía, no falles. Registra en el log.
  • Cajas fuera de los límites — recórtalas al tamaño de la imagen antes de hacer el crop.
  • Recortes diminutos — omite la clasificación para cajas más pequeñas que la entrada mínima del clasificador.
  • Carga corrupta — respuesta 400 con un código de error específico, no 500.
  • Fallo al cargar el modelo — falla en el arranque del servicio, no en la primera solicitud.

Un pipeline de producción gestiona cada uno de estos casos sin escribir un try/except genérico que oculte el fallo. Cada fallo recibe un código con nombre y una respuesta.

Batching

Un servicio de producción atiende a múltiples clientes. Agrupar detecciones y clasificaciones entre solicitudes multiplica el throughput. El trade-off: latencia extra por esperar a que un lote se llene. Configuración típica: recolectar solicitudes hasta por 20ms, agruparlas, procesar, distribuir las respuestas. torchserve y triton lo hacen de forma nativa; los servicios pequeños con carga predecible implementan su propio micro-batcher.

Constrúyelo

Paso 1: Contratos de datos

from pydantic import BaseModel, Field
from typing import List, Optional, Tuple

class Detection(BaseModel):
    box: Tuple[float, float, float, float]
    score: float = Field(ge=0, le=1)
    class_id: int = Field(ge=0)
    mask_rle: Optional[str] = None


class Classification(BaseModel):
    detection_index: int
    class_id: int
    class_name: str
    score: float = Field(ge=0, le=1)


class PipelineResult(BaseModel):
    image_id: str
    detections: List[Detection]
    classifications: List[Classification]
    inference_ms: float

Cinco segundos de código ahorran una hora de depuración en cualquier pipeline serio.

Paso 2: Una clase Pipeline mínima

import time
import numpy as np
import torch
from PIL import Image

class VisionPipeline:
    def __init__(self, detector, classifier, class_names,
                 device="cpu", min_crop=32):
        self.detector = detector.to(device).eval()
        self.classifier = classifier.to(device).eval()
        self.class_names = class_names
        self.device = device
        self.min_crop = min_crop

    def preprocess(self, image):
        """
        image: PIL.Image or np.ndarray (H, W, 3) uint8
        returns: CHW float tensor on device
        """
        if isinstance(image, Image.Image):
            image = np.asarray(image.convert("RGB"))
        tensor = torch.from_numpy(image).permute(2, 0, 1).float() / 255.0
        return tensor.to(self.device)

    @torch.no_grad()
    def detect(self, image_tensor):
        return self.detector([image_tensor])[0]

    @torch.no_grad()
    def classify(self, crops):
        if len(crops) == 0:
            return []
        batch = torch.stack(crops).to(self.device)
        logits = self.classifier(batch)
        probs = logits.softmax(-1)
        scores, cls = probs.max(-1)
        return list(zip(cls.tolist(), scores.tolist()))

    def run(self, image, image_id="anonymous"):
        t0 = time.perf_counter()
        tensor = self.preprocess(image)
        det = self.detect(tensor)

        crops = []
        detections = []
        valid_indices = []
        for i, (box, score, cls) in enumerate(zip(det["boxes"], det["scores"], det["labels"])):
            x1, y1, x2, y2 = [max(0, int(b)) for b in box.tolist()]
            x2 = min(x2, tensor.shape[-1])
            y2 = min(y2, tensor.shape[-2])
            detections.append(Detection(
                box=(x1, y1, x2, y2),
                score=float(score),
                class_id=int(cls),
            ))
            if (x2 - x1) < self.min_crop or (y2 - y1) < self.min_crop:
                continue
            crop = tensor[:, y1:y2, x1:x2]
            crop = torch.nn.functional.interpolate(
                crop.unsqueeze(0),
                size=(224, 224),
                mode="bilinear",
                align_corners=False,
            )[0]
            crops.append(crop)
            valid_indices.append(i)

        class_preds = self.classify(crops)

        classifications = []
        for valid_idx, (cls_id, cls_score) in zip(valid_indices, class_preds):
            classifications.append(Classification(
                detection_index=valid_idx,
                class_id=int(cls_id),
                class_name=self.class_names[cls_id],
                score=float(cls_score),
            ))

        return PipelineResult(
            image_id=image_id,
            detections=detections,
            classifications=classifications,
            inference_ms=(time.perf_counter() - t0) * 1000,
        )

Cada interfaz es tipada. Cada ruta de fallo tiene una decisión de gestión específica.

Paso 3: Conecta un detector y un clasificador

from torchvision.models.detection import maskrcnn_resnet50_fpn_v2
from torchvision.models import convnext_tiny

# Use ImageNet-pretrained weights for a realistic pipeline without training
detector = maskrcnn_resnet50_fpn_v2(weights="DEFAULT")
classifier = convnext_tiny(weights="DEFAULT")
class_names = [f"imagenet_class_{i}" for i in range(1000)]

pipe = VisionPipeline(detector, classifier, class_names)

# Smoke test with a synthetic image
test_image = (np.random.rand(400, 600, 3) * 255).astype(np.uint8)
result = pipe.run(test_image, image_id="demo")
print(result.model_dump_json(indent=2)[:500])

Paso 4: Servicio FastAPI

from fastapi import FastAPI, UploadFile, HTTPException
from io import BytesIO

app = FastAPI()
pipe = None  # initialised on startup

@app.on_event("startup")
def load():
    global pipe
    detector = maskrcnn_resnet50_fpn_v2(weights="DEFAULT").eval()
    classifier = convnext_tiny(weights="DEFAULT").eval()
    pipe = VisionPipeline(detector, classifier, class_names=[f"c{i}" for i in range(1000)])

@app.post("/detect")
async def detect_endpoint(file: UploadFile):
    if file.content_type not in {"image/jpeg", "image/png", "image/webp"}:
        raise HTTPException(status_code=400, detail="unsupported image type")
    data = await file.read()
    try:
        img = Image.open(BytesIO(data)).convert("RGB")
    except Exception:
        raise HTTPException(status_code=400, detail="cannot decode image")
    result = pipe.run(img, image_id=file.filename or "upload")
    return result.model_dump()

Ejecútalo con uvicorn main:app --host 0.0.0.0 --port 8000. Pruébalo con curl -F 'file=@dog.jpg' http://localhost:8000/detect.

Paso 5: Haz benchmark del pipeline

import time

def benchmark(pipe, num_runs=20, image_size=(400, 600)):
    img = (np.random.rand(*image_size, 3) * 255).astype(np.uint8)
    pipe.run(img)  # warm up

    stages = {"preprocess": [], "detect": [], "classify": [], "total": []}
    for _ in range(num_runs):
        t0 = time.perf_counter()
        tensor = pipe.preprocess(img)
        t1 = time.perf_counter()
        det = pipe.detect(tensor)
        t2 = time.perf_counter()
        crops = []
        for box in det["boxes"]:
            x1, y1, x2, y2 = [max(0, int(b)) for b in box.tolist()]
            x2 = min(x2, tensor.shape[-1])
            y2 = min(y2, tensor.shape[-2])
            if (x2 - x1) >= pipe.min_crop and (y2 - y1) >= pipe.min_crop:
                crop = tensor[:, y1:y2, x1:x2]
                crop = torch.nn.functional.interpolate(
                    crop.unsqueeze(0), size=(224, 224), mode="bilinear", align_corners=False
                )[0]
                crops.append(crop)
        pipe.classify(crops)
        t3 = time.perf_counter()
        stages["preprocess"].append((t1 - t0) * 1000)
        stages["detect"].append((t2 - t1) * 1000)
        stages["classify"].append((t3 - t2) * 1000)
        stages["total"].append((t3 - t0) * 1000)

    for stage, times in stages.items():
        times.sort()
        print(f"{stage:12s}  p50={times[len(times)//2]:7.1f} ms  p95={times[int(len(times)*0.95)]:7.1f} ms")

Salida típica en CPU: preprocess ~3 ms, detect 300-500 ms, classify 20-40 ms, total 350-550 ms. En GPU, detect queda en 20-40 ms y el preprocess + classify pasan a importar más en términos relativos.

Úsalo

Las plantillas de producción convergen a la misma estructura, más:

  • Versionado de modelos — registra siempre en el log el nombre del modelo y el hash de los pesos en la respuesta.
  • IDs de trace por solicitud — registra en el log el tiempo de cada etapa para cada solicitud, para que puedas correlacionar respuestas lentas con etapas.
  • Ruta de fallback — si el clasificador supera el tiempo límite, devuelve las detecciones sin clasificaciones en lugar de fallar la solicitud entera.
  • Filtros de seguridad — los filtros de NSFW / PII se ejecutan tras la clasificación, antes de que la respuesta salga del servicio.
  • Endpoint por lotes — un /detect_batch que acepta una lista de URLs de imágenes para procesamiento masivo.

Para servir en producción, torchserve, Triton Inference Server y BentoML gestionan el batching, el versionado, las métricas y los health checks de fábrica. Ejecutar FastAPI directamente es adecuado para prototipos y productos de pequeña escala.

Entrégalo

Esta lección produce:

  • outputs/prompt-vision-service-shape-reviewer.md — un prompt que revisa el código de un servicio de visión en busca de violaciones de contrato/formato de respuesta y nombra el primer bug que rompe el sistema.
  • outputs/skill-pipeline-budget-planner.md — una skill que, dadas una latencia y un throughput objetivo, asigna un presupuesto de tiempo a cada etapa del pipeline y señala qué etapa superará su presupuesto primero.

Ejercicios

  1. (Fácil) Ejecuta el pipeline en 10 imágenes de cualquier dataset abierto. Reporta el tiempo promedio por etapa y la distribución del conteo de detecciones por imagen.
  2. (Medio) Agrega un campo de salida de máscara a Detection y codifícalo como RLE. Verifica que el JSON permanezca por debajo de 1MB incluso para una imagen con 10 objetos.
  3. (Difícil) Agrega un micro-batcher delante del clasificador: recolecta recortes hasta por 10 ms, clasifícalos todos en una sola llamada de GPU, devuelve los resultados por solicitud. Mide la ganancia de throughput a 5 solicitudes concurrentes por segundo y la latencia añadida.

Términos clave

Término Lo que dice la gente Lo que realmente significa
Pipeline "El sistema" Una cadena ordenada de pasos de preprocesamiento, inferencia y posprocesamiento con una interfaz tipada entre cada par
Contrato de datos "El schema" Definiciones Pydantic / dataclass a las que se conforman la entrada y la salida de cada etapa; captura bugs de integración en la frontera
Preprocesamiento "Antes del modelo" Decodificación, conversión de color, redimensionamiento, normalización; normalmente el mayor consumidor de tiempo de CPU
Posprocesamiento "Después del modelo" NMS, redimensionamiento de máscara, threshold, codificación RLE; barato en GPU, caro en CPU
Microbatcher "Recolectar y luego reenviar" Agregador que espera una ventana fija para múltiples solicitudes y ejecuta un único forward pass por lotes
Trace ID "Id de solicitud" Identificador por solicitud registrado en cada etapa, para que las solicitudes lentas puedan rastrearse de extremo a extremo
Código de fallo "Error con nombre" Código de error específico por clase de fallo en vez de un 500 genérico; habilita la lógica de retry del cliente
Health check "Sonda de disponibilidad" Endpoint barato que reporta si el servicio puede responder; los balanceadores de carga dependen de esto

Lectura adicional

0 lifetime access. Curriculum based on AI Engineering from Scratch by Rohit Ghumare (MIT, used under attribution).