Phase 04 - Lesson 16
Construye un pipeline de visión completo — Proyecto final
Un sistema de visión en producción es una cadena de modelos y reglas cosida con contratos de datos. Las piezas ya existen en esta fase; el proyecto final las conecta de extremo a extremo.
Tipo: Build Lenguajes: Python Requisitos previos: Fase 4, Lecciones 01-15 Tiempo: ~120 minutos
Objetivos de aprendizaje
- Diseñar un pipeline de visión en producción que detecte objetos, los clasifique y emita JSON estructurado — con todas las rutas de fallo gestionadas
- Conectar un detector (Mask R-CNN o YOLO), un clasificador (ConvNeXt-Tiny) y un contrato de datos (Pydantic) en un solo servicio
- Hacer benchmark del pipeline de extremo a extremo e identificar el primer cuello de botella (normalmente el preprocesamiento, luego el detector)
- Entregar un servicio FastAPI mínimo que acepte la carga de una imagen, ejecute el pipeline y devuelva detecciones con clasificaciones
El problema
Los modelos de visión individuales son útiles; los productos de visión son cadenas de ellos. Una auditoría de estantería de retail es un detector más un clasificador de productos más un pipeline de OCR de precios. La conducción autónoma es un detector 2D más un detector 3D más un segmentador más un rastreador más un planificador. Un pre-cribado médico es un segmentador más un clasificador de regiones más una interfaz para el clínico.
Conectar esas cadenas es la parte que separa un prototipo de ML de un producto. Cada interfaz entre modelos es un nuevo lugar para bugs. Cada transformación de coordenadas, cada normalización, cada redimensionamiento de máscara es una candidata a fallo silencioso. Un pipeline es tan fuerte como su interfaz más débil.
Este proyecto final monta el pipeline mínimo viable: detección + clasificación + salida estructurada + una capa de servicio. Todo lo demás de la Fase 4 encaja en este esqueleto: cambia Mask R-CNN por YOLOv8, agrega una cabeza de OCR, agrega una rama de segmentación, agrega un rastreador. La arquitectura es estable; las piezas son enchufables.
El concepto
El pipeline
flowchart LR
REQ["Solicitud HTTP<br/>+ bytes de la imagen"] --> LOAD["Decodificar<br/>+ preprocesar"]
LOAD --> DET["Detector<br/>(YOLO / Mask R-CNN)"]
DET --> CROP["Recortar + redimensionar<br/>cada detección"]
CROP --> CLS["Clasificador<br/>(ConvNeXt-Tiny)"]
CLS --> AGG["Agregar<br/>detecciones + clases"]
AGG --> SCHEMA["Validación<br/>Pydantic"]
SCHEMA --> RESP["Respuesta JSON"]
REQ -.->|error| RESP
style DET fill:#fef3c7,stroke:#d97706
style CLS fill:#dbeafe,stroke:#2563eb
style SCHEMA fill:#dcfce7,stroke:#16a34a
Siete etapas. Las dos etapas de modelo son caras; las otras cinco etapas son donde viven los bugs.
Contratos de datos con Pydantic
Cada frontera de modelo se convierte en un objeto tipado. Esto transforma los fallos silenciosos en fallos ruidosos.
Detection(
box: tuple[float, float, float, float], # (x1, y1, x2, y2), absolute pixels
score: float, # [0, 1]
class_id: int, # from detector's label map
mask: Optional[list[list[int]]], # RLE-encoded if present
)
PipelineResult(
image_id: str,
detections: list[Detection],
classifications: list[Classification],
inference_ms: float,
)
Cuando un detector devuelve cajas en (cx, cy, w, h) en lugar de (x1, y1, x2, y2), la validación de Pydantic falla en la frontera y te enteras de inmediato, en vez de depurar un recorte downstream que silenciosamente devuelve regiones vacías.
A dónde se va la latencia
Tres verdades se cumplen en casi todo pipeline de visión:
- El preprocesamiento suele ser el bloque individual más grande. Decodificar JPEGs, convertir espacios de color, redimensionar — estas operaciones están limitadas por la CPU y son fáciles de olvidar.
- El detector domina el tiempo de GPU. El 70-90% del tiempo de GPU está en el forward pass de detección.
- El posprocesamiento (NMS, codificación/decodificación RLE) es barato en GPU, caro en CPU. Haz siempre el profiling con el objetivo real.
Conocer la distribución es lo que convierte la optimización en una lista priorizada.
Modos de fallo
- Detecciones vacías — devuelve una lista vacía, no falles. Registra en el log.
- Cajas fuera de los límites — recórtalas al tamaño de la imagen antes de hacer el crop.
- Recortes diminutos — omite la clasificación para cajas más pequeñas que la entrada mínima del clasificador.
- Carga corrupta — respuesta 400 con un código de error específico, no 500.
- Fallo al cargar el modelo — falla en el arranque del servicio, no en la primera solicitud.
Un pipeline de producción gestiona cada uno de estos casos sin escribir un try/except genérico que oculte el fallo. Cada fallo recibe un código con nombre y una respuesta.
Batching
Un servicio de producción atiende a múltiples clientes. Agrupar detecciones y clasificaciones entre solicitudes multiplica el throughput. El trade-off: latencia extra por esperar a que un lote se llene. Configuración típica: recolectar solicitudes hasta por 20ms, agruparlas, procesar, distribuir las respuestas. torchserve y triton lo hacen de forma nativa; los servicios pequeños con carga predecible implementan su propio micro-batcher.
Constrúyelo
Paso 1: Contratos de datos
from pydantic import BaseModel, Field
from typing import List, Optional, Tuple
class Detection(BaseModel):
box: Tuple[float, float, float, float]
score: float = Field(ge=0, le=1)
class_id: int = Field(ge=0)
mask_rle: Optional[str] = None
class Classification(BaseModel):
detection_index: int
class_id: int
class_name: str
score: float = Field(ge=0, le=1)
class PipelineResult(BaseModel):
image_id: str
detections: List[Detection]
classifications: List[Classification]
inference_ms: float
Cinco segundos de código ahorran una hora de depuración en cualquier pipeline serio.
Paso 2: Una clase Pipeline mínima
import time
import numpy as np
import torch
from PIL import Image
class VisionPipeline:
def __init__(self, detector, classifier, class_names,
device="cpu", min_crop=32):
self.detector = detector.to(device).eval()
self.classifier = classifier.to(device).eval()
self.class_names = class_names
self.device = device
self.min_crop = min_crop
def preprocess(self, image):
"""
image: PIL.Image or np.ndarray (H, W, 3) uint8
returns: CHW float tensor on device
"""
if isinstance(image, Image.Image):
image = np.asarray(image.convert("RGB"))
tensor = torch.from_numpy(image).permute(2, 0, 1).float() / 255.0
return tensor.to(self.device)
@torch.no_grad()
def detect(self, image_tensor):
return self.detector([image_tensor])[0]
@torch.no_grad()
def classify(self, crops):
if len(crops) == 0:
return []
batch = torch.stack(crops).to(self.device)
logits = self.classifier(batch)
probs = logits.softmax(-1)
scores, cls = probs.max(-1)
return list(zip(cls.tolist(), scores.tolist()))
def run(self, image, image_id="anonymous"):
t0 = time.perf_counter()
tensor = self.preprocess(image)
det = self.detect(tensor)
crops = []
detections = []
valid_indices = []
for i, (box, score, cls) in enumerate(zip(det["boxes"], det["scores"], det["labels"])):
x1, y1, x2, y2 = [max(0, int(b)) for b in box.tolist()]
x2 = min(x2, tensor.shape[-1])
y2 = min(y2, tensor.shape[-2])
detections.append(Detection(
box=(x1, y1, x2, y2),
score=float(score),
class_id=int(cls),
))
if (x2 - x1) < self.min_crop or (y2 - y1) < self.min_crop:
continue
crop = tensor[:, y1:y2, x1:x2]
crop = torch.nn.functional.interpolate(
crop.unsqueeze(0),
size=(224, 224),
mode="bilinear",
align_corners=False,
)[0]
crops.append(crop)
valid_indices.append(i)
class_preds = self.classify(crops)
classifications = []
for valid_idx, (cls_id, cls_score) in zip(valid_indices, class_preds):
classifications.append(Classification(
detection_index=valid_idx,
class_id=int(cls_id),
class_name=self.class_names[cls_id],
score=float(cls_score),
))
return PipelineResult(
image_id=image_id,
detections=detections,
classifications=classifications,
inference_ms=(time.perf_counter() - t0) * 1000,
)
Cada interfaz es tipada. Cada ruta de fallo tiene una decisión de gestión específica.
Paso 3: Conecta un detector y un clasificador
from torchvision.models.detection import maskrcnn_resnet50_fpn_v2
from torchvision.models import convnext_tiny
# Use ImageNet-pretrained weights for a realistic pipeline without training
detector = maskrcnn_resnet50_fpn_v2(weights="DEFAULT")
classifier = convnext_tiny(weights="DEFAULT")
class_names = [f"imagenet_class_{i}" for i in range(1000)]
pipe = VisionPipeline(detector, classifier, class_names)
# Smoke test with a synthetic image
test_image = (np.random.rand(400, 600, 3) * 255).astype(np.uint8)
result = pipe.run(test_image, image_id="demo")
print(result.model_dump_json(indent=2)[:500])
Paso 4: Servicio FastAPI
from fastapi import FastAPI, UploadFile, HTTPException
from io import BytesIO
app = FastAPI()
pipe = None # initialised on startup
@app.on_event("startup")
def load():
global pipe
detector = maskrcnn_resnet50_fpn_v2(weights="DEFAULT").eval()
classifier = convnext_tiny(weights="DEFAULT").eval()
pipe = VisionPipeline(detector, classifier, class_names=[f"c{i}" for i in range(1000)])
@app.post("/detect")
async def detect_endpoint(file: UploadFile):
if file.content_type not in {"image/jpeg", "image/png", "image/webp"}:
raise HTTPException(status_code=400, detail="unsupported image type")
data = await file.read()
try:
img = Image.open(BytesIO(data)).convert("RGB")
except Exception:
raise HTTPException(status_code=400, detail="cannot decode image")
result = pipe.run(img, image_id=file.filename or "upload")
return result.model_dump()
Ejecútalo con uvicorn main:app --host 0.0.0.0 --port 8000. Pruébalo con curl -F 'file=@dog.jpg' http://localhost:8000/detect.
Paso 5: Haz benchmark del pipeline
import time
def benchmark(pipe, num_runs=20, image_size=(400, 600)):
img = (np.random.rand(*image_size, 3) * 255).astype(np.uint8)
pipe.run(img) # warm up
stages = {"preprocess": [], "detect": [], "classify": [], "total": []}
for _ in range(num_runs):
t0 = time.perf_counter()
tensor = pipe.preprocess(img)
t1 = time.perf_counter()
det = pipe.detect(tensor)
t2 = time.perf_counter()
crops = []
for box in det["boxes"]:
x1, y1, x2, y2 = [max(0, int(b)) for b in box.tolist()]
x2 = min(x2, tensor.shape[-1])
y2 = min(y2, tensor.shape[-2])
if (x2 - x1) >= pipe.min_crop and (y2 - y1) >= pipe.min_crop:
crop = tensor[:, y1:y2, x1:x2]
crop = torch.nn.functional.interpolate(
crop.unsqueeze(0), size=(224, 224), mode="bilinear", align_corners=False
)[0]
crops.append(crop)
pipe.classify(crops)
t3 = time.perf_counter()
stages["preprocess"].append((t1 - t0) * 1000)
stages["detect"].append((t2 - t1) * 1000)
stages["classify"].append((t3 - t2) * 1000)
stages["total"].append((t3 - t0) * 1000)
for stage, times in stages.items():
times.sort()
print(f"{stage:12s} p50={times[len(times)//2]:7.1f} ms p95={times[int(len(times)*0.95)]:7.1f} ms")
Salida típica en CPU: preprocess ~3 ms, detect 300-500 ms, classify 20-40 ms, total 350-550 ms. En GPU, detect queda en 20-40 ms y el preprocess + classify pasan a importar más en términos relativos.
Úsalo
Las plantillas de producción convergen a la misma estructura, más:
- Versionado de modelos — registra siempre en el log el nombre del modelo y el hash de los pesos en la respuesta.
- IDs de trace por solicitud — registra en el log el tiempo de cada etapa para cada solicitud, para que puedas correlacionar respuestas lentas con etapas.
- Ruta de fallback — si el clasificador supera el tiempo límite, devuelve las detecciones sin clasificaciones en lugar de fallar la solicitud entera.
- Filtros de seguridad — los filtros de NSFW / PII se ejecutan tras la clasificación, antes de que la respuesta salga del servicio.
- Endpoint por lotes — un
/detect_batchque acepta una lista de URLs de imágenes para procesamiento masivo.
Para servir en producción, torchserve, Triton Inference Server y BentoML gestionan el batching, el versionado, las métricas y los health checks de fábrica. Ejecutar FastAPI directamente es adecuado para prototipos y productos de pequeña escala.
Entrégalo
Esta lección produce:
outputs/prompt-vision-service-shape-reviewer.md— un prompt que revisa el código de un servicio de visión en busca de violaciones de contrato/formato de respuesta y nombra el primer bug que rompe el sistema.outputs/skill-pipeline-budget-planner.md— una skill que, dadas una latencia y un throughput objetivo, asigna un presupuesto de tiempo a cada etapa del pipeline y señala qué etapa superará su presupuesto primero.
Ejercicios
- (Fácil) Ejecuta el pipeline en 10 imágenes de cualquier dataset abierto. Reporta el tiempo promedio por etapa y la distribución del conteo de detecciones por imagen.
- (Medio) Agrega un campo de salida de máscara a
Detectiony codifícalo como RLE. Verifica que el JSON permanezca por debajo de 1MB incluso para una imagen con 10 objetos. - (Difícil) Agrega un micro-batcher delante del clasificador: recolecta recortes hasta por 10 ms, clasifícalos todos en una sola llamada de GPU, devuelve los resultados por solicitud. Mide la ganancia de throughput a 5 solicitudes concurrentes por segundo y la latencia añadida.
Términos clave
| Término | Lo que dice la gente | Lo que realmente significa |
|---|---|---|
| Pipeline | "El sistema" | Una cadena ordenada de pasos de preprocesamiento, inferencia y posprocesamiento con una interfaz tipada entre cada par |
| Contrato de datos | "El schema" | Definiciones Pydantic / dataclass a las que se conforman la entrada y la salida de cada etapa; captura bugs de integración en la frontera |
| Preprocesamiento | "Antes del modelo" | Decodificación, conversión de color, redimensionamiento, normalización; normalmente el mayor consumidor de tiempo de CPU |
| Posprocesamiento | "Después del modelo" | NMS, redimensionamiento de máscara, threshold, codificación RLE; barato en GPU, caro en CPU |
| Microbatcher | "Recolectar y luego reenviar" | Agregador que espera una ventana fija para múltiples solicitudes y ejecuta un único forward pass por lotes |
| Trace ID | "Id de solicitud" | Identificador por solicitud registrado en cada etapa, para que las solicitudes lentas puedan rastrearse de extremo a extremo |
| Código de fallo | "Error con nombre" | Código de error específico por clase de fallo en vez de un 500 genérico; habilita la lógica de retry del cliente |
| Health check | "Sonda de disponibilidad" | Endpoint barato que reporta si el servicio puede responder; los balanceadores de carga dependen de esto |
Lectura adicional
- Full Stack Deep Learning — Deploying Models — la visión general canónica del despliegue de ML en producción
- BentoML docs — framework de serving con batching, versionado y métricas
- torchserve docs — la biblioteca oficial de serving de PyTorch
- NVIDIA Triton Inference Server — serving de alto throughput con batching y soporte para múltiples modelos