Phase 04 - Lesson 16
Construa um pipeline de visão completo — Projeto final
Um sistema de visão em produção é uma cadeia de modelos e regras costurada por contratos de dados. As peças já existem nesta fase; o projeto final as conecta de ponta a ponta.
Tipo: Build Linguagens: Python Pré-requisitos: Fase 4, Lições 01-15 Tempo: ~120 minutos
Objetivos de aprendizagem
- Projetar um pipeline de visão em produção que detecta objetos, os classifica e emite JSON estruturado — com todos os caminhos de falha tratados
- Conectar um detector (Mask R-CNN ou YOLO), um classificador (ConvNeXt-Tiny) e um contrato de dados (Pydantic) em um único serviço
- Fazer benchmark do pipeline de ponta a ponta e identificar o primeiro gargalo (geralmente o pré-processamento, depois o detector)
- Entregar um serviço FastAPI mínimo que aceita o upload de uma imagem, executa o pipeline e retorna detecções com classificações
O problema
Modelos de visão individuais são úteis; produtos de visão são cadeias deles. Uma auditoria de prateleira de varejo é um detector mais um classificador de produtos mais um pipeline de OCR de preços. A direção autônoma é um detector 2D mais um detector 3D mais um segmentador mais um rastreador mais um planejador. Uma pré-triagem médica é um segmentador mais um classificador de regiões mais uma interface para o clínico.
Conectar essas cadeias é a parte que separa um protótipo de ML de um produto. Toda interface entre modelos é um novo lugar para bugs. Toda transformação de coordenadas, toda normalização, todo redimensionamento de máscara é uma candidata a falha silenciosa. Um pipeline é tão forte quanto sua interface mais fraca.
Este projeto final monta o pipeline mínimo viável: detecção + classificação + saída estruturada + uma camada de serviço. Todo o resto da Fase 4 encaixa nesse esqueleto: troque o Mask R-CNN por YOLOv8, adicione uma cabeça de OCR, adicione um ramo de segmentação, adicione um rastreador. A arquitetura é estável; as peças são plugáveis.
O conceito
O pipeline
flowchart LR
REQ["Requisição HTTP<br/>+ bytes da imagem"] --> LOAD["Decodificar<br/>+ pré-processar"]
LOAD --> DET["Detector<br/>(YOLO / Mask R-CNN)"]
DET --> CROP["Recortar + redimensionar<br/>cada detecção"]
CROP --> CLS["Classificador<br/>(ConvNeXt-Tiny)"]
CLS --> AGG["Agregar<br/>detecções + classes"]
AGG --> SCHEMA["Validação<br/>Pydantic"]
SCHEMA --> RESP["Resposta JSON"]
REQ -.->|erro| RESP
style DET fill:#fef3c7,stroke:#d97706
style CLS fill:#dbeafe,stroke:#2563eb
style SCHEMA fill:#dcfce7,stroke:#16a34a
Sete estágios. Os dois estágios de modelo são caros; os outros cinco estágios são onde os bugs vivem.
Contratos de dados com Pydantic
Toda fronteira de modelo se torna um objeto tipado. Isso transforma falhas silenciosas em falhas barulhentas.
Detection(
box: tuple[float, float, float, float], # (x1, y1, x2, y2), absolute pixels
score: float, # [0, 1]
class_id: int, # from detector's label map
mask: Optional[list[list[int]]], # RLE-encoded if present
)
PipelineResult(
image_id: str,
detections: list[Detection],
classifications: list[Classification],
inference_ms: float,
)
Quando um detector retorna caixas em (cx, cy, w, h) em vez de (x1, y1, x2, y2), a validação do Pydantic falha na fronteira e você descobre imediatamente, em vez de depurar um recorte downstream que silenciosamente retorna regiões vazias.
Para onde vai a latência
Três verdades valem em quase todo pipeline de visão:
- O pré-processamento costuma ser o maior bloco isolado. Decodificar JPEGs, converter espaços de cor, redimensionar — essas operações são limitadas por CPU e fáceis de esquecer.
- O detector domina o tempo de GPU. 70-90% do tempo de GPU está no forward pass de detecção.
- O pós-processamento (NMS, codificação/decodificação RLE) é barato na GPU, caro na CPU. Sempre faça o profiling com o alvo real.
Conhecer a distribuição é o que transforma a otimização em uma lista priorizada.
Modos de falha
- Detecções vazias — retorne uma lista vazia, não quebre. Registre no log.
- Caixas fora dos limites — limite ao tamanho da imagem antes de recortar.
- Recortes minúsculos — pule a classificação para caixas menores que a entrada mínima do classificador.
- Upload corrompido — resposta 400 com um código de erro específico, não 500.
- Falha ao carregar o modelo — falhe na inicialização do serviço, não na primeira requisição.
Um pipeline de produção trata cada um desses casos sem escrever um try/except genérico que esconde a falha. Toda falha recebe um código nomeado e uma resposta.
Batching
Um serviço de produção atende múltiplos clientes. Agrupar detecções e classificações entre requisições multiplica o throughput. O trade-off: latência extra por esperar um lote encher. Configuração típica: coletar requisições por até 20ms, agrupá-las, processar, distribuir as respostas. torchserve e triton fazem isso nativamente; serviços pequenos com carga previsível implementam o próprio micro-batcher.
Construa
Passo 1: Contratos de dados
from pydantic import BaseModel, Field
from typing import List, Optional, Tuple
class Detection(BaseModel):
box: Tuple[float, float, float, float]
score: float = Field(ge=0, le=1)
class_id: int = Field(ge=0)
mask_rle: Optional[str] = None
class Classification(BaseModel):
detection_index: int
class_id: int
class_name: str
score: float = Field(ge=0, le=1)
class PipelineResult(BaseModel):
image_id: str
detections: List[Detection]
classifications: List[Classification]
inference_ms: float
Cinco segundos de código economizam uma hora de depuração em qualquer pipeline sério.
Passo 2: Uma classe Pipeline mínima
import time
import numpy as np
import torch
from PIL import Image
class VisionPipeline:
def __init__(self, detector, classifier, class_names,
device="cpu", min_crop=32):
self.detector = detector.to(device).eval()
self.classifier = classifier.to(device).eval()
self.class_names = class_names
self.device = device
self.min_crop = min_crop
def preprocess(self, image):
"""
image: PIL.Image or np.ndarray (H, W, 3) uint8
returns: CHW float tensor on device
"""
if isinstance(image, Image.Image):
image = np.asarray(image.convert("RGB"))
tensor = torch.from_numpy(image).permute(2, 0, 1).float() / 255.0
return tensor.to(self.device)
@torch.no_grad()
def detect(self, image_tensor):
return self.detector([image_tensor])[0]
@torch.no_grad()
def classify(self, crops):
if len(crops) == 0:
return []
batch = torch.stack(crops).to(self.device)
logits = self.classifier(batch)
probs = logits.softmax(-1)
scores, cls = probs.max(-1)
return list(zip(cls.tolist(), scores.tolist()))
def run(self, image, image_id="anonymous"):
t0 = time.perf_counter()
tensor = self.preprocess(image)
det = self.detect(tensor)
crops = []
detections = []
valid_indices = []
for i, (box, score, cls) in enumerate(zip(det["boxes"], det["scores"], det["labels"])):
x1, y1, x2, y2 = [max(0, int(b)) for b in box.tolist()]
x2 = min(x2, tensor.shape[-1])
y2 = min(y2, tensor.shape[-2])
detections.append(Detection(
box=(x1, y1, x2, y2),
score=float(score),
class_id=int(cls),
))
if (x2 - x1) < self.min_crop or (y2 - y1) < self.min_crop:
continue
crop = tensor[:, y1:y2, x1:x2]
crop = torch.nn.functional.interpolate(
crop.unsqueeze(0),
size=(224, 224),
mode="bilinear",
align_corners=False,
)[0]
crops.append(crop)
valid_indices.append(i)
class_preds = self.classify(crops)
classifications = []
for valid_idx, (cls_id, cls_score) in zip(valid_indices, class_preds):
classifications.append(Classification(
detection_index=valid_idx,
class_id=int(cls_id),
class_name=self.class_names[cls_id],
score=float(cls_score),
))
return PipelineResult(
image_id=image_id,
detections=detections,
classifications=classifications,
inference_ms=(time.perf_counter() - t0) * 1000,
)
Toda interface é tipada. Todo caminho de falha tem uma decisão de tratamento específica.
Passo 3: Conecte um detector e um classificador
from torchvision.models.detection import maskrcnn_resnet50_fpn_v2
from torchvision.models import convnext_tiny
# Use ImageNet-pretrained weights for a realistic pipeline without training
detector = maskrcnn_resnet50_fpn_v2(weights="DEFAULT")
classifier = convnext_tiny(weights="DEFAULT")
class_names = [f"imagenet_class_{i}" for i in range(1000)]
pipe = VisionPipeline(detector, classifier, class_names)
# Smoke test with a synthetic image
test_image = (np.random.rand(400, 600, 3) * 255).astype(np.uint8)
result = pipe.run(test_image, image_id="demo")
print(result.model_dump_json(indent=2)[:500])
Passo 4: Serviço FastAPI
from fastapi import FastAPI, UploadFile, HTTPException
from io import BytesIO
app = FastAPI()
pipe = None # initialised on startup
@app.on_event("startup")
def load():
global pipe
detector = maskrcnn_resnet50_fpn_v2(weights="DEFAULT").eval()
classifier = convnext_tiny(weights="DEFAULT").eval()
pipe = VisionPipeline(detector, classifier, class_names=[f"c{i}" for i in range(1000)])
@app.post("/detect")
async def detect_endpoint(file: UploadFile):
if file.content_type not in {"image/jpeg", "image/png", "image/webp"}:
raise HTTPException(status_code=400, detail="unsupported image type")
data = await file.read()
try:
img = Image.open(BytesIO(data)).convert("RGB")
except Exception:
raise HTTPException(status_code=400, detail="cannot decode image")
result = pipe.run(img, image_id=file.filename or "upload")
return result.model_dump()
Execute com uvicorn main:app --host 0.0.0.0 --port 8000. Teste com curl -F 'file=@dog.jpg' http://localhost:8000/detect.
Passo 5: Faça benchmark do pipeline
import time
def benchmark(pipe, num_runs=20, image_size=(400, 600)):
img = (np.random.rand(*image_size, 3) * 255).astype(np.uint8)
pipe.run(img) # warm up
stages = {"preprocess": [], "detect": [], "classify": [], "total": []}
for _ in range(num_runs):
t0 = time.perf_counter()
tensor = pipe.preprocess(img)
t1 = time.perf_counter()
det = pipe.detect(tensor)
t2 = time.perf_counter()
crops = []
for box in det["boxes"]:
x1, y1, x2, y2 = [max(0, int(b)) for b in box.tolist()]
x2 = min(x2, tensor.shape[-1])
y2 = min(y2, tensor.shape[-2])
if (x2 - x1) >= pipe.min_crop and (y2 - y1) >= pipe.min_crop:
crop = tensor[:, y1:y2, x1:x2]
crop = torch.nn.functional.interpolate(
crop.unsqueeze(0), size=(224, 224), mode="bilinear", align_corners=False
)[0]
crops.append(crop)
pipe.classify(crops)
t3 = time.perf_counter()
stages["preprocess"].append((t1 - t0) * 1000)
stages["detect"].append((t2 - t1) * 1000)
stages["classify"].append((t3 - t2) * 1000)
stages["total"].append((t3 - t0) * 1000)
for stage, times in stages.items():
times.sort()
print(f"{stage:12s} p50={times[len(times)//2]:7.1f} ms p95={times[int(len(times)*0.95)]:7.1f} ms")
Saída típica em CPU: preprocess ~3 ms, detect 300-500 ms, classify 20-40 ms, total 350-550 ms. Em GPU, detect fica em 20-40 ms e o preprocess + classify passam a importar mais em termos relativos.
Use
Templates de produção convergem para a mesma estrutura, mais:
- Versionamento de modelos — sempre registre no log o nome do modelo e o hash dos pesos na resposta.
- IDs de trace por requisição — registre no log o tempo de cada estágio para cada requisição, para que você possa correlacionar respostas lentas com estágios.
- Caminho de fallback — se o classificador estourar o tempo limite, retorne as detecções sem classificações em vez de falhar a requisição inteira.
- Filtros de segurança — filtros de NSFW / PII rodam após a classificação, antes de a resposta deixar o serviço.
- Endpoint em lote — um
/detect_batchque aceita uma lista de URLs de imagens para processamento em massa.
Para servir em produção, torchserve, Triton Inference Server e BentoML lidam com batching, versionamento, métricas e health checks de fábrica. Rodar FastAPI diretamente é adequado para protótipos e produtos de pequena escala.
Entregue
Esta lição produz:
outputs/prompt-vision-service-shape-reviewer.md— um prompt que revisa o código de um serviço de visão em busca de violações de contrato/formato de resposta e nomeia o primeiro bug que quebra o sistema.outputs/skill-pipeline-budget-planner.md— uma skill que, dadas uma latência e um throughput alvo, atribui um orçamento de tempo a cada estágio do pipeline e sinaliza qual estágio vai estourar seu orçamento primeiro.
Exercícios
- (Fácil) Execute o pipeline em 10 imagens de qualquer dataset aberto. Reporte o tempo médio por estágio e a distribuição da contagem de detecções por imagem.
- (Médio) Adicione um campo de saída de máscara ao
Detectione codifique-o como RLE. Verifique se o JSON permanece abaixo de 1MB mesmo para uma imagem com 10 objetos. - (Difícil) Adicione um micro-batcher na frente do classificador: colete recortes por até 10 ms, classifique todos em uma única chamada de GPU, retorne os resultados por requisição. Meça o ganho de throughput a 5 requisições concorrentes por segundo e a latência adicionada.
Termos-chave
| Termo | O que as pessoas dizem | O que de fato significa |
|---|---|---|
| Pipeline | "O sistema" | Uma cadeia ordenada de etapas de pré-processamento, inferência e pós-processamento com uma interface tipada entre cada par |
| Contrato de dados | "O schema" | Definições Pydantic / dataclass às quais a entrada e a saída de cada estágio se conformam; captura bugs de integração na fronteira |
| Pré-processamento | "Antes do modelo" | Decodificação, conversão de cor, redimensionamento, normalização; geralmente o maior consumidor de tempo de CPU |
| Pós-processamento | "Depois do modelo" | NMS, redimensionamento de máscara, threshold, codificação RLE; barato na GPU, caro na CPU |
| Microbatcher | "Coletar e então encaminhar" | Agregador que espera uma janela fixa por múltiplas requisições e executa um único forward pass em lote |
| Trace ID | "Id da requisição" | Identificador por requisição registrado em cada estágio, para que requisições lentas possam ser rastreadas de ponta a ponta |
| Código de falha | "Erro nomeado" | Código de erro específico por classe de falha em vez de um 500 genérico; habilita a lógica de retry do cliente |
| Health check | "Sonda de prontidão" | Endpoint barato que reporta se o serviço consegue responder; os balanceadores de carga dependem disso |
Leitura adicional
- Full Stack Deep Learning — Deploying Models — a visão geral canônica do deploy de ML em produção
- BentoML docs — framework de serving com batching, versionamento e métricas
- torchserve docs — a biblioteca oficial de serving do PyTorch
- NVIDIA Triton Inference Server — serving de alto throughput com batching e suporte a múltiplos modelos